Event Time
1. Introduction
In stream processing, time can be measured from different perspectives. Event Time is the time when a data event actually occurred on the client device or system of record (e.g., when a user clicked a button on a smartphone, when a server log was generated, or when an IoT sensor measured temperature).
Event Time is typically embedded directly in the data payload as a timestamp field (such as an ISO 8601 string or epoch millisecond value) generated by the source system.
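The two encodings mentioned above can be normalized to epoch seconds, which is the unit TimestampedValue in Beam's Python SDK expects. The sketch below uses only the standard library. The helper names and the payload field names (timestamp_iso, timestamp_ms) are hypothetical and chosen for illustration.

```python
from datetime import datetime, timezone


def iso8601_to_epoch_seconds(value: str) -> float:
    """Convert an ISO 8601 string that carries a UTC offset (or 'Z') to epoch seconds."""
    # datetime.fromisoformat only accepts a trailing 'Z' from Python 3.11 on,
    # so rewrite it as an explicit +00:00 offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # A naive timestamp would silently be read as the worker's local time
        raise ValueError(f"timestamp has no UTC offset: {value!r}")
    return parsed.timestamp()


def epoch_millis_to_seconds(value: int) -> float:
    """Convert an epoch-millisecond value to epoch seconds."""
    return value / 1000.0


# Hypothetical payloads that encode the same instant in the two formats
iso_payload = {"timestamp_iso": "2026-07-01T12:00:00Z"}
millis_payload = {"timestamp_ms": 1782907200000}

print(iso8601_to_epoch_seconds(iso_payload["timestamp_iso"]))   # 1782907200.0
print(epoch_millis_to_seconds(millis_payload["timestamp_ms"]))  # 1782907200.0
```

Rejecting offset-less strings is a deliberate choice here: it surfaces ambiguous client timestamps early instead of letting them land in the wrong window.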
2. Why This Concept Exists
In distributed systems, data does not arrive at the processing engine instantaneously or in order. Factors like network congestion, device disconnections, and system retries introduce variable delays.
If we aggregate a streaming metric (such as daily active users) by the time the data arrives at our workers (Processing Time), the results can be inaccurate:
- A user who clicks a button on a mobile phone inside a subway tunnel at 12:00 might have the event uploaded only at 12:15, after leaving the tunnel.
- Grouping this event by arrival time skews our analytics by attributing the user's action to the wrong time window (for example, 12:15-12:20 instead of 12:00-12:05 with five-minute windows).
- Grouping by Event Time keeps results accurate and deterministic despite network delays, system latency, or out-of-order delivery, provided the pipeline still accepts data that arrives late.
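To make the contrast concrete, the sketch below counts the same three clicks per five-minute window, once by arrival time and once by Event Time, for two runs that deliver the clicks in different orders and with different delays. It is plain Python that simulates the grouping rather than Beam code; the window size, the date, the timestamps, and the helper names are illustrative assumptions.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_SECONDS = 5 * 60  # assumed five-minute fixed windows


def at(hhmm: str) -> float:
    """Epoch seconds for HH:MM (UTC) on an arbitrary example date."""
    hour, minute = map(int, hhmm.split(":"))
    return datetime(2026, 7, 1, hour, minute, tzinfo=timezone.utc).timestamp()


def window_label(epoch_seconds: float) -> str:
    """Label (HH:MM start time) of the fixed window containing the timestamp."""
    start = epoch_seconds - epoch_seconds % WINDOW_SECONDS
    return datetime.fromtimestamp(start, tz=timezone.utc).strftime("%H:%M")


def count_per_window(events, key):
    """Count events per window; `key` selects which timestamp to window on."""
    return Counter(window_label(key(e)) for e in events)


# (event_time, arrival_time) pairs; the 12:00 click was made in a subway tunnel
run_1 = [(at("12:00"), at("12:15")), (at("12:01"), at("12:01")), (at("12:16"), at("12:16"))]
# A replay of the same clicks, delivered in a different order with other delays
run_2 = [(at("12:16"), at("12:40")), (at("12:00"), at("12:41")), (at("12:01"), at("12:42"))]

for run in (run_1, run_2):
    by_event_time = count_per_window(run, key=lambda e: e[0])
    by_arrival_time = count_per_window(run, key=lambda e: e[1])
    print(dict(sorted(by_event_time.items())), dict(sorted(by_arrival_time.items())))
# {'12:00': 2, '12:15': 1} {'12:00': 1, '12:15': 2}
# {'12:00': 2, '12:15': 1} {'12:40': 3}
```

The Event Time counts are identical across both runs, while the arrival-time counts depend on how each run happened to be delivered.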
3. Key Terminology
- Event Time: The timestamp when the event was generated at the source.
- Ingestion Time: The timestamp when the event was received by the ingestion system (e.g., Pub/Sub or Kafka).
- Processing Time: The wall-clock time on the worker when the pipeline processes the event.
- System Skew: The time gap between Event Time and Processing Time. Skew grows with network latency and consumer backlogs.
- Out-of-Order Data: Elements that arrive at the processing engine in a sequence different from the order in which they actually occurred.
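A minimal sketch of the last two definitions, assuming each record is available as an (event_time, processing_time) pair in arrival order (the layout and sample values are illustrative): skew is the difference between the two times, and an element is out of order when its Event Time is earlier than the latest Event Time already seen.

```python
from typing import Iterable, List, Tuple


def analyze_arrivals(arrivals: Iterable[Tuple[float, float]]) -> List[Tuple[float, bool]]:
    """For each (event_time, processing_time) pair in arrival order,
    return (skew_seconds, is_out_of_order)."""
    results = []
    max_event_time_seen = float("-inf")
    for event_time, processing_time in arrivals:
        skew = processing_time - event_time
        # Out of order: this event happened before one that was already processed
        out_of_order = event_time < max_event_time_seen
        max_event_time_seen = max(max_event_time_seen, event_time)
        results.append((skew, out_of_order))
    return results


# Seconds after 12:00:00; the 12:00 subway click is processed last, at 12:15
arrivals = [(60.0, 65.0), (300.0, 302.0), (0.0, 900.0)]
for skew, late in analyze_arrivals(arrivals):
    print(f"skew={skew:.0f}s out_of_order={late}")
# skew=5s out_of_order=False
# skew=2s out_of_order=False
# skew=900s out_of_order=True
```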
4. How It Works
- Generation: The client device records the event and attaches a timestamp (e.g., 2026-07-01T12:00:00Z).
- Transmission: The event travels through the network and, in this example, experiences a 15-minute delay.
- Extraction: The pipeline parses the incoming payload, extracts the embedded timestamp, and replaces the element's default metadata timestamp by emitting the element as a beam.window.TimestampedValue.
- Windowing & Aggregation: A windowing transform (here, five-minute fixed windows) places the event in the 12:00-12:05 window based on the extracted Event Time, even though the worker is executing the code at 12:15.
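The four steps can be traced for a single record in plain Python. This is a simulation, not Beam code: the JSON field name event_time, the five-minute window size, and the date are assumptions, and the window arithmetic mirrors fixed windows aligned to the Unix epoch with no offset.

```python
import json
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)  # assumed fixed-window size
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# 1. Generation: the device stamps the event when it happens
payload = json.dumps({"user": "u1", "event_time": "2026-07-01T12:00:00+00:00"})

# 2. Transmission: the event reaches a worker 15 minutes later
processing_time = datetime(2026, 7, 1, 12, 15, tzinfo=timezone.utc)

# 3. Extraction: parse the payload and read the embedded Event Time
record = json.loads(payload)
event_time = datetime.fromisoformat(record["event_time"])

# 4. Windowing: round the Event Time down to the start of its fixed window
window_start = EPOCH + ((event_time - EPOCH) // WINDOW) * WINDOW
window_end = window_start + WINDOW

print(f"processed at {processing_time:%H:%M}, "
      f"window [{window_start:%H:%M}, {window_end:%H:%M})")
# processed at 12:15, window [12:00, 12:05)
```

Because the window is computed from event_time alone, processing_time plays no part in which window the record joins.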
6. Code Example
The following code extracts a custom Event Time timestamp from a JSON record and associates it with the element:
```python
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import TimestampedValue


class ParseAndAssignTimestamp(beam.DoFn):
    def process(self, element):
        # Parse the Pub/Sub payload (a byte string) into a dictionary
        record = json.loads(element.decode("utf-8"))
        # Extract the epoch timestamp (in seconds) embedded by the source system
        event_time_epoch = float(record["timestamp_epoch"])
        # Yield the record with its Event Time as the element's timestamp
        yield TimestampedValue(record, event_time_epoch)


# Reading from Pub/Sub requires the pipeline to run in streaming mode
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    (
        p
        | "ReadFromSource" >> beam.io.ReadFromPubSub(
            subscription="projects/my-gcp/subscriptions/logs")
        | "AssignEventTime" >> beam.ParDo(ParseAndAssignTimestamp())
        | "PrintEventTime" >> beam.Map(
            lambda x, ts=beam.DoFn.TimestampParam: print(
                f"Data: {x}, Event Time: {ts.to_utc_datetime()}"))
    )
```
7. Code Explanation
- json.loads(...) parses the raw message, decoded as UTF-8, into a Python dictionary.
- event_time_epoch = float(record["timestamp_epoch"]) extracts the Event Time from the payload as epoch seconds.
- TimestampedValue(record, event_time_epoch) wraps the record and replaces the element's timestamp metadata in Apache Beam.
- ts=beam.DoFn.TimestampParam is a special default argument that Beam recognizes in a DoFn's process method or in a Map lambda. Beam injects the element's timestamp into it so it can be logged, and ts.to_utc_datetime() converts that timestamp to a Python datetime.
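One design option, sketched under the assumption that messages are JSON objects with a numeric timestamp_epoch field as in the code example, is to move the parsing into a plain function so it can be unit-tested without a Pub/Sub subscription or a running pipeline. The function name parse_event is hypothetical.

```python
import json
from typing import Any, Dict, Tuple


def parse_event(message: bytes) -> Tuple[Dict[str, Any], float]:
    """Decode a JSON message and return (record, event_time_epoch_seconds).

    Raises KeyError if the timestamp field is missing; malformed JSON raises
    json.JSONDecodeError, which is a subclass of ValueError.
    """
    record = json.loads(message.decode("utf-8"))
    event_time_epoch = float(record["timestamp_epoch"])
    return record, event_time_epoch


record, event_time = parse_event(b'{"user": "u1", "timestamp_epoch": 1782907200}')
print(record["user"], event_time)  # u1 1782907200.0
```

With this split, the DoFn's process method reduces to calling parse_event(element) and yielding TimestampedValue(record, event_time_epoch), and the parsing rules can be tested with ordinary assertions.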
8. Real Production Example
In multiplayer online gaming, achievements or scores are often stored locally when the connection drops. If a player scores 1,000 points at 18:00 during a connectivity outage and uploads the score at 22:00 after reconnecting, Event Time lets the scoring pipeline credit the score to the correct 18:00 hourly tournament pool, ensuring fair competition, provided the pipeline is configured to accept data that arrives this late (for example, through Beam's allowed lateness setting).
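The arithmetic behind this example can be sketched as below. It uses a deliberately simplified model, which is an assumption and not Beam's exact late-data logic: the watermark is taken to equal the upload time, and a late element is kept only if its hourly window's end plus the allowed lateness (how long after a window closes late data is still accepted) has not passed the watermark. Under that model the 18:00 window closes at 19:00, so the 22:00 upload needs at least three hours of allowed lateness. The date and helper names are illustrative.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

HOUR = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hourly_window(event_time: datetime) -> Tuple[datetime, datetime]:
    """Return the [start, end) hourly window that contains event_time."""
    start = EPOCH + ((event_time - EPOCH) // HOUR) * HOUR
    return start, start + HOUR


def is_accepted(event_time: datetime, watermark: datetime,
                allowed_lateness: timedelta) -> bool:
    """Simplified rule: keep the element unless its window expired before the watermark."""
    _, window_end = hourly_window(event_time)
    return window_end + allowed_lateness >= watermark


score_time = datetime(2026, 7, 1, 18, 0, tzinfo=timezone.utc)   # scored while offline
upload_time = datetime(2026, 7, 1, 22, 0, tzinfo=timezone.utc)  # uploaded on reconnect

print(hourly_window(score_time)[0].strftime("%H:%M"))             # 18:00
print(is_accepted(score_time, upload_time, timedelta(hours=2)))   # False
print(is_accepted(score_time, upload_time, timedelta(hours=4)))   # True
```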
9. Common Mistakes
- Relying on Default Ingestion Time: Processing data without explicitly extracting timestamps. By default, Beam's Pub/Sub source stamps each message with its Pub/Sub publish time, which is Ingestion Time, not Event Time.
- Using String ISO Formats Directly: Passing ISO 8601 string timestamps (like 2026-07-01T12:00:00Z) directly to TimestampedValue. Beam expects numeric epoch seconds (or a Beam Timestamp object), so strings must be parsed first.
- Timezone Discrepancies: Neglecting to convert client timestamps to UTC epoch seconds, which places events in the wrong windows and skews statistics.
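The timezone pitfall can be reproduced with the standard library. The sketch assumes a client that reports local wall-clock time without an offset and whose UTC offset (+02:00 here) is known from some other source, such as device metadata; the offset, the timestamp, and the helper name are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical client: local wall-clock time in a UTC+02:00 zone, sent without an offset
client_local = "2026-07-01T14:00:00"
client_zone = timezone(timedelta(hours=2))


def to_utc_epoch(value: str, assumed_zone: timezone) -> float:
    """Parse an ISO 8601 string; attach assumed_zone only if it carries no offset."""
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=assumed_zone)
    return parsed.timestamp()


correct = to_utc_epoch(client_local, client_zone)
# Mistake: treating the local wall-clock string as if it were already UTC
wrong = datetime.fromisoformat(client_local).replace(tzinfo=timezone.utc).timestamp()

print(datetime.fromtimestamp(correct, tz=timezone.utc).isoformat())  # 2026-07-01T12:00:00+00:00
print((wrong - correct) / 3600)                                       # 2.0
```

A two-hour error like this moves every event from this client into the wrong hourly and five-minute windows.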
10. Interview Perspective
- Question: What is the primary advantage of Event Time over Processing Time?
- Answer: Event Time provides deterministic results. If you rerun the same pipeline on historical logs, Event Time yields exactly the same aggregations, whereas Processing Time results change depending on when, and how quickly, the pipeline runs.
- Question: How does Beam know an element's event time?
- Answer: Either the source connector sets it automatically (e.g., Pub/Sub's publish time, or a message attribute named through the timestamp_attribute parameter of ReadFromPubSub), or you assign it manually by emitting a TimestampedValue inside a ParDo.
11. Best Practices
- Always store time in UTC epoch format at the ingestion layer to simplify parsing.
- Ensure that client clocks are synchronized via NTP (Network Time Protocol) to prevent skewed Event Time values.
- Use Event Time for business metrics (billing, analytics, reports) and Processing Time for operational diagnostics (pipeline lag, CPU loads).
12. Summary
- Event Time is the time when an event actually occurred at the client/source.
- It ensures data accuracy regardless of network delays or out-of-order delivery.
- Beam assigns elements to windows based on their timestamps, so those timestamps should carry Event Time.
- TimestampedValue is used to explicitly set Event Time.
- Rerunning pipelines using Event Time produces consistent, deterministic results.