advanced

Custom CombineFn

7 min read | Last updated: 2026-07-01

1. Introduction

A custom CombineFn is a subclass of apache_beam.CombineFn that implements an aggregation the built-in combiners cannot express. When simple combiners like sum, max, or min are insufficient, you define your own accumulator state and the logic to build, merge, and finalize it across distributed workers.

2. Why This Concept Exists

For aggregations such as standard deviation, median, click-through rate, or custom sessions, a reduction that keeps only one running value will not work. These calculations require tracking intermediate values (a mean, for example, needs both a sum and a count). Subclassing CombineFn lets you define a structured accumulator that carries these intermediate states across distributed workers while still supporting Combiner Lift, where partial aggregation happens on each worker before the shuffle.
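
A small plain-Python sketch (no Beam involved) of why the intermediate state matters. The two chunks stand in for data held by two workers, and the values are made up for illustration. Averaging the per-chunk means loses the chunk sizes, while merging (sum, count) pairs does not.

```python
# Two chunks of data, as if each were held by a different worker.
chunk_a = [1.0, 2.0, 3.0, 4.0]
chunk_b = [10.0]


def mean(values):
    return sum(values) / len(values)


# Naive approach: combine the per-chunk means directly.
# This ignores that chunk_a has four elements and chunk_b has one.
mean_of_means = mean([mean(chunk_a), mean(chunk_b)])

# Accumulator approach: carry (sum, count), merge element-wise,
# and divide only once at the very end.
acc_a = (sum(chunk_a), len(chunk_a))
acc_b = (sum(chunk_b), len(chunk_b))
merged = (acc_a[0] + acc_b[0], acc_a[1] + acc_b[1])
merged_mean = merged[0] / merged[1]

true_mean = mean(chunk_a + chunk_b)
print(mean_of_means, merged_mean, true_mean)  # 6.25 4.0 4.0
```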

3. Key Terminology

  • Accumulator: An intermediate state object used to store progress during aggregation.
  • CombineFn: The base class in Apache Beam for custom global or key-grouped combiners.

4. How It Works

A custom CombineFn class implements four key lifecycle methods:

  1. create_accumulator(): Returns the initial, empty accumulator state (e.g. (0.0, 0)).
  2. add_input(accumulator, element): Takes the current accumulator and a new element, returning an updated accumulator. With combiner lifting, this typically runs before the shuffle, on the worker that already holds the element.
  3. merge_accumulators(accumulators): Merges multiple partial accumulators into a single combined accumulator. This typically runs after the shuffle, once the partial accumulators for a key (or for the global combine) have been brought together.
  4. extract_output(accumulator): Performs final calculations on the merged accumulator (e.g. dividing total sum by total count) and outputs the final result.
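
The four methods above can be sketched with a standard deviation combiner. This is a minimal illustration under stated assumptions: StdDevFn and simulate_combine are hypothetical names, StdDevFn is written as a plain class so the snippet runs without Beam installed (in a pipeline it would subclass beam.CombineFn), and simulate_combine is only a rough stand-in for the order in which a runner might call the methods, not Beam's actual scheduling. The sum-of-squares formula is chosen for brevity and can lose precision on large or tightly clustered values.

```python
import math


class StdDevFn:
    """Population standard deviation via a (count, sum, sum_of_squares) accumulator."""

    def create_accumulator(self):
        return (0, 0.0, 0.0)

    def add_input(self, accumulator, element):
        count, total, total_sq = accumulator
        return (count + 1, total + element, total_sq + element * element)

    def merge_accumulators(self, accumulators):
        count, total, total_sq = 0, 0.0, 0.0
        for c, t, sq in accumulators:
            count += c
            total += t
            total_sq += sq
        return (count, total, total_sq)

    def extract_output(self, accumulator):
        count, total, total_sq = accumulator
        if count == 0:
            return float("nan")  # assumption: no data means no defined result
        mean = total / count
        # Clamp at zero to absorb tiny negative values from rounding.
        variance = max(total_sq / count - mean * mean, 0.0)
        return math.sqrt(variance)


def simulate_combine(fn, bundles):
    """Hypothetical driver: one accumulator per bundle, then merge, then extract."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    merged = fn.merge_accumulators(partials)
    return fn.extract_output(merged)


# Two bundles stand in for data spread over two workers.
result = simulate_combine(StdDevFn(), [[2.0, 4.0, 4.0], [4.0, 5.0, 5.0, 7.0, 9.0]])
print(result)  # 2.0
```

Because merge_accumulators only adds the three fields, the result does not depend on how the elements are split into bundles.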

5. Visual Diagram

Worker 1 (add_input)
  Element: 5
  Accumulator: (min=5, max=5)

Worker 2 (add_input)
  Element: 12
  Accumulator: (min=12, max=12)

▼ (merge_accumulators)

Merged Accumulator: (min=5, max=12)

▼ (extract_output: max - min)

Final Output Result: 12 - 5 ➔ 7

6. Code Example

Creating a custom RangeFn combiner that calculates the difference between the maximum and minimum values in a PCollection:

python
import apache_beam as beam

class RangeFn(beam.CombineFn):
    def create_accumulator(self):
        # Initialize with positive and negative infinity
        return (float('inf'), float('-inf')) # (min, max)

    def add_input(self, accumulator, element):
        (current_min, current_max) = accumulator
        return (min(current_min, element), max(current_max, element))

    def merge_accumulators(self, accumulators):
        # Separate min and max values from all accumulators
        mins, maxs = zip(*accumulators)
        return (min(mins), max(maxs))

    def extract_output(self, accumulator):
        (current_min, current_max) = accumulator
        # If no elements were aggregated, return 0
        if current_min == float('inf') or current_max == float('-inf'):
            return 0.0
        return current_max - current_min

with beam.Pipeline() as p:
    temperatures = p | "CreateTemps" >> beam.Create([15.5, 32.0, 10.2, 25.4])
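    temp_range = temperatures | "CalcRange" >> beam.CombineGlobally(RangeFn())
    # Print the single combined result so the pipeline produces visible output
    temp_range | "PrintRange" >> beam.Map(print)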

7. Code Explanation

  • create_accumulator returns the base tuple: (inf, -inf).
  • add_input updates the min and max values locally on each worker.
  • merge_accumulators merges the workers' local min/max tuples to find the absolute min and max: (10.2, 32.0).
  • extract_output subtracts the min from the max to return 21.8.

8. Real Production Example

When building analytics pipelines for web or ad traffic, you might calculate click-through rate (CTR). Each input element is a (clicks, impressions) pair. A custom CombineFn sums clicks and impressions separately in the accumulator, and extract_output divides total clicks by total impressions (guarding against zero impressions) and multiplies by 100 to output the final CTR percentage.
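
A minimal sketch of such a CTR combiner, assuming each element is a (clicks, impressions) tuple of integers. CtrFn is a hypothetical name, and it is written as a plain class so the methods can be called directly without Beam installed; in a pipeline it would subclass beam.CombineFn. Returning 0.0 when there are no impressions is an assumption made here, not something the scenario prescribes.

```python
class CtrFn:
    """Hypothetical CTR combiner with the four CombineFn method names."""

    def create_accumulator(self):
        return (0, 0)  # (total clicks, total impressions)

    def add_input(self, accumulator, element):
        clicks, impressions = accumulator
        new_clicks, new_impressions = element
        return (clicks + new_clicks, impressions + new_impressions)

    def merge_accumulators(self, accumulators):
        total_clicks, total_impressions = 0, 0
        for clicks, impressions in accumulators:
            total_clicks += clicks
            total_impressions += impressions
        return (total_clicks, total_impressions)

    def extract_output(self, accumulator):
        clicks, impressions = accumulator
        if impressions == 0:
            return 0.0  # assumption: report 0% when nothing was shown
        return 100.0 * clicks / impressions


fn = CtrFn()
# Two partial accumulators, as if built on two different workers.
acc_a = fn.add_input(fn.add_input(fn.create_accumulator(), (3, 100)), (1, 50))
acc_b = fn.add_input(fn.create_accumulator(), (6, 50))
ctr = fn.extract_output(fn.merge_accumulators([acc_a, acc_b]))
print(ctr)  # 5.0
```

Dividing only in extract_output is the important design choice: per-worker ratios cannot be merged correctly, but per-worker totals can.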

9. Common Mistakes

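  • Mutable Accumulator Bugs: Beam lets add_input modify and return the accumulator it is given, and lets merge_accumulators modify the first accumulator, but mutating any other accumulator, an input element, or an object shared between accumulators (such as a class-level list returned by create_accumulator) can silently corrupt results. For small accumulators it is safer to return new tuple or dict instances.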
  • Assuming input ordering: Because add_input runs concurrently on different chunks of data, elements are merged in arbitrary order. Your logic must not depend on the order in which items arrive.
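
One way a mutable accumulator can go wrong, sketched in plain Python without Beam. SharedListFn, FreshTupleFn, and run_bundles are hypothetical names used only for this illustration; run_bundles loosely imitates building one accumulator per bundle. The buggy class hands out the same list object from every create_accumulator call, so each "separate" accumulator ends up holding every element.

```python
def run_bundles(fn, bundles):
    """Build one accumulator per bundle and count the elements they hold in total."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return sum(len(acc) for acc in partials)


class SharedListFn:
    _seen = []  # BUG: a single list object shared by every accumulator

    def create_accumulator(self):
        return self._seen

    def add_input(self, accumulator, element):
        accumulator.append(element)  # mutates the shared list in place
        return accumulator


class FreshTupleFn:
    def create_accumulator(self):
        return ()  # an empty, immutable accumulator

    def add_input(self, accumulator, element):
        return accumulator + (element,)  # builds a new tuple each time


bundles = [[1, 2], [3, 4]]
buggy_total = run_bundles(SharedListFn(), bundles)
safe_total = run_bundles(FreshTupleFn(), bundles)
print(buggy_total, safe_total)  # 8 4
```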

10. Interview Perspective

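  • Question: Which methods in a CombineFn run after the shuffle rather than during local pre-aggregation?
  • Answer: With combiner lifting, merge_accumulators and extract_output typically run after the shuffle, on whichever worker receives the partial accumulators for a key; there is no single dedicated merger node. create_accumulator and add_input typically run before the shuffle, on the workers that hold the input. The exact placement depends on the runner.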
  • Question: How do you use a Custom CombineFn per key?
  • Answer: You simply pass the custom combiner class instance to CombinePerKey instead of CombineGlobally (e.g. beam.CombinePerKey(RangeFn())).

11. Best Practices

  • Ensure that the accumulator state is as small and lightweight as possible to minimize the serialization overhead during network shuffles.
  • Use immutable types like tuples, named tuples, or frozen dataclasses for accumulators. A plain dataclass is mutable unless it is declared with frozen=True.
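
A short sketch of the immutable accumulator types mentioned above, using only the standard library. RangeAcc, FrozenRangeAcc, and the standalone add_input function are hypothetical names for illustration; the dataclass is declared frozen here so that it is actually immutable. How Beam serializes such objects depends on the coder in use, which this snippet does not cover.

```python
import dataclasses
import math
from typing import NamedTuple


class RangeAcc(NamedTuple):
    """Named-tuple accumulator: readable field names, tuple-sized footprint."""
    minimum: float = math.inf
    maximum: float = -math.inf


@dataclasses.dataclass(frozen=True)
class FrozenRangeAcc:
    """Same fields as a frozen dataclass; assignment after creation raises."""
    minimum: float = math.inf
    maximum: float = -math.inf


def add_input(acc, element):
    # Return a new accumulator instead of modifying the old one.
    return RangeAcc(min(acc.minimum, element), max(acc.maximum, element))


acc = RangeAcc()
for value in [15.5, 32.0, 10.2, 25.4]:
    acc = add_input(acc, value)

try:
    FrozenRangeAcc().minimum = 0.0
    mutation_blocked = False
except dataclasses.FrozenInstanceError:
    mutation_blocked = True

print(acc, mutation_blocked)
```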

12. Summary

  • CombineFn enables custom, multi-stage parallel aggregations.
  • Implement create_accumulator, add_input, merge_accumulators, and extract_output.
  • Reduces the data sent through the shuffle by pre-aggregating locally on each worker (Combiner Lift).

13. Interactive Challenges

Challenge 1: Accumulator Initialization and Addition (Beginner)

Write the create_accumulator and add_input methods for a custom SumFn combiner that calculates the total sum of elements.

Challenge 2: Custom AverageCombineFn Implementation (Intermediate)

Implement a complete custom AverageCombineFn class subclassing beam.CombineFn that computes the arithmetic mean of a PCollection of floats.

Challenge 3: Unique Item Set Combiner (Advanced)

Write a custom UniqueCountCombineFn class subclassing beam.CombineFn that computes the number of unique elements in a PCollection. Use Python sets as the accumulator state.
