Custom CombineFn
1. Introduction
A custom CombineFn is a subclass of apache_beam.CombineFn that defines your own efficient aggregation logic. When built-in combiners such as sum, max, or min are not enough, you define a custom accumulator that Beam can build up and merge in several stages across distributed workers.
2. Why This Concept Exists
Some aggregations, such as computing a standard deviation, estimating a median, computing a click-through rate, or building custom sessions, cannot be produced by repeatedly applying a simple operation like + or max to the elements. They need intermediate values (for a mean, both a running sum and a count) that are only turned into the final answer at the end. Subclassing CombineFn lets you define a structured accumulator that holds this intermediate state, is built up independently on different workers, and is merged efficiently, so the combine fully supports combiner lifting (Combiner Lift), that is, partial aggregation before the shuffle.
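A small pure-Python illustration of why intermediate state matters, using a mean. This is not Beam code: the two lists stand in for data held by two hypothetical workers. Averaging the per-worker means gives the wrong answer, while merging (sum, count) pairs and dividing once gives the right one.

```python
# Two hypothetical workers, each holding part of the data.
worker_a = [1.0, 2.0, 3.0]
worker_b = [10.0]

# Wrong: average the per-worker averages (ignores how many elements each saw).
mean_a = sum(worker_a) / len(worker_a)
mean_b = sum(worker_b) / len(worker_b)
mean_of_means = (mean_a + mean_b) / 2  # (2.0 + 10.0) / 2 = 6.0

# Right: keep (sum, count) per worker, merge the pairs, divide once at the end.
partials = [(sum(worker_a), len(worker_a)), (sum(worker_b), len(worker_b))]
total = sum(s for s, _ in partials)
count = sum(c for _, c in partials)
true_mean = total / count  # 16.0 / 4 = 4.0

print(mean_of_means, true_mean)  # 6.0 4.0
```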
3. Key Terminology
- Accumulator: An intermediate state object used to store progress during aggregation.
- CombineFn: The base class in Apache Beam for custom global or key-grouped combiners.
4. How It Works
A custom CombineFn class implements four key lifecycle methods:
- create_accumulator(): Returns the initial, empty accumulator state (e.g. (0.0, 0) for a running sum and count).
- add_input(accumulator, element): Takes the current accumulator and one new input element and returns the updated accumulator. Typically runs on the workers that read the input, before any shuffle.
- merge_accumulators(accumulators): Merges several partial accumulators (for example, from different workers or bundles) into a single accumulator. Typically runs after the shuffle has brought the partial results together, although runners may also call it earlier.
- extract_output(accumulator): Performs the final calculation on the merged accumulator (e.g. dividing the total sum by the total count) and returns the result.
5. Visual Diagram
Accumulator after add_input(5): (min=5, max=5)
Accumulator after add_input(12): (min=12, max=12)
Merged Accumulator (merge_accumulators): (min=5, max=12)
Final Output Result (extract_output): 12 - 5 ➔ 7
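The diagram's flow can be modeled in plain Python with a hypothetical simulate_combine helper. It is a deliberately simplified model (one accumulator per bundle, then a single merge), not how a Beam runner actually schedules work, and MinMaxRange is a Beam-free stand-in with the same four methods as a CombineFn.

```python
def simulate_combine(fn, bundles):
    """Simplified model of a combine: one accumulator per bundle, then one merge.

    Hypothetical helper for illustration only; a real runner decides bundling,
    combiner lifting and merge order itself.
    """
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


class MinMaxRange:
    """Plain-Python stand-in exposing the four CombineFn methods."""

    def create_accumulator(self):
        return (float('inf'), float('-inf'))  # (min, max)

    def add_input(self, acc, element):
        return (min(acc[0], element), max(acc[1], element))

    def merge_accumulators(self, accs):
        # Fold all partial (min, max) pairs, starting from the empty state
        lo, hi = self.create_accumulator()
        for acc_lo, acc_hi in accs:
            lo, hi = min(lo, acc_lo), max(hi, acc_hi)
        return (lo, hi)

    def extract_output(self, acc):
        return acc[1] - acc[0]


# The diagram's two single-element accumulators: (5, 5) and (12, 12).
print(simulate_combine(MinMaxRange(), [[5], [12]]))  # 7
```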
6. Code Example
Creating a custom RangeFn combiner that calculates the difference between the maximum and minimum values in a PCollection:
```python
import apache_beam as beam


class RangeFn(beam.CombineFn):
    def create_accumulator(self):
        # Initialize with positive and negative infinity: (min, max)
        return (float('inf'), float('-inf'))

    def add_input(self, accumulator, element):
        current_min, current_max = accumulator
        return (min(current_min, element), max(current_max, element))

    def merge_accumulators(self, accumulators):
        # Fold every partial (min, max) pair into one, starting from the empty state
        merged_min, merged_max = self.create_accumulator()
        for acc_min, acc_max in accumulators:
            merged_min = min(merged_min, acc_min)
            merged_max = max(merged_max, acc_max)
        return (merged_min, merged_max)

    def extract_output(self, accumulator):
        current_min, current_max = accumulator
        # If no elements were aggregated, return 0.0
        if current_min == float('inf') or current_max == float('-inf'):
            return 0.0
        return current_max - current_min


with beam.Pipeline() as p:
    temperatures = p | "CreateTemps" >> beam.Create([15.5, 32.0, 10.2, 25.4])
    temp_range = temperatures | "CalcRange" >> beam.CombineGlobally(RangeFn())
    # Print the single result (about 21.8) so the pipeline has a visible output
    temp_range | "PrintRange" >> beam.Map(print)
```
7. Code Explanation
- create_accumulator returns the starting tuple (inf, -inf), so any real value replaces it.
- add_input updates the running min and max as each worker processes its elements.
- merge_accumulators combines the workers' partial (min, max) tuples into the overall min and max: (10.2, 32.0).
- extract_output subtracts the min from the max and returns 21.8.
8. Real Production Example
When building network analytics pipelines, you might calculate click-through rates (CTR). You feed in (clicks, impressions) pairs and use a custom CombineFn that sums clicks and impressions separately, then divides the totals in extract_output to produce the final CTR as a percentage.
9. Common Mistakes
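- Mutable accumulator bugs: Beam allows add_input to modify and return the accumulator it receives, and merge_accumulators to modify and return the first accumulator in its input, but the other accumulators passed to merge_accumulators must not be mutated because they may be shared. Mutating structures such as sets or lists outside these rules can silently corrupt results; returning new tuple or dict instances avoids the problem entirely.
- Assuming input ordering: Elements are split into bundles that are processed in parallel, and partial accumulators are merged in no guaranteed order. Your logic must not depend on the order in which items arrive, which in practice means the combining operation should be associative and commutative.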
10. Interview Perspective
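- Question: Which CombineFn methods run after the shuffle, rather than on the workers doing local pre-aggregation?
- Answer: With combiner lifting, create_accumulator and add_input typically run on the workers that read the input, producing partial accumulators before the shuffle. After the shuffle, merge_accumulators combines those partial accumulators and extract_output computes the final value. This happens on whichever workers receive the shuffled data for each key, not on a single main node, and the exact placement is runner-dependent.
- Question: How do you use a custom CombineFn per key?
- Answer: Pass an instance of the combiner class to CombinePerKey instead of CombineGlobally (e.g. beam.CombinePerKey(RangeFn())). The input must be a PCollection of (key, value) pairs.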
11. Best Practices
- Ensure that the accumulator state is as small and lightweight as possible to minimize the serialization overhead during network shuffles.
- Use immutable types such as tuples, named tuples, or frozen dataclasses for accumulators.
12. Summary
- CombineFn enables custom, multi-stage parallel aggregations.
- Implement create_accumulator, add_input, merge_accumulators, and extract_output.
- It provides high performance by using local pre-aggregation before the shuffle (Combiner Lift).