Direct Runner
1. Introduction
The Direct Runner is the local execution engine for Apache Beam. It runs pipelines on a single machine and can use multiple threads to simulate distributed execution. It is the default runner in the Python SDK and is used mainly for testing, prototyping, and debugging pipeline logic.
2. Why This Concept Exists
Deploying and running a pipeline on a distributed cluster like Spark, Flink, or Dataflow can take minutes to spin up resources, execute, and return logs. During development, developers need a fast iteration loop.
The Direct Runner executes pipelines immediately in a local process, using your machine's CPU. It can also run extra checks that catch errors before cluster deployment; the Java Direct Runner, for example, verifies by default that elements are not mutated and can be encoded, which surfaces concurrency and serialization bugs early.
3. Key Terminology
- Local Runner: An execution engine that runs code entirely within your local process space, rather than sending it to remote machines.
- Multi-threading: Spawning multiple execution threads to run transforms in parallel, simulating a cluster's worker nodes.
- In-Memory Execution: Processing and routing PCollection elements inside the RAM of the local machine.
4. How It Works
- Graph Construction: The SDK builds the pipeline DAG representation.
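- Engine Launch: When the pipeline is run (for example, when p.run() is called or a with beam.Pipeline() block exits), the Direct Runner starts a local pool of workers. In the Python SDK, the size and mode of this pool are set by the --direct_num_workers and --direct_running_mode options.
- Multi-Threaded Processing: It splits the elements of each PCollection into bundles and distributes them across the workers, which run your custom logic.
- Output: Results are written to local disk files, the terminal, or local databases.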
5. Visual Diagram
[Diagram: Local Machine Runtime → Direct Runner Engine Context containing Thread 1, Thread 2, and Thread 3 (worker threads) → Local Core Processing → In-Memory PCollection Outputs]
6. Code Example
Here is a pipeline using the Direct Runner to process data and write it to local output files:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Configure pipeline to explicitly use DirectRunner
options = PipelineOptions(["--runner=DirectRunner"])

with beam.Pipeline(options=options) as p:
    (p
     | "CreateSampleData" >> beam.Create([
         "Alice,24,Finance",
         "Bob,32,Engineering",
         "Charlie,28,Finance",
     ])
     | "SplitFields" >> beam.Map(lambda row: row.split(","))
     | "FilterFinance" >> beam.Filter(lambda fields: fields[2] == "Finance")
     | "ExtractNames" >> beam.Map(lambda fields: fields[0])
     # Write directly to local disk
     | "WriteLocal" >> beam.io.WriteToText("local_output.txt"))
7. Code Explanation
- --runner=DirectRunner is set programmatically. If you leave PipelineOptions empty in Python, it defaults to DirectRunner.
- beam.Create loads the strings into a local PCollection.
- The data is split and filtered by the Direct Runner's local workers.
- beam.io.WriteToText("local_output.txt") writes the output relative to the current working directory. In this case, it typically generates a single file named local_output.txt-00000-of-00001 containing the lines Alice and Charlie.
8. Real Production Example
A development team maintains an ingestion pipeline. Before pushing code changes to their repository, their local build runs a shell command that executes the pipeline with the DirectRunner on small mock datasets (e.g. 100 rows). This catches syntax issues, import errors, and logical bugs early, saving time and compute costs.
9. Common Mistakes
- Production Scale OOM: Running the Direct Runner on massive, production-sized files (e.g. 500GB). The Direct Runner keeps much of the active collection state in memory, so processing large datasets can exhaust your machine's RAM and fail with an Out of Memory (OOM) error.
- Local Path Hardcoding: Hardcoding local paths (e.g. C:\users\data.csv) in pipeline code. This works on the Direct Runner but fails on Flink or Dataflow, because remote workers cannot access directories on your machine.
10. Interview Perspective
- Question: Why does the Direct Runner write output files with suffixes like -00000-of-00001?
- Answer: This is Beam's default sharding behavior. PCollections are distributed datasets, so file sinks write one file per shard even when running locally. The suffix encodes the shard index and the total shard count; unless you set num_shards, the runner chooses how many shards to write.
- Question: Does Direct Runner support streaming?
- Answer: Yes. Direct Runner can process unbounded sources and apply windows and watermarks, allowing developers to test streaming logic locally before deploying to production.
11. Best Practices
- Use Direct Runner only for testing and debugging with small datasets (under 10,000 records).
- Always use temporary or parameterized files for local reads and writes to avoid cluttering your development workspace.
12. Summary
- DirectRunner is the default local runner in the Python SDK.
- It utilizes local CPU multi-threading to process data.
- Primarily used for testing, validation, and fast development loops.
- Cannot scale to large datasets due to local hardware limits.
- Local file paths must be parameterized before deploying to cloud engines.