intermediate

Streaming Aggregations

6 min readLast updated: 2026-07-01

1. Introduction

Streaming aggregations are mathematical operations (such as sum, average, min, max, or count) applied to an infinite, continuous stream of data. Because the dataset never ends, streaming engines partition the data into temporal windows to perform these operations.

2. Why This Concept Exists

In batch processing, we aggregate across the entire dataset. In streaming, running a global aggregation across all time is impossible because the runner would have to hold an infinite amount of data in memory. To calculate real-time metrics—like hourly active users or 10-minute transaction rates—we must segment the unbounded stream into finite time slices called windows before applying our aggregations.

3. Key Terminology

  • Windowed Aggregation: An aggregation that computes a result independently for each individual time window.
  • Accumulator: A helper data structure used to maintain intermediate aggregation state (e.g., keeping track of partial sums and counts).
  • Pane: An individual output materialized for a specific window. A window can produce multiple panes if late data or early triggers are configured.
  • Combiner Lift: An optimization where the runner performs local aggregation on worker nodes before shuffling, minimizing network overhead.

4. How It Works

  • Ingestion: Elements arrive at the runner.
  • Window Assignment: Each element is assigned to one or more windows based on its event timestamp.
  • State Accumulation: The runner buffers and aggregates values inside each window.
  • Triggering: When the watermark passes the end of the window (or a trigger condition is met), the runner computes the final aggregation and emits the result (pane).
  • State Cleanup: The runner purges the window's state from memory once the allowed lateness window expires.

5. Visual Diagram

Window 12:00 - 12:05
Elements: [5, 3] ➔ Sum: 8 (Pane 1)
Window 12:05 - 12:10
Elements: [8, 4] ➔ Sum: 12 (Pane 2)

6. Code Example

The following pipeline processes a stream of user actions, windows them into 5-minute fixed windows, and sums the click count per user:

python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows

options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        # 1. Read input stream containing (user_id, click_count)
        | "ReadStream" >> beam.io.ReadFromPubSub(subscription="projects/my-proj/subscriptions/clicks")
        | "ParseJson" >> beam.Map(lambda x: eval(x.decode("utf-8"))) # Returns (user_id, count)
        
        # 2. Window into 5-minute intervals
        | "Window5m" >> beam.WindowInto(FixedWindows(5 * 60))
        
        # 3. Sum click counts per user within each window
        | "SumPerUser" >> beam.CombinePerKey(sum)
        
        # 4. Format and print output
        | "Format" >> beam.Map(lambda user_sum: f"User {user_sum[0]} clicked {user_sum[1]} times in window")
        | "Print" >> beam.Map(print)
    )

7. Code Explanation

  • FixedWindows(5 * 60) segments the data into non-overlapping 5-minute event-time windows (e.g. 12:00 to 12:05, 12:05 to 12:10).
  • CombinePerKey(sum) performs the aggregation. Because it is applied after WindowInto, Beam automatically computes the sum independently for each user within each 5-minute window.
  • The output contains the aggregated sum, emitted as soon as the watermark passes the end of each window.

8. Real Production Example

A cybersecurity company streams network request logs. The pipeline windows requests into 10-second sliding windows, counts requests per IP address, and flags any IP that exceeds 500 requests per 10 seconds as a potential Denial-of-Service (DoS) attacker.

9. Common Mistakes

  • No Windowing Applied: Attempting to run a grouping transform (GroupByKey or CombinePerKey) on an unbounded stream without setting a windowing strategy will result in a runtime error or an infinite wait, as the runner cannot determine when to output the aggregation.
  • Unbounded Memory Bloat: Choosing an extremely large window size (e.g., 24 hours) with a high volume of keys can exceed worker memory capacity. Use stateful storage or external databases for long-running aggregations.

10. Interview Perspective

  • Question: Can a window emit more than one aggregation result?
  • Answer: Yes. If the pipeline allows late data (allowed_lateness) or uses early triggers, the runner will emit multiple panes for the same window: one for the initial watermark completion, and subsequent panes as late events arrive.
  • Question: How does combiner lift improve streaming aggregation?
  • Answer: It performs partial combining on the worker nodes before sending data over the network (shuffling). For instance, if a worker receives 1,000 clicks, it sums them locally to 1000 and shuffles a single record, rather than shuffling all 1,000 raw clicks.

11. Best Practices

  • Use built-in combining functions (sum, mean, count, max, min) whenever possible to take advantage of combiner lift optimizations.
  • Align your window size with your business latency requirements: smaller windows provide lower latency but higher processing overhead.
  • Always configure allowed_lateness if you expect slow networks to delay some events.

12. Summary

  • Streaming aggregations calculate summary statistics over unbounded data.
  • Data must be windowed before applying aggregations.
  • Aggregations are computed independently for each key and window pair.

13. Interactive Challenges

Challenge 1: Windowed Global Average (Beginner)

Write a pipeline segment that applies a 1-minute fixed window to a PCollection of floats named temperatures and calculates the global average temperature within each window.

Challenge 2: Windowed Maximum Per Key (Intermediate)

Given a PCollection of (sensor_id, speed) tuples named speed_stream, write a pipeline segment that applies 10-second fixed windows and finds the maximum speed recorded by each sensor.

Challenge 3: Multi-metric Windowed Accumulator (Advanced)

Define a custom Python function min_and_max that takes an iterable of numbers and returns a tuple (min_value, max_value). Use it within a 30-second fixed-windowed pipeline to compute the range of incoming sensor readings globally.

14. Related Content

Advertisement
AdSense Slot #000001Leaderboard Banner (728x90)