Custom IO
Advanced · 7 min read · Last updated: 2026-07-01
1. Introduction
While Apache Beam offers native connectors for popular storage systems and services, you may encounter proprietary databases, custom APIs, or specialized binary file formats that lack built-in support. Custom IO lets you build your own Source and Sink connectors on top of Beam's extensible IO APIs.
2. Why This Concept Exists
Many organizations rely on internally developed REST APIs or legacy mainframe systems. To process this data at scale, pipelines need custom connectors that can parallelize work, split datasets, track reader offsets, and write output in batches within each bundle.
3. Key Terminology
- BoundedSource: A Beam base class representing a finite dataset that can be read and split into sub-sources.
- RangeTracker: A thread-safe helper class used to track the reader's current position and prevent overlapping record reads during dynamic splits.
- Dynamic Work Rebalancing: The ability of a runner to dynamically split a source's remaining work across multiple workers based on current progress.
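The following sketch makes the claim/split protocol concrete. SimpleOffsetTracker is a simplified, pure-Python model that stands in for Beam's OffsetRangeTracker, which has a richer API. The model captures two ideas. First, a reader must claim each position before it emits that position. Second, a runner-initiated split shrinks the tracker's stop position, and the positions it gives up become a residual range that another worker can read.

```python
import threading


class SimpleOffsetTracker:
    """Hypothetical, simplified model of an offset range tracker over [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop
        self.last_claimed = None
        self._lock = threading.Lock()  # reader and runner may call concurrently

    def try_claim(self, position):
        with self._lock:
            if position >= self.stop:
                return False  # position was split away (or is out of range)
            self.last_claimed = position
            return True

    def try_split(self, split_position):
        """Shrink this range to [start, split_position); return the residual range or None."""
        with self._lock:
            first_unclaimed = self.start if self.last_claimed is None else self.last_claimed + 1
            if not first_unclaimed <= split_position < self.stop:
                return None  # cannot give away positions that were already claimed
            residual = (split_position, self.stop)
            self.stop = split_position
            return residual


def read(tracker):
    # Claim every position before emitting it; stop as soon as a claim fails.
    position = tracker.start
    while tracker.try_claim(position):
        yield position
        position += 1


primary = SimpleOffsetTracker(0, 10)
reader = read(primary)
first_part = [next(reader) for _ in range(3)]  # reads 0, 1, 2
residual_range = primary.try_split(5)          # "runner" splits off [5, 10)
rest_of_primary = list(reader)                 # reads 3, 4; try_claim(5) then fails
residual = list(read(SimpleOffsetTracker(*residual_range)))
print(first_part + rest_of_primary, residual)  # [0, 1, 2, 3, 4] [5, 6, 7, 8, 9]
```

The primary reader and the residual reader together cover every offset exactly once. The lock matters because, in a real runner, the split request can arrive on a different thread from the one running read().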
4. How It Works
- Source: You subclass BoundedSource and implement its estimate_size(), split(), get_range_tracker(), and read() methods.
- Sink: You subclass FileBasedSink (for file outputs), or you write a custom DoFn that buffers elements and writes them in batches using its lifecycle methods (setup, start_bundle, process, finish_bundle).
5. Visual Diagram
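BoundedSource (offsets [0, 1000))
Splits into bundles of the desired size
Worker 1 (RangeTracker): offset range [0, 500)
Worker 2 (RangeTracker): offset range [500, 1000)
Offset ranges are half-open: a bundle includes its start offset and excludes its stop offset, so the two workers never read the same record.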
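The split shown in the diagram is simple offset arithmetic. The following sketch uses a hypothetical helper, split_offsets, to cut a half-open range into bundles of at most desired_bundle_size offsets. Each bundle's stop offset is the next bundle's start offset, so no offset is read twice or skipped.

```python
def split_offsets(start, stop, desired_bundle_size):
    """Split [start, stop) into consecutive half-open ranges of at most desired_bundle_size."""
    if desired_bundle_size <= 0:
        raise ValueError("desired_bundle_size must be positive")
    bundles = []
    bundle_start = start
    while bundle_start < stop:
        bundle_stop = min(stop, bundle_start + desired_bundle_size)
        bundles.append((bundle_start, bundle_stop))
        bundle_start = bundle_stop
    return bundles


print(split_offsets(0, 1000, 500))  # [(0, 500), (500, 1000)]
print(split_offsets(0, 1000, 300))  # [(0, 300), (300, 600), (600, 900), (900, 1000)]
```

When the range size is not a multiple of the bundle size, only the last bundle comes out smaller.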
6. Code Example
A custom Source that generates a sequence of integers:
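```python
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker


class RangeSource(iobase.BoundedSource):
    """A bounded source that emits the integers in [start, end)."""

    def __init__(self, start, end):
        self._start = start
        self._end = end

    def estimate_size(self):
        # Beam expects an estimate in bytes; the element count is a rough proxy here.
        return self._end - self._start

    def get_range_tracker(self, start_position, stop_position):
        # None means "from the beginning" / "to the end" of the whole source.
        if start_position is None:
            start_position = self._start
        if stop_position is None:
            stop_position = self._end
        return OffsetRangeTracker(start_position, stop_position)

    def read(self, range_tracker):
        for i in range(range_tracker.start_position(), range_tracker.stop_position()):
            # Claim each offset before emitting it; a failed claim means the
            # runner split off the rest of the range, so stop reading.
            if not range_tracker.try_claim(i):
                return
            yield i

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        # Cut [start, stop) into consecutive, non-overlapping bundles.
        start = self._start if start_position is None else start_position
        stop = self._end if stop_position is None else stop_position
        bundle_start = start
        while bundle_start < stop:
            bundle_stop = min(stop, bundle_start + desired_bundle_size)
            yield iobase.SourceBundle(
                weight=bundle_stop - bundle_start,
                source=self,
                start_position=bundle_start,
                stop_position=bundle_stop)
            bundle_start = bundle_stop


with beam.Pipeline() as p:
    numbers = p | beam.io.Read(RangeSource(0, 1000))
```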
7. Code Explanation
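estimate_size() gives the runner a size estimate (Beam expects it in bytes), which the runner uses to decide how to split the work and how many workers to use. split() should divide the range into consecutive, non-overlapping bundles of roughly desired_bundle_size offsets. get_range_tracker() creates the OffsetRangeTracker that coordinates the reader's position with the runner. read() loops through the range and claims each offset with range_tracker.try_claim(i) before yielding it. If a claim fails, the offset lies beyond the tracker's current stop position, typically because the runner split off the rest of the range for another worker, so the reader stops.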
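Before a runner can split off work, it has to choose a split position. The following sketch is a simplified model of that arithmetic. Both function names are hypothetical. They loosely follow the progress-and-split idea behind Beam's range trackers but are not Beam's implementation. The sketch reports progress through [start, stop) and proposes a split position that hands roughly a chosen share of the unclaimed work to a new worker.

```python
def fraction_consumed(start, stop, last_claimed):
    """Progress through [start, stop); last_claimed is None before the first claim."""
    if last_claimed is None:
        return 0.0
    return (last_claimed + 1 - start) / (stop - start)


def propose_split(start, stop, last_claimed, residual_share=0.5):
    """Split offset that moves roughly residual_share of the unclaimed work to a new worker."""
    first_unclaimed = start if last_claimed is None else last_claimed + 1
    remaining = stop - first_unclaimed
    split = stop - int(remaining * residual_share)
    # Valid only if the residual is non-empty and no claimed offset is given away.
    if first_unclaimed <= split < stop:
        return split
    return None


print(fraction_consumed(0, 1000, 199))  # 0.2
print(propose_split(0, 1000, 199))      # 600: the residual [600, 1000) goes elsewhere
print(propose_split(0, 1000, 998))      # None: only one offset is left
```

After the runner picks an offset such as 600, it asks the tracker to split there. From that point, the original reader's try_claim(600) fails, and the reader stops, as described above.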
8. Real Production Example
A custom writer that uses DoFn batching to send data to a proprietary REST API:
```python
import apache_beam as beam


class ApiWriteDoFn(beam.DoFn):
    """Buffers elements and posts them to a REST API in batches."""

    def __init__(self, endpoint, batch_size=100):
        self.endpoint = endpoint
        self.batch_size = batch_size

    def setup(self):
        # Called once per DoFn instance: create the (expensive) API client here.
        self.client = self.connect_to_api(self.endpoint)

    def start_bundle(self):
        # Start every bundle with an empty buffer.
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        # Flush once the buffer reaches the batch size.
        if len(self.batch) >= self.batch_size:
            self.flush()

    def finish_bundle(self):
        # Flush the remaining elements so nothing is lost when the bundle completes.
        if self.batch:
            self.flush()

    def flush(self):
        self.client.post_batch(self.batch)
        self.batch = []

    def connect_to_api(self, endpoint):
        # Placeholder client; replace it with your real API client.
        class DummyClient:
            def post_batch(self, batch):
                pass
        return DummyClient()
```
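You can check batching logic like this without a runner by calling the lifecycle hooks yourself, in the order a runner uses them. The runner calls setup() once per DoFn instance and then calls start_bundle(), process() for each element, and finish_bundle() for every bundle. In the following sketch, RecordingClient and BatchingWriter are hypothetical plain-Python classes that follow the same buffering pattern as ApiWriteDoFn, so the sketch runs without Beam installed. A batch size of 3 keeps the output short.

```python
class RecordingClient:
    """Hypothetical stand-in client that records each batch it receives."""

    def __init__(self):
        self.batches = []

    def post_batch(self, batch):
        self.batches.append(list(batch))


class BatchingWriter:
    """Plain-Python version of the DoFn buffering pattern (no Beam dependency)."""

    def __init__(self, client, batch_size):
        self.client = client
        self.batch_size = batch_size

    def setup(self):
        pass  # a real DoFn would create its API client here

    def start_bundle(self):
        self.batch = []

    def process(self, element):
        self.batch.append(element)
        if len(self.batch) >= self.batch_size:
            self.flush()

    def finish_bundle(self):
        if self.batch:
            self.flush()

    def flush(self):
        self.client.post_batch(self.batch)
        self.batch = []


def run_bundles(writer, bundles):
    """Call the hooks in runner order: setup once, then the per-bundle hooks."""
    writer.setup()
    for bundle in bundles:
        writer.start_bundle()
        for element in bundle:
            writer.process(element)
        writer.finish_bundle()


client = RecordingClient()
run_bundles(BatchingWriter(client, batch_size=3), [[1, 2, 3, 4], [5, 6]])
print(client.batches)  # [[1, 2, 3], [4], [5, 6]]
```

The batches [4] and [5, 6] reach the client only because finish_bundle() flushes whatever is left in the buffer. Without that flush, both would be lost.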
9. Common Mistakes
- Failing to claim positions: emitting records from read() without first calling try_claim() on their positions. The runner can then no longer split the range safely, which breaks dynamic work rebalancing and can cause two workers to emit the same records.
- Not flushing in finish_bundle(): buffering elements in process() but never writing the leftover buffer in finish_bundle(), so those elements are silently lost.
10. Interview Perspective
- Question: Why is the RangeTracker important in custom sources?
- Answer: It keeps the reader and the runner in agreement about which positions belong to which worker. When the runner splits a running source, the RangeTracker updates the range boundary safely, even if another thread is reading, and the worker may emit only records whose positions it has successfully claimed. As a result, no record is read twice or dropped.
- Question: When should you implement a custom Source versus a custom DoFn reader?
- Answer: Implement a custom Source when you need dynamic splitting, size estimation, and runner-level parallel optimizations. Use a custom DoFn reader when the data can be partitioned in a simple, fixed way, for example a standard API with known pages or shards.
11. Best Practices
- Implement estimate_size() carefully; poor estimates lead to badly sized bundles and poor worker provisioning.
- Always flush buffered writes in finish_bundle() to protect against data loss.
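BoundedSource.estimate_size() is documented as returning an estimated size in bytes. For a source whose records come from an API, one possible approach is to multiply a known record count by the average encoded size of a small sample. The following helper, estimate_size_bytes, is a hypothetical sketch. It assumes JSON encoding as a stand-in for however your records are actually serialized.

```python
import json


def estimate_size_bytes(record_count, sample_records):
    """Estimate total bytes as record_count x average encoded size of a sample."""
    if record_count == 0 or not sample_records:
        return 0
    # Assumption: records are JSON-serializable, and JSON size approximates the real size.
    sample_bytes = sum(len(json.dumps(r).encode("utf-8")) for r in sample_records)
    return int(record_count * sample_bytes / len(sample_records))


sample = [{"id": 1, "name": "a"}, {"id": 2, "name": "bb"}]
print(estimate_size_bytes(1000, sample))  # 22500
```

A larger, more representative sample gives a better estimate. The runner needs only a rough size, so avoid expensive calls such as reading the full dataset just to compute it.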
12. Summary
- Custom IO allows connecting to proprietary storage APIs.
- BoundedSource subclasses define read bounds and sizing.
- RangeTracker claims records, supporting dynamic work rebalancing.