intermediate

Side Inputs

8 min read | Last updated: 2026-07-01

1. Introduction

A Side Input allows a transform to access additional, read-only data from an auxiliary PCollection while processing elements in a main PCollection. This is useful for lookup maps, global configurations, or threshold parameters.

2. Why This Concept Exists

In standard map or ParDo transforms, you process one element at a time in isolation. However, you often need context. For example, if you are processing sales transactions, you may need a map of currency exchange rates to convert values. Since these rates change daily, they cannot be hardcoded. Side inputs let you inject this auxiliary dataset directly into processing functions.

3. Key Terminology

  • Main Input: The primary PCollection being processed.
  • Side Input: The auxiliary PCollection provided to the transform as additional read-only parameters.
  • Side Input Views: Adapters that convert a PCollection into common structures such as singletons, lists, dictionaries, or iterables (AsSingleton, AsList, AsDict, AsIter).

4. How It Works

  • You designate one PCollection as the side input.
  • You wrap it using a view constructor (e.g. beam.pvalue.AsDict(my_pcoll)).
  • You pass the wrapped PCollection as an argument to the main ParDo transform.
  • Beam makes the side input available to every worker running the transform, and waits until the side input for the matching window is ready before processing the main elements in that window.

5. Visual Diagram

Main PCollection (large source stream) --> ParDo Transform (combines and maps elements) <-- Side PCollection (AsDict config / static list)

6. Code Example

Using a dictionary side input to map user IDs to usernames:

python
import apache_beam as beam

with beam.Pipeline() as p:
    # Small lookup dataset
    usernames = p | "Usernames" >> beam.Create([("1", "Alice"), ("2", "Bob")])
    
    # Main transaction dataset
    transactions = p | "Transactions" >> beam.Create([
        {"user_id": "1", "amount": 50.0},
        {"user_id": "2", "amount": 75.0}
    ])
    
    # Enrich transactions using the side input map
    def enrich_user(txn, lookup_map):
        txn_copy = dict(txn)
        txn_copy["username"] = lookup_map.get(txn["user_id"], "Unknown")
        return txn_copy

    enriched = (transactions 
                | "Enrich" >> beam.Map(enrich_user, lookup_map=beam.pvalue.AsDict(usernames)))
    enriched | "Print" >> beam.Map(print)

7. Code Explanation

  • usernames is a PCollection of key-value tuples.
  • beam.pvalue.AsDict(usernames) translates the PCollection to a readable lookup dictionary.
  • We pass lookup_map as a keyword argument to beam.Map.
  • Inside enrich_user, we fetch usernames using the key user_id.

8. Real Production Example

Passing a global threshold configuration as a singleton side input to filter outliers. The threshold is created in memory here; in production it could be read from an external file:

python
import apache_beam as beam

with beam.Pipeline() as p:
    # Read threshold configuration as a singleton
    threshold_pcoll = p | "ReadConfig" >> beam.Create([80.0]) # could read from GCS
    
    # Main metrics stream
    metrics = p | "CreateMetrics" >> beam.Create([45.2, 89.1, 78.4, 91.3])
    
    # Filter using threshold side input
    def filter_outliers(val, threshold):
        if val > threshold:
            yield val

    outliers = (metrics 
                | "FilterOutliers" >> beam.ParDo(filter_outliers, threshold=beam.pvalue.AsSingleton(threshold_pcoll)))

9. Common Mistakes

  • Large Side Inputs: Passing massive datasets (e.g. millions of database records) as side inputs. Every worker that runs the transform needs its own copy of the side input, typically held in memory, so a side input that is too large can cause workers to run out of memory.
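  • Modifying Side Inputs: Attempting to alter or write to a side input object inside a step. Treat side inputs as strictly read-only: the same object may be reused across elements, so any in-place change can leak into the processing of other elements.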

10. Interview Perspective

  • Question: What is the difference between AsList and AsIter side inputs?
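  • Answer: AsList exposes the entire side input collection as a Python list, which supports indexing and len() but generally requires the whole collection to fit in worker memory. AsIter exposes the elements as an iterable that can be consumed on demand, which is slower but helps handle larger collections without running out of memory.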
  • Question: How does Beam handle updating side inputs in streaming pipelines?
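  • Answer: Side inputs are resolved per window. For each main-input element, Beam maps the element's window to the corresponding side-input window and reads the side input's contents for that window, so in a streaming pipeline the visible side input changes as the main input advances into new windows.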

11. Best Practices

  • Keep side inputs small (typically under a few megabytes).
  • For massive join operations, use CoGroupByKey instead of side inputs to avoid worker memory starvation.

12. Summary

  • Side inputs pass additional data to transformations.
  • They are read-only and broadcast to workers.
  • They are exposed to the transform through view types: AsSingleton, AsList, AsDict, and AsIter.

13. Interactive Challenges

Challenge 1: Singleton Multiplier (Beginner)

Write a pipeline segment that applies a multiplication factor side input (as a singleton containing the value 5) to a main PCollection of numbers [1, 2, 3].

Challenge 2: Blacklist Filtering (Intermediate)

Write a pipeline segment that filters out elements from a main PCollection ["apple", "banana", "cherry"] if they are present in a blacklist side input collection passed as a list view containing ["banana", "cherry"].

Challenge 3: Dict Lookup (Advanced)

Enrich a collection of transaction dictionary elements with product descriptions using a side input dict containing {"SKU1": "Widget", "SKU2": "Gizmo"}.
