advanced

Handling Out-of-Order Events

8 min read · Last updated: 2026-07-01

1. Introduction

In real-time systems, events often reach the processing pipeline in a different order than they occurred in the physical world. Handling out-of-order events means using event-time semantics, watermarks, allowed lateness, and triggers so that results stay correct regardless of arrival order, provided the data arrives within the configured lateness bound.

2. Why This Concept Exists

Distributed networks, mobile connections, and retry mechanisms introduce unpredictable latency. A user might perform an action in a mobile app at 12:00 (Event Time), but because they enter a tunnel, the message is queued offline and only reaches the cloud pipeline at 12:15 (Processing Time). If the pipeline processed events based purely on arrival order, it would associate this event with the 12:15 interval, distorting historical accuracy.
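The tunnel scenario can be worked through in a few lines of plain Python. This is a minimal sketch, not Beam code: the helper `window_start`, the 5-minute window size, and the calendar date are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)  # illustrative window size


def window_start(ts: datetime, size: timedelta = WINDOW) -> datetime:
    """Return the start of the fixed window containing ts (windows aligned to midnight)."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    whole_windows = (ts - midnight) // size  # timedelta // timedelta gives an int
    return midnight + whole_windows * size


# Hypothetical date; only the times of day come from the scenario above.
event_time = datetime(2026, 7, 1, 12, 0)        # when the user acted
processing_time = datetime(2026, 7, 1, 12, 15)  # when the pipeline received it

by_event_time = window_start(event_time)            # window the action belongs to
by_processing_time = window_start(processing_time)  # window chosen by arrival order
skew = processing_time - event_time

print("event-time window starts at:     ", by_event_time.time())
print("processing-time window starts at:", by_processing_time.time())
print("skew:", skew)
```

Grouping by arrival places the action three windows later than the one in which it actually happened.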

3. Key Terminology

  • Event Time Skew: The time difference between when an event occurred (Event Time) and when it is processed by the pipeline (Processing Time).
  • Allowed Lateness: A configurable duration during which the runner preserves window state to process late-arriving events.
  • Trigger: A rule that determines when the accumulated results of a window should be materialized and emitted.
  • Accumulating Mode: A trigger setting where subsequent panes for a window include all historical data plus the new late data.
  • Discarding Mode: A trigger setting where subsequent panes for a window emit only the delta (new data) since the last trigger firing.
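The difference between the two modes is easiest to see with numbers. The sketch below is plain Python rather than Beam; the function `panes` and the sample values are hypothetical and only model a summed window that fires twice.

```python
def panes(firings, accumulating):
    """Return the value emitted at each trigger firing for a summed window.

    firings: list of lists; each inner list holds the values that arrived
    since the previous firing.
    """
    total = 0
    emitted = []
    for new_values in firings:
        delta = sum(new_values)
        total += delta
        # Accumulating: emit everything seen so far. Discarding: emit only the new data.
        emitted.append(total if accumulating else delta)
    return emitted


# First firing: on-time elements 4 and 6. Second firing: one late element of 5.
firings = [[4, 6], [5]]

accumulating_panes = panes(firings, accumulating=True)
discarding_panes = panes(firings, accumulating=False)

print("accumulating:", accumulating_panes)
print("discarding:  ", discarding_panes)
```

Both modes agree on the first pane; they differ only in what the late firing emits.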

4. How It Works

  • Timestamping: Every element entering the pipeline is associated with an event timestamp.
  • Watermarking: The watermark is the pipeline's estimate of event-time progress: it signals that no more events with earlier timestamps are expected. When the watermark passes the end of a window (e.g., 12:05), the window is considered complete.
  • Allowed Lateness: If an event with timestamp 12:03 arrives after the watermark has passed 12:05, it is marked as "late." If it arrives within the allowed_lateness period, the runner retrieves the window state, updates the aggregation, and emits a new pane.
  • Triggers: The trigger controls when these updates are sent downstream (e.g., emit immediately when late data arrives).
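The steps above can be approximated with a small classifier. This is a simplified model in plain Python, not the runner's actual logic: `hhmm`, `classify`, and the exact boundary handling are assumptions, and the 10-minute allowed lateness is an illustrative setting. Note that lateness is judged against the watermark, not the wall clock.

```python
WINDOW_SIZE = 5 * 60        # 5-minute fixed windows, in seconds
ALLOWED_LATENESS = 10 * 60  # 10 minutes, in seconds (illustrative value)


def hhmm(text):
    """Convert 'HH:MM' to seconds since midnight."""
    hours, minutes = text.split(":")
    return int(hours) * 3600 + int(minutes) * 60


def classify(event_time, watermark):
    """Classify an element by comparing the watermark with its window's end."""
    window_end = (event_time // WINDOW_SIZE + 1) * WINDOW_SIZE
    if watermark < window_end:
        return "on_time"   # window not yet considered complete
    if watermark < window_end + ALLOWED_LATENESS:
        return "late"      # state still retained: update and emit a new pane
    return "dropped"       # state already discarded


event = hhmm("12:03")  # belongs to the 12:00 - 12:05 window

for wm in ("12:04", "12:12", "12:16"):
    print(f"watermark {wm}: {classify(event, hhmm(wm))}")
```

With these settings the window's state is kept until the watermark reaches 12:15, so the same 12:03 event is on time, late, or dropped depending only on where the watermark stands when it arrives.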

5. Visual Diagram

Event Time (12:00 - 12:05)
e1: 12:01 (On-Time)
e2: 12:04 (On-Time)

Late Event (12:03)
Arrives at wall clock 12:12, after the watermark has passed 12:05.
Processed because the window's allowed-lateness period is still open.

6. Code Example

The following pipeline configures 5-minute fixed windows that allow data to arrive up to 10 minutes late, updating the aggregation results:

python
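import ast

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.window import FixedWindows

# ReadFromPubSub is an unbounded source, so the pipeline must run in streaming mode.
options = PipelineOptions(streaming=True)

# Define pipeline execution
with beam.Pipeline(options=options) as p:
    (
        p
        # Without a timestamp_attribute argument, elements are stamped with the
        # Pub/Sub publish time rather than a client-side event time.
        | "ReadStream" >> beam.io.ReadFromPubSub(subscription="projects/my-proj/subscriptions/logs")
        # Assumes each message body is a Python tuple literal such as ("alice", 5).
        # ast.literal_eval parses literals only; eval would execute arbitrary code.
        | "Parse" >> beam.Map(lambda x: ast.literal_eval(x.decode("utf-8")))  # Returns (user, score)

        # Configure windowing with late data support
        | "WindowWithLateness" >> beam.WindowInto(
            FixedWindows(5 * 60),  # 5-minute windows
            trigger=AfterWatermark(
                late=AfterCount(1)  # Fire on each late element
            ),
            allowed_lateness=10 * 60,  # Keep state for 10 minutes after watermark passes
            accumulation_mode=AccumulationMode.ACCUMULATING  # Emit full updated totals
        )

        | "SumScores" >> beam.CombinePerKey(sum)
        | "Format" >> beam.Map(print)
    )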

7. Code Explanation

  • FixedWindows(5 * 60) creates 5-minute windows.
  • allowed_lateness=10 * 60 (10 minutes) instructs the runner to keep the window's state (accumulators) in memory or on disk for 600 seconds after the watermark passes the end of the window (12:05 for the 12:00 - 12:05 window).
  • trigger=AfterWatermark(late=AfterCount(1)) tells the runner: "Fire once when the watermark passes the end of the window. If late elements arrive afterward, fire an updated pane for each one."
  • accumulation_mode=AccumulationMode.ACCUMULATING ensures that if the first pane emitted 10, and a late element of value 5 arrives, the next pane will emit the updated total 15 (instead of just the delta 5).

8. Real Production Example

A ridesharing company receives trip distance logs. Drivers in remote areas accumulate logs on their local phones. The pipeline calculates daily mileage. By allowing 24 hours of lateness, the system incorporates rides from offline drivers into the correct day's financial metrics when they re-establish network connection.

9. Common Mistakes

  • Leaving allowed_lateness at 0: By default, allowed_lateness is 0. Any data arriving even one millisecond after the watermark passes the window boundary will be dropped, leading to undercounted metrics.
  • Infinite Lateness Memory Exhaustion: Setting allowed_lateness to a very large value (or effectively infinity) forces the runner to retain the state of every window for that long. On a long-running pipeline this can exhaust worker memory or state storage.
  • Mismatch of Accumulation Modes: Using DISCARDING mode when the downstream database expects complete totals leaves only the latest delta in the sink, while using ACCUMULATING mode when the sink adds each pane to a running total counts earlier data more than once.
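The accumulation-mode mismatch can be shown with two toy sinks. This is an illustrative sketch in plain Python; `apply_as_increment` and `apply_as_overwrite` are hypothetical stand-ins for a database that adds each pane to a stored value and one that replaces it.

```python
def apply_as_increment(panes):
    """Sink that adds every pane to a running total (expects deltas)."""
    total = 0
    for pane in panes:
        total += pane
    return total


def apply_as_overwrite(panes):
    """Sink that replaces the stored value with the latest pane (expects totals)."""
    stored = None
    for pane in panes:
        stored = pane
    return stored


# Panes for one window: on-time sum of 10, then a late element of 5.
accumulating_panes = [10, 15]  # each pane is the full total so far
discarding_panes = [10, 5]     # each pane is only the new data

print("accumulating -> overwrite:", apply_as_overwrite(accumulating_panes))  # matching pair
print("discarding   -> increment:", apply_as_increment(discarding_panes))    # matching pair
print("accumulating -> increment:", apply_as_increment(accumulating_panes))  # mismatch
print("discarding   -> overwrite:", apply_as_overwrite(discarding_panes))    # mismatch
```

The two matching pairs both end at the true total of 15. The mismatched pairs end at 25 (the first 10 counted twice) and 5 (the on-time data overwritten).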

10. Interview Perspective

  • Question: What happens to elements that arrive after the allowed lateness window has expired?
  • Answer: They are considered "droppably late." The runner discards these elements immediately without processing them. They do not trigger window execution.
  • Question: How does event-time processing differ from processing-time processing?
  • Answer: Processing-time processing assigns events to windows based on the machine clock when the event is processed, ignoring when it happened. Event-time processing groups events based on their embedded timestamps, ensuring deterministic results even if data is replayed.

11. Best Practices

  • Analyze your historical network skew to set an allowed_lateness that covers 99.9% of late-arriving data.
  • Use ACCUMULATING mode when the sink overwrites each window's result with the latest total, and DISCARDING mode when you are sending deltas to append-only databases.
  • Monitor your pipeline's "system lag" metric to ensure the watermark is advancing and not stuck due to slow workers.
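One way to turn the first recommendation into a number is a nearest-rank percentile over observed skew. The sketch below is plain Python; `lateness_for_coverage` and the sample data are hypothetical, and it treats raw event-time skew as an upper bound on lateness (allowed lateness is measured against the watermark, so the value you actually need may be smaller).

```python
import math


def lateness_for_coverage(skews_seconds, coverage=0.999):
    """Return the smallest observed skew that covers at least `coverage` of samples.

    Uses the nearest-rank percentile method on the sorted samples.
    """
    if not skews_seconds:
        raise ValueError("need at least one skew sample")
    ordered = sorted(skews_seconds)
    # Small epsilon guards against floating-point error pushing the rank up by one.
    rank = max(1, math.ceil(coverage * len(ordered) - 1e-9))  # 1-based rank
    return ordered[rank - 1]


# Made-up sample: most events arrive within seconds, a few are badly delayed.
skews = [2] * 990 + [30] * 8 + [900, 5400]

suggested = lateness_for_coverage(skews, coverage=0.999)
print("allowed_lateness candidate (seconds):", suggested)
```

In this sample, covering 99.9% of events needs 900 seconds, while covering the single worst straggler would need 5400 seconds of retained state for every window.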

12. Summary

  • Out-of-order data is caused by network latency and client-side queuing.
  • Beam uses allowed lateness to retain state for late events.
  • Triggers determine when to emit updated results (panes) for windows.

13. Interactive Challenges

Challenge 1: Configure 10-Minute Allowed Lateness (Beginner)

Apply a 2-minute fixed window to the PCollection stream with an allowed lateness threshold of 10 minutes (600 seconds). Do not configure custom triggers.

Challenge 2: Set Accumulation Mode (Intermediate)

Configure a windowing step that applies 1-minute fixed windows, allowing 5 minutes of lateness, and sets the accumulation mode explicitly to DISCARDING.

Challenge 3: Late Event Triggering (Advanced)

Set up a windowing transform that uses 5-minute fixed windows, allows 20 minutes of lateness, and configures a trigger that emits results immediately when the watermark passes, and emits subsequent updates immediately for each late event.
