Deduplication
1. Introduction
Deduplication is the process of identifying and removing duplicate records from a data stream. In streaming applications, ensuring that each event is processed only once is critical for preserving data integrity and maintaining accurate downstream reports.
2. Why This Concept Exists
Distributed message brokers (like Kafka, Pub/Sub, or RabbitMQ) generally operate on an at-least-once delivery model. If network connectivity drops or a consumer fails to acknowledge a message before a timeout, the broker will redeliver the message. Without a deduplication layer, your pipeline would process the duplicate event again, resulting in incorrect double-counting or double-billing.
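The redelivery failure above can be sketched with a toy in-memory broker. This is a minimal sketch, not a real client API: ToyBroker, poll, ack, and consume are hypothetical names, and the consumer "crash" is simulated by skipping one acknowledgment.

```python
from collections import deque


class ToyBroker:
    """Hypothetical at-least-once broker: unacked messages are redelivered."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def poll(self):
        # Hand out the head message without removing it; it stays queued until acked.
        return self._queue[0] if self._queue else None

    def ack(self):
        # Acknowledge the head message so it is never redelivered.
        self._queue.popleft()


def consume(broker, crash_before_ack_on):
    """Sum message amounts, simulating one consumer crash before an ack."""
    total = 0
    crashed = False
    while (msg := broker.poll()) is not None:
        msg_id, amount = msg
        total += amount  # the side effect happens before the ack
        if msg_id == crash_before_ack_on and not crashed:
            crashed = True  # ack is lost, so the broker redelivers this message
            continue
        broker.ack()
    return total


broker = ToyBroker([("tx-1", 10), ("tx-2", 5)])
print(consume(broker, crash_before_ack_on="tx-1"))  # 25, not 15: tx-1 was counted twice
```

Because the side effect runs before the acknowledgment, the lost ack turns into a double count.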
3. Key Terminology
- Idempotency: An operation that yields the same result whether it is run once or multiple times (e.g., setting a status to "ACTIVE" is idempotent; adding 10 to a balance is not).
- Deduplication Key: A unique identifier (such as a transaction ID or UUID) embedded in a record, used to detect duplicate payloads.
- At-Least-Once Delivery: A broker guarantee that a message will be delivered to the consumer at least once, though duplicates may occur.
- Exactly-Once Processing: A system guarantee that the end results reflect each message being processed exactly once, regardless of retries.
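The two operations from the Idempotency definition can be contrasted directly by replaying each one. A minimal sketch; the account dictionary and the function names are illustrative only.

```python
def set_status(account, status):
    """Idempotent: replaying the call leaves the same final state."""
    account["status"] = status


def add_to_balance(account, amount):
    """Not idempotent: every replay changes the result."""
    account["balance"] += amount


account = {"status": "PENDING", "balance": 0}
for _ in range(2):  # simulate one delivery plus one duplicate redelivery
    set_status(account, "ACTIVE")
    add_to_balance(account, 10)

print(account)  # {'status': 'ACTIVE', 'balance': 20}
```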
4. How It Works
To deduplicate a stream, the pipeline must maintain a history of previously seen message IDs.
- Incoming Message: An event arrives with a unique identifier.
- State Lookup: A stateful transform checks if the identifier exists in a persistent state store.
- Filter Decision:
  - If the ID exists in the state store, the event is marked as a duplicate and discarded.
  - If the ID does not exist, the event is passed downstream, and the ID is added to the state.
- State Expiry: To prevent state storage from growing indefinitely, a timer is set to clean up the stored ID after a designated time window (e.g., 1 hour).
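The four steps above can be sketched in plain Python to show the logic independently of any framework. This is an illustrative assumption, not Beam code: Deduplicator and accept are hypothetical names, an in-memory set stands in for the persistent state store, and a heap of deadlines stands in for cleanup timers. Time is passed in explicitly so expiry can be shown without waiting.

```python
import heapq


class Deduplicator:
    """In-memory sketch of a seen-ID store with time-based expiry."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.seen = set()  # IDs currently considered "seen"
        self.expiry = []   # min-heap of (deadline, msg_id), acting as cleanup timers

    def _expire(self, now):
        # Fire every "timer" whose deadline has passed, clearing its ID.
        while self.expiry and self.expiry[0][0] <= now:
            _, msg_id = heapq.heappop(self.expiry)
            self.seen.discard(msg_id)

    def accept(self, msg_id, now):
        """Return True if the message is new (pass it on), False if it is a duplicate."""
        self._expire(now)
        if msg_id in self.seen:
            return False  # duplicate: discard
        self.seen.add(msg_id)
        heapq.heappush(self.expiry, (now + self.ttl, msg_id))
        return True


dedup = Deduplicator(ttl_seconds=3600)  # 1-hour expiry window
events = [(101, 0), (102, 5), (101, 10), (101, 4000)]  # (msg_id, arrival second)
print([mid for mid, t in events if dedup.accept(mid, t)])  # [101, 102, 101]
```

The copy of 101 at second 10 is dropped, matching the 101, 102, 101 input in the diagram. The last copy of 101 arrives after its 1-hour entry has expired, so it passes as new: any retry that arrives later than the expiry window slips through.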
5. Visual Diagram
Stream Ingress (IDs: 101, 102, 101 [dup])
        ↓
Deduplication State (checks cache: the second 101 is a duplicate)
        ↓
Filtered Stream (IDs: 101, 102, all unique)
6. Code Example
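Here is a stateful, timer-based deduplication DoFn for the Apache Beam Python SDK. Because Beam scopes state to each key, the input PCollection must contain (msg_id, payload) pairs keyed by message ID: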
import apache_beam as beam
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration, Timestamp


class DeduplicateDoFn(beam.DoFn):
    # 1. Declare per-key state recording whether this message ID was already seen
    SEEN_STATE = ReadModifyWriteStateSpec(name="seen_state", coder=beam.coders.BooleanCoder())
    # 2. Declare a processing-time timer to clear state after expiry
    #    (the Python SDK names the processing-time domain REAL_TIME)
    EXPIRY_TIMER = TimerSpec(name="expiry_timer", time_domain=TimeDomain.REAL_TIME)

    def process(self, element,
                seen_state=beam.DoFn.StateParam(SEEN_STATE),
                expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        # Elements are (msg_id, payload) pairs; state is scoped to msg_id
        msg_id, payload = element
        # Check if ID has been seen (read() returns None for a new ID)
        is_seen = seen_state.read()
        if is_seen:
            # Duplicate found, discard element
            return
        # Mark as seen
        seen_state.write(True)
        # Timers take an absolute timestamp: fire 10 minutes (600 seconds) from now
        expiry_timer.set(Timestamp.now() + Duration(seconds=600))
        yield (msg_id, payload)

    @on_timer(EXPIRY_TIMER)
    def expiry_callback(self, seen_state=beam.DoFn.StateParam(SEEN_STATE)):
        # Clear state so the runner can reclaim its storage
        seen_state.clear()
7. Code Explanation
- SEEN_STATE acts as a boolean registry recording whether the current message ID (the element's key) has been encountered.
- EXPIRY_TIMER schedules a callback in processing time.
- If seen_state.read() returns True, execution returns early, filtering out the duplicate.
- Otherwise, we write True to state, set the timer to fire 10 minutes from now, and yield the event. Beam timers take an absolute timestamp, so the deadline is the current processing time plus 600 seconds; a bare 600 would mean 600 seconds after the Unix epoch, a moment long in the past.
- When the timer fires, expiry_callback executes seen_state.clear(), so state for that ID does not accumulate indefinitely.
8. Real Production Example
An online mobile game ingests player score updates. If a player submits a score and loses cell coverage, the game client retries the API call with the same transaction ID. The pipeline filters on that transaction ID to drop duplicate submissions, preventing inflated high scores on the global leaderboard.
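A minimal sketch of the leaderboard case, assuming each score update adds points to a player's running total (the source does not say how scores are aggregated). The event tuples and transaction IDs are made up for illustration.

```python
from collections import defaultdict

# (transaction_id, player, points); "tx-2" was retried after a dropped connection
updates = [
    ("tx-1", "alice", 50),
    ("tx-2", "bob", 30),
    ("tx-2", "bob", 30),  # client retry reusing the same transaction ID
    ("tx-3", "alice", 20),
]


def totals(events, deduplicate):
    """Aggregate points per player, optionally dropping repeated transaction IDs."""
    seen = set()
    scores = defaultdict(int)
    for tx_id, player, points in events:
        if deduplicate:
            if tx_id in seen:
                continue  # duplicate transaction: skip it
            seen.add(tx_id)
        scores[player] += points
    return dict(scores)


print(totals(updates, deduplicate=False))  # {'alice': 70, 'bob': 60}
print(totals(updates, deduplicate=True))   # {'alice': 70, 'bob': 30}
```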
9. Common Mistakes
- Infinite State Growth: Forgetting to define an expiration timer on your seen states will cause your workers to run out of memory or disk space as the pipeline runs over days and weeks.
- Assuming In-Stream Deduplication Protects the Sink: Deduplicating in-stream does not guarantee exactly-once writes if your sink supports neither transactions nor idempotent inserts. If a write fails and the pipeline restarts, the runner may replay records, and a non-idempotent sink will store them twice.
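The difference between a sink that tolerates replays and one that does not can be sketched with plain Python containers standing in for tables. This is an illustrative assumption, not a real connector: the list models append-only inserts and the dict models an upsert keyed by record ID.

```python
def write_append(sink, records):
    """Non-idempotent sink: every write adds a row, so a replay duplicates rows."""
    for record_id, value in records:
        sink.append((record_id, value))


def write_upsert(sink, records):
    """Idempotent sink: rows are keyed by record ID, so a replay overwrites them."""
    for record_id, value in records:
        sink[record_id] = value


batch = [("evt-1", 10), ("evt-2", 20)]

append_sink, upsert_sink = [], {}
for _ in range(2):  # first attempt, then a replay of the same batch after a restart
    write_append(append_sink, batch)
    write_upsert(upsert_sink, batch)

print(len(append_sink))  # 4
print(len(upsert_sink))  # 2
```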
10. Interview Perspective
- Question: Why can't we keep deduplication state forever?
- Answer: A streaming input is unbounded, so storing every ID indefinitely would make state grow without limit. In practice, we only need to keep IDs for the maximum expected retry window (e.g., 10 minutes to a few hours).
- Question: How does Kafka's idempotent producer compare to in-stream deduplication?
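- Answer: Kafka's idempotent producer only prevents duplicates introduced by the producer's own retries when writing to the broker (the broker tracks a producer ID and per-partition sequence numbers). It cannot detect application-level duplicates, such as a client resending the same event as a new message, and it does not stop redelivery to consumers. In-stream deduplication keyed on a business ID catches duplicates that have already reached the stream, whatever their origin.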
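The split of responsibilities can be illustrated with a toy model. This is a simplified sketch, not Kafka's implementation: ToyPartition is hypothetical and ignores producer epochs, batching, and error handling. It keeps only the core idea that the broker discards a send whose sequence number it has already written for that producer.

```python
class ToyPartition:
    """Toy broker partition that drops producer retries by sequence number."""

    def __init__(self):
        self.log = []
        self.last_seq = {}  # producer_id -> highest sequence number written

    def append(self, producer_id, seq, payload):
        if seq <= self.last_seq.get(producer_id, -1):
            return False  # retry of an already-written send: dropped
        self.last_seq[producer_id] = seq
        self.log.append(payload)
        return True


partition = ToyPartition()
partition.append("p1", 0, "score:tx-7")  # original send
partition.append("p1", 0, "score:tx-7")  # producer-level retry, same sequence: dropped
partition.append("p1", 1, "score:tx-7")  # application resends as a new message: kept
print(partition.log)  # ['score:tx-7', 'score:tx-7']
```

The second copy of score:tx-7 survives because the application issued it as a new send with a new sequence number; only a downstream filter keyed on tx-7 can remove it.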
11. Best Practices
- Use a compact hash or UUID string for your deduplication keys to minimize state size.
- Set your cleanup timer window to be slightly larger than the maximum message retry duration of your producers.
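- Before hand-rolling a DoFn, consider the built-in Deduplicate transform that the Beam SDKs ship (Python: Deduplicate and DeduplicatePerKey in apache_beam.transforms.deduplicate; Java: org.apache.beam.sdk.transforms.Deduplicate), which implements this state-plus-timer pattern for you.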
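A back-of-the-envelope sizing makes the first two practices concrete. The traffic numbers are hypothetical, and the estimate counts only key bytes, ignoring the runner's per-entry overhead, so real state will be larger. estimate_state_bytes is an illustrative helper, not a library function.

```python
import uuid


def estimate_state_bytes(events_per_second, window_seconds, bytes_per_key):
    """Rough lower bound on dedup state: live keys x key size (no per-entry overhead)."""
    live_keys = events_per_second * window_seconds
    return live_keys * bytes_per_key


event_id = uuid.uuid4()
string_key = str(event_id)   # 36-character canonical text form
binary_key = event_id.bytes  # 16 raw bytes

print(len(string_key), len(binary_key))  # 36 16

# Hypothetical load: 10,000 unique events/s with a 1-hour expiry window.
print(estimate_state_bytes(10_000, 3600, len(string_key)))  # 1296000000 (~1.3 GB)
print(estimate_state_bytes(10_000, 3600, len(binary_key)))  # 576000000 (~0.58 GB)
```

State scales linearly with both the window and the key size, so an over-generous window costs as much as a bloated key.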
12. Summary
- Duplicates are common in streaming due to at-least-once delivery protocols.
- Stateful DoFns compare incoming event IDs against runner-managed, per-key state recording which IDs have been seen.
- State must be cleared using expiry timers to prevent unbounded state growth.