Running Your First Pipeline
1. Introduction
Executing an Apache Beam pipeline is the final phase, in which the pipeline definition is translated into a graph and submitted to a runner. To manage this phase effectively, you need to know how to trigger execution from your terminal, read the console logs, inspect the generated output files, and interpret the process exit code.
2. Why This Concept Exists
Writing correct code is only half the battle. Once your pipeline executes, it runs on distributed hardware (or on simulated multi-threaded processes). If an error occurs during processing (for example, a record with a missing data field), it may surface only in the runner's logs rather than as an obvious crash in your IDE. You need a structured approach to run, log, monitor, and verify your pipeline's execution lifecycle.
3. Key Terminology
- logging: Python's built-in module for writing status messages at different severity levels (such as INFO, WARNING, and ERROR).
- wait_until_finish (waitUntilFinish in the Java SDK): A method called on the pipeline result object that blocks the script until the runner finishes processing.
- Console Output: The stdout/stderr streams of the terminal window running your execution command.
- Sharded Outputs: Output files written in parts (e.g. out.txt-00000-of-00002) to accommodate concurrent writes.
4. How It Works
- Execution Command: You execute your Python script from the shell terminal (e.g. python my_pipeline.py).
- Initialization Logs: The terminal displays client compilation messages.
- Runner Handshake: The runner receives the graph and prints engine-specific details.
- Data Processing Logs: As elements are processed, the logging calls inside your transform functions emit messages that let you track progress.
- Result Validation: The process exits with a status code (0 for success, non-zero for failure). You inspect the files written to confirm output accuracy.
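The last step above, checking the exit status, can be sketched with the standard library alone. This is a minimal illustration, not a full launcher: run_and_check is a hypothetical helper, and the two commands are one-line stand-ins for a real pipeline script such as my_pipeline.py.

```python
import subprocess
import sys


def run_and_check(command):
    """Run a command and return (exit_code, stdout, stderr) as text."""
    completed = subprocess.run(command, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr


# Stand-in commands (assumption): in practice you would pass something like
# [sys.executable, "my_pipeline.py", "--output", "results/words"].
ok_code, ok_out, _ = run_and_check([sys.executable, "-c", "print('done')"])
fail_code, _, _ = run_and_check([sys.executable, "-c", "raise SystemExit(3)"])

print("success exit code:", ok_code)
print("failure exit code:", fail_code)
```

An orchestration tool performs the same check: an exit code of 0 is treated as success and anything else as failure.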
5. Visual Diagram
[ Run: python script.py ]
│
▼
[ Print Compilation Logs ]
│
▼
[ Execute Transforms ] — (logs warnings or info dynamically)
│
▼
[ Write Sharded Files ] — (creates output partitions)
│
▼
[ Confirm Status Code ] — (checks if job succeeded/failed)
6. Code Example
Here is a complete script demonstrating execution tracking, command line parameter parsing, and logging:
import logging
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run_logged_pipeline(argv=None):
    # Set up argparse for input/output files.
    # Note: --input is parsed for illustration only; this example creates
    # its data in memory with beam.Create.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="input_words.txt")
    parser.add_argument("--output", default="output_words.txt")
    args, beam_args = parser.parse_known_args(argv)

    # Configure logging level to INFO
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Initializing logging pipeline configuration...")

    # Initialize Pipeline Options from the arguments argparse did not consume
    options = PipelineOptions(beam_args)

    with beam.Pipeline(options=options) as p:
        logging.info("Constructing pipeline graph...")
        words = p | "CreateWords" >> beam.Create(["hello", "world", "beam"])

        def process_word(word):
            logging.info(f"Processing word record: {word}")
            if len(word) < 5:
                logging.warning(f"Short word encountered: {word}")
            return word.upper()

        uppercased = words | "Uppercase" >> beam.Map(process_word)
        uppercased | "SaveOutput" >> beam.io.WriteToText(args.output)

    # Reached only after the with block has run the pipeline to completion
    logging.info("Pipeline execution completed successfully.")


if __name__ == "__main__":
    run_logged_pipeline()
7. Code Explanation
- logging.getLogger().setLevel(logging.INFO) ensures that INFO-level messages are output to the terminal.
- argparse parses the custom arguments --input and --output while passing the remaining runner-specific arguments (beam_args) to PipelineOptions.
- Inside process_word, logging.info() prints details for every processed record, and logging.warning() flags short elements.
- The script waits for execution to complete before logging the final success message, because leaving the with block runs the pipeline and waits for it to finish.
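The argument split described above can be seen in isolation with argparse alone. The sketch below assumes a hypothetical helper named split_args and an illustrative argument list; it only shows which values land in the parsed namespace and which are left over for the runner.

```python
import argparse


def split_args(argv):
    """Separate script-specific flags from flags meant for the runner."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", default="input_words.txt")
    parser.add_argument("--output", default="output_words.txt")
    # parse_known_args returns (namespace, list_of_unrecognized_arguments)
    return parser.parse_known_args(argv)


# Illustrative command line: one custom flag and one runner flag.
args, beam_args = split_args(
    ["--output", "results/words", "--runner", "DirectRunner"]
)

print(args.input, args.output)
print(beam_args)
```

Here args.output holds the custom value, args.input falls back to its default, and beam_args keeps the unrecognized --runner flag together with its value.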
8. Real Production Example
In a CI/CD environment, pipelines are scheduled to run nightly using orchestration tools (like Apache Airflow or Google Cloud Composer). Airflow runs a CLI bash command to trigger the Beam script on Google Cloud Dataflow. Airflow monitors the execution logs and watches the result status code. If Dataflow returns a failure status, Airflow sends alert notifications to the engineering team.
9. Common Mistakes
- Silencing Logs: Not configuring Python's root logger. The root logger defaults to the WARNING level, so logging.info() calls produce no output unless you set the level to logging.INFO, leaving you with a nearly empty terminal during local execution.
- Expecting Sequential Console Logs: Assuming logs will print in order. Because Beam runs steps in parallel across threads or workers, log messages from different elements will interleave.
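The first mistake is easy to reproduce without Beam. The sketch below uses a hypothetical in-memory handler, ListHandler, so the effect of the root logger's level can be inspected directly; it is an illustration of the standard logging module, not of anything Beam-specific.

```python
import logging


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


root = logging.getLogger()
handler = ListHandler()
root.addHandler(handler)

root.setLevel(logging.WARNING)  # Python's default level for the root logger
logging.info("dropped: below the WARNING threshold")
logging.warning("kept: at the WARNING threshold")

root.setLevel(logging.INFO)  # what the pipeline script above does
logging.info("kept: INFO is now enabled")

root.removeHandler(handler)
print(handler.messages)
```

Only the warning and the second info message reach the handler; the first info call is discarded before any handler sees it.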
10. Interview Perspective
- Question: Why are my output files generated as output.txt-00000-of-00001 instead of exactly output.txt?
- Answer: Distributed pipelines write outputs in parallel using multiple threads or workers. To prevent write conflicts (race conditions), each worker writes to a separate "shard" or partition. If you want a single file, you can set the num_shards=1 parameter in the write transform (though this reduces write throughput). The file name still carries the shard suffix unless you also pass shard_name_template="".
- Question: How do you check if a pipeline succeeded programmatically when executing asynchronously?
- Answer: You capture the pipeline result object with result = p.run() and then call result.wait_until_finish(), which blocks until the job reaches a terminal state and returns that state, such as PipelineState.DONE or PipelineState.FAILED. Depending on the runner, a failed job may raise an exception instead of returning a failure state.
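When output is sharded, verification usually means reading every shard. The sketch below is one way to do that with the standard library; read_sharded_output is a hypothetical helper, and the two shard files are written by hand into a temporary directory as a stand-in for real pipeline output.

```python
import glob
import os
import tempfile


def read_sharded_output(prefix):
    """Return all lines from files named like '<prefix>-00000-of-00002'."""
    lines = []
    pattern = glob.escape(prefix) + "-*-of-*"
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as shard:
            lines.extend(line.rstrip("\n") for line in shard)
    return lines


# Simulate two shards (assumption: real shards would come from WriteToText).
with tempfile.TemporaryDirectory() as tmp:
    prefix = os.path.join(tmp, "output_words.txt")
    for index, words in enumerate([["HELLO", "WORLD"], ["BEAM"]]):
        shard_path = f"{prefix}-{index:05d}-of-00002"
        with open(shard_path, "w", encoding="utf-8") as f:
            f.write("\n".join(words) + "\n")
    merged = read_sharded_output(prefix)

print(merged)
```

In a real run, the assignment of elements to shards is not guaranteed, so compare the merged lines as a set or after sorting rather than relying on their order.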
11. Best Practices
- Never print sensitive passwords or client keys to execution logs.
- Always configure log levels to INFO or WARNING in production to avoid cluttering storage logs with excessive DEBUG lines.
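One way to enforce the first practice is a logging filter that masks secrets before a record is emitted. The sketch below is an assumption-laden illustration: SECRET_PATTERN, RedactSecretsFilter, redact, and the key names password and api_key are all hypothetical and would need adapting to the secrets your pipeline actually handles.

```python
import logging
import re

# Hypothetical pattern: masks values that follow "password=" or "api_key=".
SECRET_PATTERN = re.compile(r"(password|api_key)=\S+", re.IGNORECASE)


def redact(text):
    """Replace secret values in a plain string with '***'."""
    return SECRET_PATTERN.sub(r"\1=***", text)


class RedactSecretsFilter(logging.Filter):
    """Rewrites each record so secret values never reach a handler."""

    def filter(self, record):
        record.msg = redact(record.getMessage())
        record.args = None  # the message is already fully formatted
        return True  # keep the (now redacted) record


logger = logging.getLogger("pipeline.example")
logger.addFilter(RedactSecretsFilter())
logger.warning("Connecting with password=hunter2 to host=db1")

print(redact("api_key=abc123 region=us"))
```

A filter attached to a logger only sees records logged through that logger; attaching it to a handler instead covers every record the handler receives.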
12. Summary
- Pipelines are launched from a terminal with an ordinary shell command.
- Use Python's logging module to track element states dynamically.
- Outputs are written in sharded partitions by default.
- The pipeline result object tracks runtime execution status.
- Command-line parameters must be split between custom inputs and runner flags.