DoFn Deep Dive
1. Introduction
A DoFn (Do Function) is a class in Apache Beam that defines your custom parallel data processing logic. It is the engine inside a ParDo transform.
2. Why This Concept Exists
Built-in transforms like `Map` and `Filter` are convenient for simple, per-element functions. Real-world pipelines often need more than that, such as:

- parsing records with dynamic schemas,
- opening connections to external databases or APIs,
- batching elements within a bundle before calling an external service.

A DoFn gives you lifecycle hooks for acquiring and releasing resources and for doing per-bundle work. Expensive setup then runs once per instance instead of once per element.
3. Key Terminology
- DoFn class: A Python class that inherits from `beam.DoFn`.
- `process` method: The core method, called once for every element in the input PCollection.
- Yielding vs returning: A DoFn's `process` method emits elements with `yield` (or returns an iterable) rather than returning a single value. This lets one input produce zero, one, or many outputs.
4. How It Works: The Lifecycle Hooks
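When a runner executes a ParDo transform, it deserializes instances of your DoFn on the workers and calls these hooks on each instance:
- `setup()`: Called once per DoFn instance when it is initialized on a worker. This is the right place to establish database or API connections. A worker may create several instances, so this can run more than once per worker.
- `start_bundle()`: Called at the beginning of each bundle. A bundle is a batch of elements that the runner chooses to process together.
- `process(element)`: Called for every element in the bundle.
- `finish_bundle()`: Called after all elements in a bundle have been processed. This is the right place to flush buffered work.
- `teardown()`: Called when the DoFn instance is discarded, which makes it the place to close connections. Runners call it on a best-effort basis, so it is not guaranteed to run, for example if the worker crashes.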
5. Visual Diagram
```
setup()                        # once per DoFn instance
  repeat for each bundle:
    start_bundle()
    process()  [x N elements]
    finish_bundle()
teardown()                     # once per instance, best effort
```
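The call order in the diagram can be sketched with a toy driver in plain Python. This is an assumption-laden illustration of the order only, not Beam's actual runner. The toy always calls `teardown()` in a `finally` block and ignores anything `finish_bundle()` might emit, whereas real runners treat teardown as best effort. `RecordingDoFn` is a hypothetical stand-in that has the same hook names as `beam.DoFn`.

```python
from typing import Any, Iterable, List


def drive_dofn(dofn: Any, bundles: Iterable[List[Any]]) -> List[Any]:
    """Toy driver that calls DoFn hooks in runner order (not Beam's real runner)."""
    outputs: List[Any] = []
    dofn.setup()  # once per DoFn instance
    try:
        for bundle in bundles:  # a real runner decides how elements are bundled
            dofn.start_bundle()
            for element in bundle:
                # A process() that returns None emits nothing
                outputs.extend(dofn.process(element) or [])
            dofn.finish_bundle()
    finally:
        dofn.teardown()  # real runners only promise this on a best-effort basis
    return outputs


class RecordingDoFn:
    """Stand-in with beam.DoFn's hook names that records every call it receives."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def setup(self) -> None:
        self.calls.append("setup")

    def start_bundle(self) -> None:
        self.calls.append("start_bundle")

    def process(self, element: int):
        self.calls.append("process")
        yield element * 2

    def finish_bundle(self) -> None:
        self.calls.append("finish_bundle")

    def teardown(self) -> None:
        self.calls.append("teardown")
```

Driving `RecordingDoFn` with two bundles, `[[1, 2], [3]]`, records `setup` once, then a `start_bundle` / `process`... / `finish_bundle` group per bundle, and finally `teardown` once.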
6. Code Example
Creating a basic DoFn that yields elements:
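```python
import apache_beam as beam


class FilterNumbersDoFn(beam.DoFn):
    def process(self, element):
        # Emit the element only if it is 10 or greater; smaller values produce no output
        if element >= 10:
            yield element


# Applying the DoFn inside your pipeline (input_pcoll is an existing PCollection):
# output = input_pcoll | beam.ParDo(FilterNumbersDoFn())
```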
7. Code Explanation
- `FilterNumbersDoFn` inherits from `beam.DoFn`.
- The `process` method takes `element` as an argument.
- It checks the condition and uses `yield element` to output the record.
- We apply it inside a pipeline using `beam.ParDo(FilterNumbersDoFn())`.
8. Real Production Example
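Connecting to an external database inside the `setup` method. Here `my_database_library` and its `connect`, `query`, and `close` calls are placeholders for whatever client library you actually use:
```python
import apache_beam as beam
import my_database_library  # placeholder: substitute your real client library


class DatabaseLookupDoFn(beam.DoFn):
    def setup(self):
        # Establish the connection once per DoFn instance, on the worker
        self.db_client = my_database_library.connect("host-name")

    def process(self, element):
        # Query the database and emit the enriched record
        user_info = self.db_client.query(element["user_id"])
        yield {**element, "user_name": user_info.name}

    def teardown(self):
        # Close the connection when the instance is discarded (best effort)
        self.db_client.close()
```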
9. Common Mistakes
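- Creating non-serializable objects in `__init__`: Do not open database connections in the class constructor. The constructor runs when the pipeline is constructed, usually on the machine that launches the job, not on the workers. The DoFn instance is then serialized (pickled) and shipped to the workers. A live connection either cannot be pickled or is useless on another machine. Create connections in the `setup()` method instead.
- Returning a single value instead of an iterable: The `process` method must yield values or return an iterable (such as a list). Returning a non-iterable value such as an integer fails at runtime. Returning a string is a quieter bug: a string is iterable, so each character is emitted as a separate element.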
10. Interview Perspective
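- Question: What is the difference between `setup` and `start_bundle`?
- Answer: `setup` runs once per DoFn instance, when the runner initializes that instance on a worker. Use it for expensive, reusable resources such as connections. `start_bundle` runs many times, once before each bundle (a batch of elements the runner chooses to process together). Use it to reset per-bundle state such as buffers.
- Question: Why is `yield` preferred over `return`?
- Answer: A DoFn is one-to-many. It can output 0, 1, or multiple records for a single input record. `yield` emits items one at a time as they are ready, so the entire output never has to be held in memory as a list. Returning a list also works, but it builds every output before any is emitted.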
11. Best Practices
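- Do not write final outputs to local files inside `process()`. Worker disks are ephemeral and not shared between workers. A bundle may also be retried, so the write can happen more than once. Use a sink transform such as `beam.io.WriteToText` instead.
- Handle transient connection errors in `setup()` with a bounded retry strategy (for example, exponential backoff) so that a brief outage does not crash the worker.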
12. Summary
- `DoFn` is the processing class executed inside `ParDo`.
- It provides lifecycle hooks: `setup`, `start_bundle`, `process`, `finish_bundle`, and `teardown`.
- `process` must yield elements or return an iterable.