intermediate

Deduplication

6 min readLast updated: 2026-07-01

1. Introduction

Deduplication is the process of identifying and removing duplicate records from a data stream. In streaming applications, ensuring that each event is processed only once is critical for preserving data integrity and maintaining accurate downstream reports.

2. Why This Concept Exists

Distributed message brokers (like Kafka, Pub/Sub, or RabbitMQ) generally operate on an at-least-once delivery model. If network connectivity drops or a consumer fails to acknowledge a message before a timeout, the broker will redeliver the message. Without a deduplication layer, your pipeline would process the duplicate event again, resulting in incorrect double-counting or double-billing.

3. Key Terminology

  • Idempotency: An operation that yields the same result whether it is run once or multiple times (e.g., setting a status to "ACTIVE" is idempotent; adding 10 to a balance is not).
  • Deduplication Key: A unique identifier (such as a transaction ID or UUID) embedded in a record, used to detect duplicate payloads.
  • At-Least-Once Delivery: A broker guarantee that a message will be delivered to the consumer at least once, though duplicates may occur.
  • Exactly-Once Processing: A system guarantee that the end results reflect each message being processed exactly once, regardless of retries.

4. How It Works

To deduplicate a stream, the pipeline must maintain a history of previously seen message IDs.

  1. Incoming Message: An event arrives with a unique identifier.
  2. State Lookup: A stateful transform checks if the identifier exists in a persistent state store.
  3. Filter Decision:
    • If the ID exists in the state store, the event is marked as a duplicate and discarded.
    • If the ID does not exist, the event is passed downstream, and the ID is added to the state.
  4. State Expiry: To prevent state storage from growing indefinitely, a timer is set to clean up the stored ID after a designated time window (e.g., 1 hour).

5. Visual Diagram

Stream Ingress
IDs: 101, 102, 101 (Dup)

Deduplication State
Checks cache (101 is duplicate)

Filtered Stream
IDs: 101, 102 (Unique)

6. Code Example

Here is how to implement a stateful and timer-based deduplication DoFn in Apache Beam:

python
import apache_beam as beam
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec, TimerSpec
from apache_beam.transforms.timeutil import TimeDomain

class DeduplicateDoFn(beam.DoFn):
    # 1. Declare state to hold seen IDs
    SEEN_STATE = ReadModifyWriteStateSpec(name="seen_state", coder=beam.coders.BooleanCoder())
    # 2. Declare a timer to clear state after expiry
    EXPIRY_TIMER = TimerSpec(name="expiry_timer", time_domain=TimeDomain.PROCESSING_TIME)

    def process(self, element, 
                seen_state=beam.DoFn.StateParam(SEEN_STATE),
                expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        msg_id, payload = element
        
        # Check if ID has been seen
        is_seen = seen_state.read()
        
        if is_seen:
            # Duplicate found, discard element
            return
        
        # Mark as seen
        seen_state.write(True)
        
        # Set cleanup timer for 10 minutes in the future (600 seconds)
        expiry_timer.set(10 * 60)
        
        yield (msg_id, payload)
        
    @beam.transforms.userstate.on_timer(EXPIRY_TIMER)
    def expiry_callback(self, seen_state=beam.DoFn.StateParam(SEEN_STATE)):
        # Clear state to reclaim memory
        seen_state.clear()

7. Code Explanation

  • SEEN_STATE acts as our boolean registry tracking whether a specific message ID key has been encountered.
  • EXPIRY_TIMER schedules a callback in processing time.
  • If seen_state.read() returns True, the execution returns early, effectively filtering out the duplicate.
  • Otherwise, we write True to state, set the timer for 10 minutes from now (expiry_timer.set(600)), and yield the event.
  • When the timer fires, expiry_callback executes seen_state.clear(), preventing memory leak.

8. Real Production Example

An online mobile game registers player score updates. If a player submits a score and loses cell coverage, the game client retries the API call. The pipeline uses the transaction ID to filter duplicate scores, preventing players from having inflated high scores on the global leaderboard.

9. Common Mistakes

  • Infinite State Growth: Forgetting to define an expiration timer on your seen states will cause your workers to run out of memory or disk space as the pipeline runs over days and weeks.
  • Deduplicating Across Unaligned Sinks: Deduplicating in-stream doesn't guarantee exactly-once writing if your sink does not support transactions or idempotent inserts. If a write fails and the pipeline restarts, the runner may replay records.

10. Interview Perspective

  • Question: Why can't we keep deduplication state forever?
  • Answer: In streaming, data flows forever. Storing every ID indefinitely would require infinite memory. Practically, we only need to store IDs for the maximum expected network retry window (e.g., 10 minutes to a few hours).
  • Question: How does Kafka's idempotent producer compare to in-stream deduplication?
  • Answer: Kafka's idempotent producer prevents duplicates between client and broker. In-stream deduplication in Beam prevents duplicates inside the processing pipeline caused by worker restarts, shuffles, or multi-step processing loops.

11. Best Practices

  • Use a compact hash or UUID string for your deduplication keys to minimize state size.
  • Set your cleanup timer window to be slightly larger than the maximum message retry duration of your producers.
  • If your runner supports it (like Google Cloud Dataflow), use the built-in Deduplicate transform for native performance optimizations.

12. Summary

  • Duplicates are common in streaming due to at-least-once delivery protocols.
  • Stateful DoFns compare incoming event IDs against a local cache of seen IDs.
  • State must be cleared using expiry timers to prevent memory leaks.

13. Interactive Challenges

Challenge 1: Stateful Seen-State Checker (Beginner)

Complete the logic of a DoFn that reads a ValueStateSpec named seen_state. If the state is not None, yield the element with a duplicate flag set to True. If it is None, set state to True and yield the element with a duplicate flag set to False.

Challenge 2: Apply Expiry Timer (Intermediate)

Add a processing-time timer named CLEANUP to a stateful DoFn that clears the state SEEN_STORE after 5 minutes (300 seconds) of inactivity.

Challenge 3: Deduplicate Key-Value String Pairs (Advanced)

Configure a pipeline step using Apache Beam's built-in Deduplicate transform (or equivalent state-timer pattern) that removes duplicate string values from an incoming PCollection of (str, str) where the first element is the message key and the second is the payload. Ensure duplicates are ignored over a 600-second window.

14. Related Content

Advertisement
AdSense Slot #000001Leaderboard Banner (728x90)