Side Inputs
1. Introduction
A Side Input allows a transform to access additional, read-only data from an auxiliary PCollection while processing elements in a main PCollection. This is useful for lookup maps, global configurations, or threshold parameters.
2. Why This Concept Exists
In standard map or ParDo transforms, you process one element at a time in isolation. However, you often need context. For example, if you are processing sales transactions, you may need a map of currency exchange rates to convert values. Since these rates change daily, they cannot be hardcoded. Side inputs let you inject this auxiliary dataset directly into processing functions.
3. Key Terminology
- Main Input: The primary PCollection being processed.
- Side Input: The auxiliary PCollection provided to the transform as additional read-only parameters.
- Side Input Views: Adapters that convert a PCollection into common structures like singletons, lists, or dictionaries (AsSingleton, AsList, AsDict).
4. How It Works
- You designate one PCollection as the side input.
- You wrap it using a view constructor (e.g. beam.pvalue.AsDict(my_pcoll)).
- You pass the wrapped PCollection as an argument to the main ParDo transform.
- Beam ensures the side input data is broadcast to the workers before processing the main elements.
5. Visual Diagram
Diagram: the Main PCollection (large source stream) and the Side PCollection (AsDict config / static list) both feed into a ParDo Transform, which combines and maps elements.
6. Code Example
Using a dictionary side input to map user IDs to usernames:
import apache_beam as beam

with beam.Pipeline() as p:
    # Small lookup dataset
    usernames = p | "Usernames" >> beam.Create([("1", "Alice"), ("2", "Bob")])

    # Main transaction dataset
    transactions = p | "Transactions" >> beam.Create([
        {"user_id": "1", "amount": 50.0},
        {"user_id": "2", "amount": 75.0}
    ])

    # Enrich transactions using the side input map
    def enrich_user(txn, lookup_map):
        txn_copy = dict(txn)
        txn_copy["username"] = lookup_map.get(txn["user_id"], "Unknown")
        return txn_copy

    enriched = (transactions
                | "Enrich" >> beam.Map(enrich_user, lookup_map=beam.pvalue.AsDict(usernames)))

    enriched | "Print" >> beam.Map(print)
7. Code Explanation
- usernames is a PCollection of key-value tuples.
- beam.pvalue.AsDict(usernames) translates the PCollection to a readable lookup dictionary.
- We pass lookup_map as a keyword argument to beam.Map.
- Inside enrich_user, we fetch usernames using the key user_id.
8. Real Production Example
Passing a global threshold configuration as a singleton side input to filter outliers. The threshold is created inline here; in production it could be read from an external file:
import apache_beam as beam

with beam.Pipeline() as p:
    # Read threshold configuration as a singleton
    threshold_pcoll = p | "ReadConfig" >> beam.Create([80.0])  # could read from GCS

    # Main metrics stream
    metrics = p | "CreateMetrics" >> beam.Create([45.2, 89.1, 78.4, 91.3])

    # Filter using threshold side input
    def filter_outliers(val, threshold):
        if val > threshold:
            yield val

    outliers = (metrics
                | "FilterOutliers" >> beam.ParDo(filter_outliers, threshold=beam.pvalue.AsSingleton(threshold_pcoll)))
9. Common Mistakes
- Large Side Inputs: Passing massive datasets (e.g. millions of database records) as side inputs. Side inputs are kept in worker memory. If the side input is too large, workers will run out of memory.
- Modifying Side Inputs: Attempting to alter or write to a side input object inside a step. Side inputs are strictly read-only.
10. Interview Perspective
- Question: What is the difference between AsList and AsIter side inputs?
- Answer: AsList loads the entire side input collection into worker memory as a Python list. AsIter streams the elements on demand, which can be slower but helps handle larger collections without running out of memory.
- Question: How does Beam handle updating side inputs in streaming pipelines?
- Answer: Side inputs are resolved per window. For each main-input element, Beam maps the element's window to the corresponding side-input window and uses that window's contents, so the value a transform sees changes as windows advance.
11. Best Practices
- Keep side inputs small (typically under a few megabytes).
- For massive join operations, use CoGroupByKey instead of side inputs to avoid worker memory starvation.
12. Summary
- Side inputs pass additional data to transformations.
- They are read-only and broadcast to workers.
- They are resolved into view types: AsSingleton, AsList, AsDict, and AsIter.