advanced

Custom IO

7 min read | Last updated: 2026-07-01

1. Introduction

Apache Beam ships native connectors for many popular technologies, but you may still encounter proprietary databases, custom APIs, or specialized binary file formats that have no built-in support. Custom IO lets you build your own Source and Sink connectors on top of Beam's extensible IO API.

2. Why This Concept Exists

Many organizations depend on internally developed REST APIs or legacy mainframes. To process that data at scale, a pipeline needs connectors that can divide a dataset into independent pieces, read those pieces in parallel, track each reader's offset, and write output in bundles.

3. Key Terminology

  • BoundedSource: A Beam base class representing a finite dataset that can be read and split into sub-sources.
  • RangeTracker: A thread-safe helper class used to track the reader's current position and prevent overlapping record reads during dynamic splits.
  • Dynamic Work Rebalancing: The ability of a runner to dynamically split a source's remaining work across multiple workers based on current progress.
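The interaction between claiming and splitting can be illustrated with a toy model. The sketch below is not Beam's RangeTracker: ToyOffsetTracker and its internals are hypothetical names written for this illustration, and only the method names try_claim and try_split mirror the ones Beam's OffsetRangeTracker exposes. It assumes a half-open integer range [start, stop).

```python
import threading


class ToyOffsetTracker:
    """Simplified stand-in for a range tracker over the half-open range [start, stop)."""

    def __init__(self, start, stop):
        self._lock = threading.Lock()
        self._start = start
        self._stop = stop
        self._last_claimed = None

    def try_claim(self, position):
        """Return True if the reader may process `position`, False if it is out of range."""
        with self._lock:
            if position >= self._stop:
                return False
            self._last_claimed = position
            return True

    def try_split(self, split_position):
        """Shrink this range to [start, split_position) and return the residual range.

        Returns None when the split point was already claimed or lies outside the range.
        """
        with self._lock:
            if self._last_claimed is None:
                first_unclaimed = self._start
            else:
                first_unclaimed = self._last_claimed + 1
            if not first_unclaimed <= split_position < self._stop:
                return None
            residual = (split_position, self._stop)
            self._stop = split_position
            return residual


tracker = ToyOffsetTracker(0, 10)
primary = []
residual = None
for i in range(0, 10):
    if not tracker.try_claim(i):
        break  # the rest of the range now belongs to another worker
    primary.append(i)
    if i == 3:
        # Simulate the runner splitting the remaining work at position 6.
        residual = tracker.try_split(6)

print(primary)   # [0, 1, 2, 3, 4, 5]
print(residual)  # (6, 10)
```

The reader stops exactly at the split point, and the residual range starts there, so no index is read twice or skipped. A later split at an already claimed position, such as tracker.try_split(2), is rejected and returns None.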

4. How It Works

  • Source: You subclass BoundedSource and implement the estimate_size(), split(), get_range_tracker(), and read() methods.
  • Sink: You subclass FileBasedSink (for files) or implement a custom DoFn that manages batch writes in its lifecycle methods (setup, start_bundle, process, finish_bundle, teardown).

5. Visual Diagram

BoundedSource
Splits into bundles of the desired size

Worker 1 (RangeTracker)
Index range: [0, 500)
Worker 2 (RangeTracker)
Index range: [500, 1000)

(Ranges are half-open: the end index is excluded, so no index is read twice.)
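The arithmetic behind this kind of split can be sketched in plain Python. The helper split_range is a hypothetical name used only for this illustration, and it expresses bundles as half-open (start, stop) pairs, where the stop index is excluded, which is the convention offset-based trackers follow.

```python
def split_range(start, stop, desired_bundle_size):
    """Return half-open (start, stop) bundles that cover [start, stop) exactly once."""
    if desired_bundle_size <= 0:
        raise ValueError("desired_bundle_size must be positive")
    bundles = []
    bundle_start = start
    while bundle_start < stop:
        # The last bundle may be smaller than the desired size.
        bundle_stop = min(bundle_start + desired_bundle_size, stop)
        bundles.append((bundle_start, bundle_stop))
        bundle_start = bundle_stop
    return bundles


even_split = split_range(0, 1000, 500)
uneven_split = split_range(0, 1050, 500)

print(even_split)    # [(0, 500), (500, 1000)]
print(uneven_split)  # [(0, 500), (500, 1000), (1000, 1050)]
```

Because each bundle starts where the previous one stops, every index belongs to exactly one bundle.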

6. Code Example

A custom Source that generates a sequence of integers:

python
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker

class RangeSource(iobase.BoundedSource):
    def __init__(self, start, end):
        self._start = start
        self._end = end

    def estimate_size(self):
        return self._end - self._start

    def get_range_tracker(self, start_position, end_position):
        if start_position is None:
            start_position = self._start
        if end_position is None:
            end_position = self._end
        return OffsetRangeTracker(start_position, end_position)

    def read(self, range_tracker):
        for i in range(range_tracker.start_position(), range_tracker.stop_position()):
            if not range_tracker.try_claim(i):
                return
            yield i

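    def split(self, desired_bundle_size, start_position=None, end_position=None):
        # Divide [start, end) into bundles of at most desired_bundle_size elements.
        # Each bundle carries its own half-open range, so bundles never overlap.
        if start_position is None:
            start_position = self._start
        if end_position is None:
            end_position = self._end
        step = max(1, int(desired_bundle_size))
        bundle_start = start_position
        while bundle_start < end_position:
            bundle_stop = min(bundle_start + step, end_position)
            yield iobase.SourceBundle(
                weight=bundle_stop - bundle_start,
                source=self,
                start_position=bundle_start,
                stop_position=bundle_stop)
            bundle_start = bundle_stop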

7. Code Explanation

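  • estimate_size() returns the source's estimated size, which the runner uses to plan initial splitting and worker allocation. Beam expects this value in bytes; the example returns the element count for simplicity.
  • get_range_tracker() returns an OffsetRangeTracker for the given positions, falling back to the source's own bounds when either position is None.
  • read() loops through the range, claiming each index via range_tracker.try_claim(i) before yielding it. If a claim fails (because the runner split off the rest of the range), the reader stops, since the remaining indices now belong to another worker.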

8. Real Production Example

A custom sink that uses standard DoFn batching to write data to a proprietary REST API:

python
import apache_beam as beam

class ApiWriteDoFn(beam.DoFn):
    def __init__(self, endpoint):
        self.endpoint = endpoint
        
    def setup(self):
        # Initialize client
        self.client = self.connect_to_api(self.endpoint)
        self.batch = []
        
    def process(self, element):
        self.batch.append(element)
        # Flush if batch size exceeded
        if len(self.batch) >= 100:
            self.flush()
            
    def finish_bundle(self):
        # Guarantee remaining elements are flushed when bundle completes
        if self.batch:
            self.flush()
            
    def flush(self):
        self.client.post_batch(self.batch)
        self.batch = []
        
    def connect_to_api(self, endpoint):
        class DummyClient:
            def post_batch(self, b): pass
        return DummyClient()

9. Common Mistakes

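  • Failing to claim positions: Yielding records from read() without first calling try_claim() for each position. The runner can then no longer split the source safely, which breaks dynamic work rebalancing and can cause records to be read more than once.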
  • Not flushing in finish_bundle(): Buffering elements in process() but forgetting to write the remaining buffered elements in finish_bundle(), resulting in data loss.
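The second mistake can be reproduced without Beam. In the sketch below, RecordingClient, BatchingWriter, and run_bundle are hypothetical names written for this illustration: the writer mirrors the process()/finish_bundle() buffering pattern of a batching DoFn, and the client records batches in memory instead of calling a real API. A batch size of 100 is assumed.

```python
class RecordingClient:
    """Hypothetical client that records each batch instead of calling an API."""

    def __init__(self):
        self.batches = []

    def post_batch(self, batch):
        self.batches.append(list(batch))


class BatchingWriter:
    """Mirrors the buffering pattern of a batching DoFn, without depending on Beam."""

    def __init__(self, client, batch_size=100):
        self.client = client
        self.batch_size = batch_size
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        if len(self.batch) >= self.batch_size:
            self.flush()

    def finish_bundle(self):
        # Write whatever is still buffered when the bundle ends.
        if self.batch:
            self.flush()

    def flush(self):
        self.client.post_batch(self.batch)
        self.batch = []


def run_bundle(elements, call_finish_bundle):
    """Feed one bundle through the writer and return how many elements were written."""
    client = RecordingClient()
    writer = BatchingWriter(client, batch_size=100)
    for element in elements:
        writer.process(element)
    if call_finish_bundle:
        writer.finish_bundle()
    return sum(len(batch) for batch in client.batches)


written_with_flush = run_bundle(range(250), call_finish_bundle=True)
written_without_flush = run_bundle(range(250), call_finish_bundle=False)

print(written_with_flush)     # 250
print(written_without_flush)  # 200
```

With 250 elements and a batch size of 100, the last 50 elements only reach the client when finish_bundle() flushes the partial batch.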

10. Interview Perspective

  • Question: Why is the RangeTracker important in custom sources?
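  • Answer: It prevents race conditions between the reader and the runner. When the runner splits a source that is already being read, the RangeTracker moves the range boundary atomically, and the reader may only process records it has successfully claimed. This keeps records from being read twice or skipped.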
  • Question: When should you implement a custom Source versus a custom DoFn reader?
  • Answer: Implement a custom Source when you need dynamic splitting, size estimation, and runner-level parallelism optimizations. Use a custom DoFn reader when reading from standard APIs where partitioning is simple or fixed.

11. Best Practices

  • Implement estimate_size() carefully; poor estimates lead to bad worker provisioning.
  • Always implement finish_bundle() to flush batch writers and protect against data loss.

12. Summary

  • Custom IO allows connecting to proprietary storage APIs.
  • BoundedSource subclasses define read bounds and sizing.
  • RangeTracker claims records, supporting dynamic work rebalancing.

13. Interactive Challenges

Challenge 1: Size Estimator (Beginner)

Complete the estimate_size() method inside a source class that has attributes self.total_rows = 5000 and self.row_size_bytes = 100 to return the total byte size estimate.

Challenge 2: Finish Bundle Flush (Intermediate)

Complete the finish_bundle() method of a custom DoFn so that it calls self.flush() and yields nothing.

Challenge 3: Range Claim Check (Advanced)

Write the conditional check used inside a source's read loop that tests whether record index idx can be claimed on range_tracker. If the claim fails, return.
