Accumulation Modes
1. Introduction
When a windowing strategy is combined with custom triggers or allowed lateness, a single window can fire multiple times. Each emission is called a pane.
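Accumulation modes define how Apache Beam handles data from previous firings when a window emits subsequent panes. This section contrasts two of them: Accumulating (each pane keeps previous results and adds to them) and Retracting (each pane retracts the previous emission and sends an updated value). Beam SDKs also provide a Discarding mode, in which each pane contains only the elements that arrived since the last firing. Retractions are part of the Beam model, but SDK support is limited: the Python SDK's AccumulationMode, for example, currently defines only ACCUMULATING and DISCARDING.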
2. Why This Concept Exists
If a window triggers multiple times, downstream systems need a clear strategy to interpret the updates.
Choosing the correct accumulation mode is essential for:
- Preventing Double-Counting: If you sum transactions in a window, and a late event arrives, you need to ensure the downstream database updates the total rather than adding the new total to the old total.
- Downstream Agnostic Pipelines: Ensuring that subsequent aggregations in the pipeline (like a secondary GroupByKey) calculate correct results even when their inputs change over time.
- Idempotency: Aligning pipeline outputs with the capabilities of your storage layer (e.g., upserting into Elasticsearch vs. appending to a CSV file).
3. Key Terminology
- Accumulating Mode: Subsequent panes contain all the elements that have arrived in the window so far, including elements from previous panes.
- Retracting Mode: Subsequent panes emit both a "retraction" (a subtraction of the previous pane's value) and the new aggregated value.
- Pane: A single output materialization from a window. A window's first pane has index 0; subsequent panes (including any late panes) have indices 1, 2, ....
4. How It Works
Let's trace a window that receives elements [2, 3] in its first firing, and later receives late element [4]:
- First Firing: The sum of [2, 3] is 5.
- Late Event Arrives: [4] is added to the window.
- Second Firing:
  - Accumulating Mode: The runner emits the new cumulative sum: 9. The downstream system must replace the previous value 5 with 9 (e.g., using a database overwrite).
  - Retracting Mode: The runner emits two signals: a retraction of the previous value (-5), followed by the new value 9 (addition). A downstream aggregator processes both, resulting in 5 + (-5) + 9 = 9.
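The trace above can be reproduced with a small plain-Python simulation. This is only a sketch of the two behaviors, not Beam code: emit_accumulating and emit_retracting are hypothetical helper names introduced for illustration, and the retraction is modeled as a negated value.

```python
def emit_accumulating(firings):
    """Yield one cumulative sum per firing (accumulating behavior)."""
    total = 0
    for elements in firings:
        total += sum(elements)
        yield total


def emit_retracting(firings):
    """Yield signed values: a retraction of the prior sum, then the new sum."""
    total = 0
    previous = None
    for elements in firings:
        total += sum(elements)
        if previous is not None:
            yield -previous  # retraction of the previously emitted value
        yield total  # newly emitted value
        previous = total


# First firing sees [2, 3]; the late element [4] triggers a second firing.
firings = [[2, 3], [4]]

accumulating_panes = list(emit_accumulating(firings))
retracting_signals = list(emit_retracting(firings))

# A consumer of accumulating panes must overwrite: only the last value counts.
overwrite_result = accumulating_panes[-1]
# A consumer of retracting signals can simply add up everything it receives.
additive_result = sum(retracting_signals)

print(accumulating_panes)  # [5, 9]
print(retracting_signals)  # [5, -5, 9]
print(overwrite_result, additive_result)  # 9 9
```

Both strategies arrive at 9, but they place different demands on the consumer: overwrite semantics in the first case, plain addition in the second.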
5. Visual Diagram
6. Code Example
The following code compares how to configure Accumulating and Retracting modes within WindowInto:
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, Repeatedly, AfterCount
from apache_beam.transforms.window import FixedWindows

# 1. Accumulating Mode Pipeline
with beam.Pipeline() as p1:
    (p1
     | "ReadAccumulating" >> beam.Create([("user1", 10), ("user1", 20)])
     | "WindowAccumulate" >> beam.WindowInto(
         FixedWindows(60),  # 60-second fixed windows
         trigger=Repeatedly(AfterCount(1)),  # fire after every element
         accumulation_mode=AccumulationMode.ACCUMULATING
     )
     | "SumAccumulate" >> beam.CombinePerKey(sum)
     | "LogAccumulating" >> beam.Map(lambda x: print(f"Accumulating Pane Output: {x}"))
    )

# 2. Retracting Mode Pipeline
# NOTE: illustrative only. Current Python SDK releases define only
# AccumulationMode.ACCUMULATING and AccumulationMode.DISCARDING, so the
# RETRACTING reference below raises AttributeError until the SDK exposes it.
with beam.Pipeline() as p2:
    (p2
     | "ReadRetracting" >> beam.Create([("user1", 10), ("user1", 20)])
     | "WindowRetract" >> beam.WindowInto(
         FixedWindows(60),
         trigger=Repeatedly(AfterCount(1)),
         accumulation_mode=AccumulationMode.RETRACTING
     )
     | "SumRetract" >> beam.CombinePerKey(sum)
     | "LogRetracting" >> beam.Map(lambda x: print(f"Retracting Pane Output: {x}"))
    )
7. Code Explanation
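- AccumulationMode.ACCUMULATING tells the runner to retain window state across firings. For the first element 10, it emits 10. For the second element 20, it emits the accumulated total 30.
- AccumulationMode.RETRACTING tells the runner to emit a retraction for the previous aggregate. For 10, it emits 10. When 20 arrives, it emits a retraction of the previous 10 (that is, -10) and then the new total 30. Downstream combiners use these retractions to update their internal states. Treat the second pipeline as a description of the intended model rather than runnable code: the Python SDK's AccumulationMode currently defines only ACCUMULATING and DISCARDING, so referencing RETRACTING raises AttributeError.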
8. Real Production Example
In a monthly utility billing system, a pipeline aggregates electricity consumption logs per household. The billing database is append-only for audit integrity. When late logs arrive (e.g., due to smart-meter connectivity outages), using Retracting mode is mandatory. If the initial bill was calculated as $150 and late logs add $30, the pipeline emits a retraction of -$150 and an addition of $180. The billing system appends both transactions, yielding a correct net account balance of $180 without losing historical audit states.
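The arithmetic in this scenario can be checked with a minimal append-only ledger in plain Python. This is a sketch under stated assumptions, not a billing implementation: LedgerEntry, post_bill, and the household ID "household-42" are hypothetical names, and amounts are whole dollars.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LedgerEntry:
    household: str
    amount: int  # whole dollars; negative for retractions
    kind: str  # "charge" or "retraction"


# Append-only: entries are added but never modified or deleted.
ledger = []


def post_bill(household, new_total, previous_total=None):
    """Append a retraction of the previous bill (if any), then the new bill."""
    if previous_total is not None:
        ledger.append(LedgerEntry(household, -previous_total, "retraction"))
    ledger.append(LedgerEntry(household, new_total, "charge"))


# Initial bill of $150, then late logs raise the total to $180.
post_bill("household-42", 150)
post_bill("household-42", 180, previous_total=150)

amounts = [entry.amount for entry in ledger]
balance = sum(amounts)

print(amounts)  # [150, -150, 180]
print(balance)  # 180
```

Every historical state stays in the ledger, and the net balance is still the corrected total.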
9. Common Mistakes
- Double-Counting in Databases: Using Accumulating Mode and writing output panes to a database table using standard SQL INSERT statements. If a window sum updates from 10 to 15, the table will hold both rows, leading to an incorrect sum of 25. Use UPSERT or key-based overwrites for Accumulating Mode.
- Forgetting to Set Accumulation Mode: Setting a non-default trigger in WindowInto but omitting accumulation_mode. The pipeline fails at construction time, before any data is processed (in the Python SDK, this surfaces as a ValueError).
- State Overhead in Retracting Mode: Retracting Mode requires the runner to retain the previously emitted result for each key and window in order to generate retractions, which increases memory and storage utilization.
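The double-counting mistake can be demonstrated with Python's built-in sqlite3 module. This is a minimal sketch: the table names appended and upserted and the window key "w1" are hypothetical, and SQLite's INSERT OR REPLACE stands in for whatever upsert mechanism your database provides.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE appended (window_key TEXT, total INTEGER)")
conn.execute("CREATE TABLE upserted (window_key TEXT PRIMARY KEY, total INTEGER)")

# Two accumulating panes for the same window: the sum updates from 10 to 15.
panes = [("w1", 10), ("w1", 15)]

for key, total in panes:
    # Plain INSERT keeps both rows for the window.
    conn.execute("INSERT INTO appended VALUES (?, ?)", (key, total))
    # Key-based overwrite keeps only the latest pane for the window.
    conn.execute("INSERT OR REPLACE INTO upserted VALUES (?, ?)", (key, total))

query = "SELECT SUM(total) FROM {} WHERE window_key = 'w1'"
appended_sum = conn.execute(query.format("appended")).fetchone()[0]
upserted_sum = conn.execute(query.format("upserted")).fetchone()[0]
conn.close()

print(appended_sum)  # 25 (double-counted)
print(upserted_sum)  # 15 (correct)
```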
10. Interview Perspective
- Question: Why is Retracting Mode necessary if the pipeline contains multiple grouping operations?
- Answer: If you group by user, sum their score, and then group by country, the country-level aggregator needs to subtract the user's old score before adding the user's updated score. Retracting Mode enables this correction.
- Question: Which database types are best suited for Accumulating Mode?
- Answer: Key-value stores and document databases that support overwrites based on a key (e.g., Redis, Cassandra, Bigtable, Elasticsearch).
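The first answer above (user scores rolled up by country) can be made concrete with a plain-Python sketch. It does not use Beam: country_totals, the user names, and the country code are hypothetical, and retractions are modeled as negated values fed to an additive second-stage aggregator.

```python
from collections import defaultdict

# Hypothetical mapping from user to country.
USER_COUNTRY = {"alice": "NZ", "bob": "NZ"}


def country_totals(user_updates):
    """Add per-user updates into per-country totals, as an additive aggregator would."""
    totals = defaultdict(int)
    for user, value in user_updates:
        totals[USER_COUNTRY[user]] += value
    return dict(totals)


# alice's score fires as 10 and is later refined to 15; bob fires once with 7.
accumulating_stream = [("alice", 10), ("bob", 7), ("alice", 15)]
# The same updates, with a retraction of alice's old score before the new one.
retracting_stream = [("alice", 10), ("bob", 7), ("alice", -10), ("alice", 15)]

without_retractions = country_totals(accumulating_stream)
with_retractions = country_totals(retracting_stream)

print(without_retractions)  # {'NZ': 32}, alice's old 10 is counted twice
print(with_retractions)  # {'NZ': 22}, equal to 15 + 7
```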
11. Best Practices
- Use Accumulating Mode when writing directly to dashboards or key-value stores where updates are handled as overwrites.
- Use Retracting Mode when the downstream pipeline performs secondary groupings or aggregations that must remain mathematically sound.
- Verify that your downstream database connector supports retractions or updates before choosing your accumulation mode.
12. Summary
- Accumulation modes govern how multi-pane window outputs are represented.
- Accumulating Mode outputs the entire running total with each pane.
- Retracting Mode outputs a retraction of the old value plus the new value.
- Accumulating Mode requires key-based upserts in downstream storage.
- Retracting Mode prevents double-counting in downstream pipeline aggregations.