Timestamp Assignment
1. Introduction
In Apache Beam, every element in a PCollection is implicitly paired with a metadata timestamp. Timestamp Assignment is the process of setting or updating this hidden metadata timestamp.
By default, some data sources (like Google Cloud Pub/Sub) automatically attach timestamps to incoming elements. However, for other sources (like files, databases, or raw TCP sockets), elements enter the pipeline without a meaningful timestamp. In these scenarios, you must manually parse the payload and assign the appropriate timestamp.
2. Why This Concept Exists
Without explicit timestamp assignment, Apache Beam cannot associate data elements with their real-world occurrence times.
Timestamp assignment is necessary when:
- The Ingestion Source Lacks Temporal Metadata: Standard file readers (e.g., ReadFromText) give every line in the file the same default timestamp (the global minimum timestamp), so the event time carried inside each record is ignored.
- Timezones Need Standardizing: Payloads contain local time zones and must be normalized to UTC epoch seconds for consistent windowing.
- Correcting Historical Data: Replaying historical records requires assigning the time at which each record was originally produced, not the moment it is read back from storage.
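The timezone case can be handled with the standard library before any Beam code runs. The sketch below assumes payloads carry ISO-8601 strings. The helper name to_utc_epoch_seconds is hypothetical. It treats strings without an offset as UTC, which is an assumption; replace it with the feed's real source zone.

```python
from datetime import datetime, timezone


def to_utc_epoch_seconds(iso_string: str) -> float:
    """Convert an ISO-8601 timestamp string to UTC epoch seconds.

    Assumption for this sketch: a string without an explicit offset is
    treated as UTC. Real feeds may need a known source time zone instead.
    """
    dt = datetime.fromisoformat(iso_string)
    if dt.tzinfo is None:
        # No offset in the payload: interpret it as UTC (assumption).
        dt = dt.replace(tzinfo=timezone.utc)
    # timestamp() on an aware datetime returns seconds since 1970-01-01 UTC.
    return dt.timestamp()


if __name__ == "__main__":
    # The same instant written with three different offsets.
    for s in ["2026-05-31T02:00:00+02:00",
              "2026-05-30T20:00:00-04:00",
              "2026-05-31T00:00:00"]:
        print(s, "->", to_utc_epoch_seconds(s))
```

All three inputs describe the same instant, so they normalize to the same epoch value. That consistency is what windowing needs.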
3. Key Terminology
- Metadata Timestamp: An internal metadata property linked to each element in a PCollection, used by the runner to assign elements to windows.
- TimestampedValue: A utility class in Apache Beam used to pair an element with a specific timestamp value.
- Epoch Time: The number of seconds that have elapsed since January 1, 1970 (midnight UTC). Beam's Python SDK expresses timestamps as epoch seconds and stores them internally as Timestamp objects with microsecond precision.
4. How It Works
- Read raw records: Elements are pulled from a source. They carry a default timestamp (e.g., the global minimum timestamp, -9223372036854.775).
- Parse Payload: A user-defined function (a DoFn or a Map lambda) parses the element to locate the time field.
- Construct Wrapper: The function wraps the element in a TimestampedValue(element, epoch_seconds).
- Emit & Update: The runner receives the wrapper, extracts epoch_seconds, updates the element's metadata timestamp, and forwards the element down the pipeline.
5. Visual Diagram
Raw JSON Element (time_ms: 1780000000000)
  → divide by 1000 → Epoch Seconds (1780000000.0)
  → wrap in TimestampedValue → Beam Element (Assigned) with Timestamp: 1780000000.0
6. Code Example
The following pipeline reads sensor data from a text file, extracts a millisecond epoch value from the comma-separated text, converts it to seconds, and assigns the event timestamp:
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue


class AssignSensorTimestamp(beam.DoFn):
    def process(self, element):
        # Element format: "sensor_id,temperature,timestamp_ms"
        # Example: "sensor_4,22.8,1780185600000"
        parts = element.split(',')
        sensor_id = parts[0]
        temp = float(parts[1])
        timestamp_ms = float(parts[2])

        # Convert milliseconds to seconds (Beam Python uses seconds)
        timestamp_sec = timestamp_ms / 1000.0

        payload = {"sensor_id": sensor_id, "temp": temp}

        # Yield the payload wrapped with the assigned timestamp
        yield TimestampedValue(payload, timestamp_sec)


with beam.Pipeline() as p:
    (p
     | "ReadLogFile" >> beam.io.ReadFromText("d:/Builds/beam-acadamy/beam-academy-hub/content/lessons/sensor_data.txt")
     | "ParseAndAssign" >> beam.ParDo(AssignSensorTimestamp())
     # TimestampParam injects each element's metadata timestamp for logging
     | "LogTimestamps" >> beam.Map(lambda x, ts=beam.DoFn.TimestampParam: print(f"Sensor: {x['sensor_id']} | Event Time: {ts}"))
    )
7. Code Explanation
- parts = element.split(',') splits the raw string payload into its fields.
- timestamp_sec = timestamp_ms / 1000.0 converts the millisecond epoch to seconds. This step is critical. If you pass milliseconds directly, Beam reads them as seconds and assigns a timestamp tens of thousands of years in the future.
- yield TimestampedValue(payload, timestamp_sec) emits the payload and updates its metadata timestamp.
- ts=beam.DoFn.TimestampParam injects each element's metadata timestamp into the logging lambda, which verifies that the assignment took effect.
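To put the milliseconds mistake into numbers, the arithmetic below converts the sample value from the code comment into years after 1970. It does this twice: once correctly, and once with the milliseconds misread as seconds. It is plain Python and uses the mean Gregorian year as an approximation.

```python
# Mean length of a Gregorian year in seconds (approximation).
SECONDS_PER_YEAR = 365.2425 * 86400


def years_after_1970(epoch_seconds):
    return epoch_seconds / SECONDS_PER_YEAR


timestamp_ms = 1780185600000  # the sample value from the code comment

correct = years_after_1970(timestamp_ms / 1000.0)  # about 56.4 years: 2026
wrong = years_after_1970(timestamp_ms)             # roughly 56,000 years ahead
print(f"{correct:.1f} years vs {wrong:,.0f} years")
```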
8. Real Production Example
In cellular networks, CDRs (Call Detail Records) are stored in files on cloud buckets. Each CDR contains details about the call, including the call start time. To count calls started per minute, an Apache Beam pipeline reads the files, extracts the start time from each record, converts it to epoch seconds, and assigns the timestamp using a DoFn. The downstream pipeline then aggregates the data using fixed windows to analyze congestion patterns at cell towers.
9. Common Mistakes
- Using Milliseconds Instead of Seconds: Providing a millisecond epoch timestamp directly. Beam's Java SDK uses milliseconds, but the Python SDK uses epoch seconds (float/int). Passing milliseconds (e.g., 1600000000000) assigns a time far in the future. Windows containing those elements may then never fire, or the pipeline may raise errors.
- Assigning Timestamps in the Wrong Order: Applying the WindowInto transform before assigning timestamps. The runner assigns windows based on the old default timestamps, and changing timestamps afterward does not move elements into new windows, so the new timestamps have no effect on windowing.
- Assigning Timestamps Behind the Watermark: Assigning a timestamp that is earlier than the current watermark in a streaming pipeline. This immediately classifies the element as late data. At the next windowed aggregation it is dropped unless it is still within the window's allowed lateness (zero by default).
10. Interview Perspective
- Question: How does the unit of timestamp assignment differ between Apache Beam Java and Python?
- Answer: The Java SDK uses org.joda.time.Instant, which has millisecond precision, whereas the Python SDK uses epoch seconds represented as float or integer values.
- Question: What happens if you do not assign a timestamp to data read from a file?
- Answer: Every element keeps the same default timestamp, the global minimum (-9223372036854.775). Under the default global windowing, all elements sit in a single global window. Even a time-based WindowInto would place them all in the same window, so they cannot be segmented by event time.
11. Best Practices
- Always perform timestamp assignment as close to the data source as possible.
- Validate that timestamps are positive numbers and handle parsing errors gracefully to prevent pipeline failure.
- If your source data timestamp has millisecond precision, preserve it as a float value (e.g., 1600000000.123) to maintain that precision.
12. Summary
- Beam uses metadata timestamps to map elements to windows.
- Timestamp Assignment binds event timestamps to elements manually.
- TimestampedValue is the primary class used to assign timestamps.
- The Python SDK uses epoch seconds (float) for timestamps.
- Assign timestamps before applying any windowing strategies.