In stream processing, most operations are stateless: every element is processed in isolation, without knowledge of past elements.
However, complex use cases (such as user session stitching, anomaly detection, or transactional windowing) require maintaining history across elements. This is stateful processing.
Apache Beam provides low-level State and Timer APIs that let a DoFn read and write state values per key (and per window) and schedule time-based callbacks.
Apache Beam offers several state structures, including single-value (read-modify-write), bag, set, and combining state.
Here is how to use ReadModifyWriteStateSpec (the Python SDK's counterpart to ValueState in the Java SDK) to deduplicate streaming elements per user ID within a custom DoFn:

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec


class DeduplicateFn(beam.DoFn):
    # Declare the state specification (one cell per key and window)
    STATE_SPEC = ReadModifyWriteStateSpec('seen_state', VarIntCoder())

    def process(self, element, seen_state=beam.DoFn.StateParam(STATE_SPEC)):
        # Element format: (user_id, timestamp); stateful DoFns require
        # key-value input, and user_id serves as the key
        user_id, ts = element

        # Read current state (returns None if empty)
        is_seen = seen_state.read()
        if is_seen is None:
            # First time seeing this user: write state and output element
            seen_state.write(1)
            yield element
        # Otherwise the element is a duplicate and is dropped

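
The per-key behavior of DeduplicateFn can be illustrated without a runner. The following is a minimal pure-Python sketch, not Beam code: the dictionary stands in for the runner-managed state cells, and the helper name `dedup_first_per_key` and the sample events are hypothetical, chosen only for illustration.

```python
from typing import Dict, Iterable, Iterator, Tuple


def dedup_first_per_key(
        elements: Iterable[Tuple[str, int]]) -> Iterator[Tuple[str, int]]:
    """Model of DeduplicateFn: emit only the first element seen per key."""
    # Stand-in for the runner's per-key state (one "seen" flag per user_id).
    seen_state: Dict[str, int] = {}
    for user_id, ts in elements:
        if seen_state.get(user_id) is None:  # mirrors seen_state.read() -> None
            seen_state[user_id] = 1          # mirrors seen_state.write(1)
            yield (user_id, ts)
        # Otherwise the element is a duplicate for this key and is dropped.


# Hypothetical sample input: (user_id, timestamp) pairs.
events = [("alice", 100), ("bob", 101), ("alice", 102),
          ("bob", 103), ("carol", 104)]
result = list(dedup_first_per_key(events))
```

In this model each user ID keeps its own flag, so a repeat from "alice" does not affect "bob". A real runner additionally scopes state to the element's window, which this sketch ignores.
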
Timers allow you to execute callback actions at specific event-time or processing-time thresholds.

import time

import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import (
    ReadModifyWriteStateSpec, TimerSpec, on_timer)


class TimeoutAlertFn(beam.DoFn):
    # Declare state and timer specs
    STATE_SPEC = ReadModifyWriteStateSpec('buffer', VarIntCoder())
    # REAL_TIME is the Python SDK's processing-time domain
    TIMER_SPEC = TimerSpec('expiry_timer', TimeDomain.REAL_TIME)

    def process(self, element,
                buffer=beam.DoFn.StateParam(STATE_SPEC),
                timer=beam.DoFn.TimerParam(TIMER_SPEC)):
        user_id, val = element
        # Write state and schedule the timer to fire 60 seconds from now
        buffer.write(val)
        timer.set(time.time() + 60.0)
        yield element

    @on_timer(TIMER_SPEC)
    def on_expiry(self, buffer=beam.DoFn.StateParam(STATE_SPEC)):
        # Action triggered when the timer fires
        print("Timer expired! Clear buffer state.")
        buffer.clear()

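
Because TimeoutAlertFn sets the same timer on every element, a new element for a key replaces that key's pending timer, so the callback fires 60 seconds after the most recent element. The following pure-Python sketch models that behavior under simplifying assumptions: it is not Beam code, time is simulated with plain floats, and `simulate_timeouts` and the sample events are hypothetical names and data used only for illustration.

```python
from typing import Dict, List, Tuple

TIMEOUT_SECONDS = 60.0


def simulate_timeouts(
        events: List[Tuple[float, str, int]],
        end_time: float) -> List[Tuple[float, str, int]]:
    """Model of TimeoutAlertFn.

    events: (arrival_time, user_id, value) tuples sorted by arrival time.
    Returns (fire_time, user_id, cleared_value) for every timer that fired.
    """
    buffer: Dict[str, int] = {}    # stand-in for the per-key 'buffer' state
    timers: Dict[str, float] = {}  # stand-in for the per-key 'expiry_timer'
    fired: List[Tuple[float, str, int]] = []

    def fire_due(now: float) -> None:
        # Fire every timer whose deadline has passed, earliest first.
        due = sorted((t, key) for key, t in timers.items() if t <= now)
        for fire_time, key in due:
            fired.append((fire_time, key, buffer.pop(key)))  # buffer.clear()
            del timers[key]

    for arrival, user_id, val in events:
        fire_due(arrival)
        buffer[user_id] = val                         # buffer.write(val)
        timers[user_id] = arrival + TIMEOUT_SECONDS   # replaces pending timer
    fire_due(end_time)
    return fired


# Hypothetical sample input.
events = [(0.0, "alice", 1), (10.0, "bob", 7),
          (30.0, "alice", 2), (200.0, "alice", 3)]
fired = simulate_timeouts(events, end_time=300.0)
```

In this run the second "alice" element at t=30 pushes her deadline from 60 to 90, which is why no callback fires for her at t=60. Actual firing times on a real runner depend on the runner's clock and scheduling, so they are only approximated here.
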
Clear state cells explicitly (for example, with state.clear()) once they are no longer needed (e.g. after a session closes or a timer expires) to reclaim worker memory.