intermediate

Timestamp Assignment

6 min read
Last updated: 2026-07-01

1. Introduction

In Apache Beam, every element in a PCollection is implicitly paired with a metadata timestamp. Timestamp Assignment is the process of setting or updating this hidden metadata timestamp.

Some data sources attach timestamps automatically; for example, Google Cloud Pub/Sub uses each message's publish time by default. Other sources (like files, databases, or raw TCP sockets) deliver elements without a meaningful event timestamp. In these scenarios, you must parse the payload and assign the appropriate timestamp yourself.

2. Why This Concept Exists

Without explicit timestamp assignment, Apache Beam cannot associate data elements with the time their underlying events actually occurred (their event time).

Timestamp assignment is necessary when:

  • The Ingestion Source Lacks Temporal Metadata: Standard file readers (e.g., ReadFromText) give every line in the file the same default timestamp, the global minimum timestamp, so the time each event actually occurred is lost.
  • Timezones Need Standardizing: Payloads record times in local time zones, which must be normalized to UTC epoch seconds for consistent windowing.
  • Correcting Historical Data: Replaying historical records requires assigning the time each record was originally produced, not the moment it is read back from storage.
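
For the timezone case, a minimal sketch using only Python's standard library might look like the following. It assumes the payload carries an ISO 8601 string with an explicit UTC offset. The helper name to_utc_epoch_seconds and the sample values are hypothetical.

```python
from datetime import datetime, timezone


def to_utc_epoch_seconds(local_iso: str) -> float:
    """Convert an ISO 8601 string with a UTC offset to UTC epoch seconds.

    Hypothetical helper: expects strings such as "2026-05-31T02:00:00+02:00".
    Naive strings (no offset) are rejected because their zone is ambiguous.
    """
    dt = datetime.fromisoformat(local_iso)
    if dt.tzinfo is None:
        raise ValueError(f"timestamp has no UTC offset: {local_iso!r}")
    # For an aware datetime, .timestamp() does not depend on the machine's local zone
    return dt.timestamp()


# The same instant, recorded by devices in two different local zones
berlin = to_utc_epoch_seconds("2026-05-31T02:00:00+02:00")
new_york = to_utc_epoch_seconds("2026-05-30T20:00:00-04:00")
print(berlin, new_york, datetime.fromtimestamp(berlin, tz=timezone.utc).isoformat())
```

Rejecting naive strings is a deliberate design choice. Guessing a zone for them would silently shift every event into the wrong window.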

3. Key Terminology

  • Metadata Timestamp: An internal metadata property linked to each element in a PCollection, used by the runner to assign elements to windows.
  • TimestampedValue: A utility class in Apache Beam used to pair an element with a specific timestamp value.
  • Epoch Time: The number of seconds that have elapsed since January 1, 1970 (midnight UTC). Beam's Python SDK accepts timestamps as epoch seconds (int or float) and stores them internally as a Timestamp object with microsecond precision.

4. How It Works

  1. Read raw records: Elements are pulled from a source. Elements from a bounded source such as a text file carry a default timestamp: the global minimum, MIN_TIMESTAMP (-9223372036854.775).
  2. Parse Payload: A user-defined function (DoFn or Map lambda) parses the element to locate the time field.
  3. Construct Wrapper: The function wraps the element in a TimestampedValue(element, epoch_seconds).
  4. Emit & Update: The SDK unwraps the TimestampedValue, sets the element's metadata timestamp to epoch_seconds, and forwards the payload down the pipeline. The element keeps whatever windows it was already assigned to.

5. Visual Diagram

Raw JSON Element
time_ms: 1780000000000

▼ (Extract Epoch Seconds)

Epoch Seconds
1780000000.0

▼ (Wrap with TimestampedValue)

Beam Element (Assigned)
Timestamp: 1780000000.0
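
The arithmetic in the diagram can be checked with the standard library alone. Rendering the result as a UTC date is a quick way to catch a unit mistake before the value reaches Beam. The sample value is the one from the diagram.

```python
from datetime import datetime, timezone

time_ms = 1780000000000            # raw field from the JSON payload
epoch_seconds = time_ms / 1000.0   # Beam's Python SDK expects seconds

# Rendering the value as a UTC date is a quick sanity check on the unit
event_time = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
print(epoch_seconds, event_time.isoformat())

# Treating the millisecond value as seconds lands far outside the calendar range
try:
    datetime.fromtimestamp(time_ms, tz=timezone.utc)
except (ValueError, OverflowError, OSError) as err:
    print("not a valid calendar time:", err)
```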

6. Code Example

The following pipeline reads sensor data from a text file, extracts a millisecond epoch value from the comma-separated text, converts it to seconds, and assigns the event timestamp:

python
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class AssignSensorTimestamp(beam.DoFn):
    def process(self, element):
        # Element format: "sensor_id,temperature,timestamp_ms"
        # Example: "sensor_4,22.8,1780185600000"
        parts = element.split(',')
        sensor_id = parts[0]
        temp = float(parts[1])
        timestamp_ms = float(parts[2])
        
        # Convert milliseconds to seconds (Beam Python uses seconds)
        timestamp_sec = timestamp_ms / 1000.0
        
        payload = {"sensor_id": sensor_id, "temp": temp}
        # Yield the payload wrapped with the assigned timestamp
        yield TimestampedValue(payload, timestamp_sec)

with beam.Pipeline() as p:
    (p
     | "ReadLogFile" >> beam.io.ReadFromText("sensor_data.txt")  # one "sensor_id,temperature,timestamp_ms" record per line
     | "ParseAndAssign" >> beam.ParDo(AssignSensorTimestamp())
     | "LogTimestamps" >> beam.Map(lambda x, ts=beam.DoFn.TimestampParam: print(f"Sensor: {x['sensor_id']} | Event Time: {ts}"))
    )

7. Code Explanation

  • parts = element.split(',') splits the raw string payload.
  • timestamp_sec = timestamp_ms / 1000.0 converts the millisecond epoch to seconds. This is critical: passing milliseconds directly produces a timestamp tens of thousands of years in the future.
  • yield TimestampedValue(payload, timestamp_sec) updates the element's metadata timestamp.
  • ts=beam.DoFn.TimestampParam is injected to verify that the metadata timestamp was successfully updated.

8. Real Production Example

In cellular networks, CDRs (Call Detail Records) are stored in files on cloud buckets. Each CDR contains details about the call, including call start time. To count active calls per minute, an Apache Beam pipeline reads the files, extracts the start time from the record, converts it to epoch seconds, and assigns the timestamp using a DoFn. The downstream pipeline then aggregates the data using fixed windows to analyze network cell tower congestion patterns.

9. Common Mistakes

  • Using Milliseconds Instead of Seconds: Providing a millisecond epoch timestamp directly. Beam's Java SDK uses milliseconds, but the Python SDK expects epoch seconds (float or int). Passing milliseconds (e.g. 1600000000000) assigns a time roughly 50,000 years in the future. In a streaming pipeline, the watermark never reaches the end of those windows, so event-time triggers never fire for them.
  • Assigning Timestamps in the Wrong Order: Applying the WindowInto transform before assigning timestamps. The runner assigns windows based on the old default timestamps. Reassigning a timestamp afterwards does not move an element to a different window, so the new timestamps have no effect on windowing.
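  • Assigning Timestamps Behind the Watermark: In a streaming pipeline, assigning a timestamp that falls in a window the watermark has already passed turns the element into late data. Unless the windowing strategy allows lateness (the default allowed lateness is zero), the element is dropped at the next aggregation without raising an error.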

10. Interview Perspective

  • Question: How does the unit of timestamp assignment differ between Apache Beam Java and Python?
  • Answer: The Java SDK uses org.joda.time.Instant, which has millisecond precision. The Python SDK accepts epoch seconds as float or int values, stored internally with microsecond precision.
  • Question: What happens if you do not assign a timestamp to data read from a file?
  • Answer: Each element keeps the default timestamp, the global minimum (-9223372036854.775). Because every element shares that one timestamp, any time-based windowing such as FixedWindows places them all in the same window, so the data cannot be segmented by time.

11. Best Practices

  • Always perform timestamp assignment as close to the data source as possible.
  • Validate that timestamps fall within a plausible range. Handle parsing errors gracefully (for example, by routing malformed records to a dead-letter output) so that one bad record does not fail the pipeline.
  • If your source timestamps have millisecond precision, keep the fractional seconds (e.g. 1600000000.123) rather than truncating to whole seconds.

12. Summary

  • Beam uses metadata timestamps to map elements to windows.
  • Timestamp Assignment binds event timestamps to elements manually.
  • TimestampedValue is the primary class used to assign timestamps.
  • Python SDK uses epoch seconds (float) for timestamps.
  • Assign timestamps before applying any windowing strategies.

13. Interactive Challenges

Challenge 1: Millisecond Timestamp Assigner (Beginner)

Write a Map lambda function that takes a dictionary containing a "time_ms" field, converts it to epoch seconds, and returns it wrapped inside TimestampedValue.

Challenge 2: Log String Date Assigner (Intermediate)

Write a DoFn class called AssignDateStrTimestamp that parses a date string in the format "YYYY-MM-DD HH:MM:SS" from the key "event_date", converts it to UTC epoch seconds, and yields the element with the assigned timestamp.

Challenge 3: Shift Window Elements (Advanced)

Write a Map transform that shifts each element's metadata timestamp forward by exactly 5 minutes (300 seconds).

