
Stateful Processing and Timers in Apache Beam

Published: July 02, 2026 · 8 min read

In stream processing, most operations are stateless: every element is processed in isolation, without knowledge of past elements.

However, more complex use cases, such as user session stitching, anomaly detection, or transactional windowing, require remembering information across elements. This is known as Stateful Processing.

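Apache Beam provides low-level State and Timer APIs that let you read and write state values scoped to each key and window, and schedule time-based callbacks. Because state is partitioned by key, a stateful DoFn must be applied to a keyed PCollection, that is, one whose elements are (key, value) pairs.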


1. Core State APIs

Apache Beam offers several state structures:

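  • ValueState: Stores a single value (e.g., a counter or flag). The Python SDK calls it ReadModifyWriteStateSpec.
  • BagState: Stores an unordered collection of values that can be appended to without reading it first (e.g., buffering click events). The Python SDK calls it BagStateSpec.
  • MapState: Stores key-value mappings within a single state cell, so individual entries can be read or updated without loading the whole map (available in the Java SDK).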

2. Coding Example: Stateful Deduplication

Here is how to use ValueState, exposed in the Python SDK as ReadModifyWriteStateSpec, to deduplicate streaming elements per user ID within a custom DoFn:

```python
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class DeduplicateFn(beam.DoFn):
    # Declare the state specification: one 'seen_state' cell per key and window
    STATE_SPEC = ReadModifyWriteStateSpec('seen_state', VarIntCoder())

    def process(self, element, seen_state=beam.DoFn.StateParam(STATE_SPEC)):
        # Element format: (user_id, timestamp); user_id is the state key
        user_id, ts = element

        # read() returns None if nothing has been written for this key yet
        if seen_state.read() is None:
            # First time seeing this user: mark as seen and output the element
            seen_state.write(1)
            yield element
        # Otherwise the element is a duplicate and is dropped (nothing yielded)
```

3. Scheduling Callback Timers

Timers let you schedule a callback for a specific point in event time (driven by the watermark) or in processing time (driven by the wall clock). Like state, timers are scoped per key and window.

```python
import logging

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)
from apache_beam.utils.timestamp import Duration, Timestamp


class TimeoutAlertFn(beam.DoFn):
    # Declare state and timer specs; REAL_TIME is the processing-time domain
    STATE_SPEC = ReadModifyWriteStateSpec('buffer', VarIntCoder())
    TIMER_SPEC = TimerSpec('expiry_timer', TimeDomain.REAL_TIME)

    def process(self, element,
                buffer=beam.DoFn.StateParam(STATE_SPEC),
                timer=beam.DoFn.TimerParam(TIMER_SPEC)):
        user_id, val = element

        # Store the latest value and schedule the timer 60 s of wall-clock
        # time from now; setting it again replaces this key's pending timer
        buffer.write(val)
        timer.set(Timestamp.now() + Duration(seconds=60))

        yield element

    @on_timer(TIMER_SPEC)
    def on_expiry(self, buffer=beam.DoFn.StateParam(STATE_SPEC)):
        # Runs when the timer fires for this key (and window)
        logging.info("Timer expired! Clearing buffer state.")
        buffer.clear()
```

4. Stateful Best Practices

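  • [ ] Watch Keyspace and Per-Key State Size: State is partitioned by key and window, so total state grows with the number of active keys times the state held per key. Keying on a high-cardinality field (e.g., UUIDs) without ever clearing state can bloat the state store until workers hit disk limits or run out of memory. At the other extreme, very few keys limit parallelism, because all elements for a given key are processed serially.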
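  • [ ] Clear State When Done: Call state.clear() once state is no longer needed (e.g., after a session closes or a timer fires) to reclaim worker memory and storage. Beam garbage-collects state automatically when a window expires (window end plus allowed lateness), but state in the global window never expires, so it must be cleared explicitly.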
  • [ ] Use RocksDB on Flink: When running on Apache Flink, use the RocksDB state backend. It keeps state on local disk (with an in-memory cache) instead of on the JVM heap, so state can grow beyond available heap memory and scale to millions of keys.