Join Patterns
1. Introduction
In Apache Beam, Join Patterns refer to standard design recipes for combining two or more PCollections. The two primary strategies are:
- CoGroupByKey Joins (Shuffle-based): Used when both datasets are large.
- Side Input Joins (Broadcast/Map-side): Used when joining a large dataset with a small lookup table that fits in worker memory.
2. Why This Concept Exists
Relational joins in distributed data systems are complex because data is split across multiple worker machines. A standard join requires shuffling (sending) records with matching keys across the network, which is compute- and network-intensive. Understanding join patterns helps you choose the most efficient approach, avoiding worker out-of-memory errors and reducing cloud compute costs.
3. Key Terminology
- CoGroupByKey Join: A relational join in which all input PCollections are shuffled and grouped by key.
- Side Input / Broadcast Join: An optimization where a small dataset is sent to all workers as a side input (e.g., as a dictionary), allowing workers to perform joins in-memory without shuffling the main large dataset.
- Left Outer Join: Returns every record from the left dataset, paired with matching records from the right dataset, or with default values when no right-side match exists.
- Inner Join: Returns only the records that have matching keys in both datasets.
4. How It Works
- CoGroupByKey Join: Both datasets must be key-value collections. Beam shuffles them so that matching keys land on the same worker. After the shuffle, a custom mapping function unpacks each key's result dictionary (one collection of values per input tag) to implement Left, Right, or Inner join logic.
- Side Input Join: The small lookup PCollection is converted into a side input view (for example, a dictionary via beam.pvalue.AsDict(small_pcoll)). A ParDo or Map transform iterates over the large PCollection and receives the side input dictionary, so matching values are looked up in memory on each worker.
5. Visual Diagram
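- CoGroupByKey join: PColl 1 & PColl 2 ➔ Shuffle ➔ Match Key A (all values for key A meet on one worker)
- Side input join: Small PColl ➔ broadcast to every worker; Large PColl ➔ Workers (Local Map Lookup, no shuffle)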
6. Code Example
Implementing a Left Outer Join with CoGroupByKey and a Broadcast Join with Side Inputs:
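import apache_beam as beam

# Pattern 1: CoGroupByKey Left Outer Join
def left_outer_join(element):
    user_id, data = element
    # Materialize the grouped values so they can be checked and re-iterated
    names = list(data["name"])
    emails = list(data["email"])
    # Left dataset (names) drives the output; keys with no name emit nothing
    for name in names:
        # If the right dataset (emails) has no match, yield a default
        if not emails:
            yield (user_id, (name, "No Email"))
        else:
            for email in emails:
                yield (user_id, (name, email))

with beam.Pipeline() as p:
    # Small lookup dataset (right side of Pattern 1, side input for Pattern 2)
    user_emails_pcoll = p | "Emails" >> beam.Create([
        ("user-1", "alice@gmail.com"),
        ("user-2", "bob@gmail.com")
    ])

    # Pattern 1 applied: names form the left side of the join (sample data)
    user_names_pcoll = p | "Names" >> beam.Create([
        ("user-1", "Alice"),
        ("user-3", "Carol")  # No email: emitted as ("Carol", "No Email")
    ])
    names_with_emails = (
        {"name": user_names_pcoll, "email": user_emails_pcoll}
        | "CoGroupByUser" >> beam.CoGroupByKey()
        | "LeftOuterJoin" >> beam.FlatMap(left_outer_join)
    )

    # Pattern 2: Side Input (Broadcast) Join
    # Convert small lookup to dict side input
    emails_dict = beam.pvalue.AsDict(user_emails_pcoll)
    # Large event stream PCollection
    click_events = p | "Clicks" >> beam.Create([
        {"user_id": "user-1", "click": "button_a"},
        {"user_id": "user-2", "click": "button_b"},
        {"user_id": "user-3", "click": "button_a"}  # Not in emails
    ])
    enriched_clicks = (
        click_events
        | "EnrichWithEmail" >> beam.Map(
            lambda event, emails: {
                **event,
                "email": emails.get(event["user_id"], "unknown@mail.com")
            },
            emails=emails_dict
        )
    )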
7. Code Explanation
- Left Outer Join: Unpacks the name and email lists from CoGroupByKey's output. Each name is paired with every matching email; if the emails list is empty, the function still emits the name with "No Email" instead of dropping the record.
- Side Input Join: Maps over click_events. The lambda receives each event plus the emails dictionary, looks up user_id in that memory-resident side input, and defaults to "unknown@mail.com" if the key is missing.
8. Real Production Example
When building advertising metric pipelines, a streaming PCollection of impressions (billions of events) is enriched with user segment details. The user segments PCollection is small and updated periodically, so a side input join is a good fit: it avoids shuffling billions of impression events.
9. Common Mistakes
- Side input too large: If the PCollection passed as a side input is too large (e.g., gigabytes of data), the worker can crash with an OutOfMemoryError, since the lookup table is loaded into the RAM of every JVM/worker instance.
- Side input updates in streaming: Side inputs are cached. If your lookup data changes, you must apply windowing to the side input PCollection so that workers read the updated windows.
10. Interview Perspective
- Question: When would you prefer a Side Input join over a CoGroupByKey join?
- Answer: When one of the datasets is small enough to fit comfortably inside a single worker's memory. Broadcasting it removes the need to shuffle the large dataset across the network, which speeds up the pipeline.
- Question: How do you perform a Right Outer Join using CoGroupByKey?
- Answer: In the mapping function after CoGroupByKey, iterate through every element of the right collection. If the left collection is empty for that key, output default values for the left fields.
11. Best Practices
- Use side inputs when the lookup table is small, roughly under 100-200 MB as a rule of thumb; the real limit depends on available worker memory.
- Pre-filter both collections before joining to discard unused keys or values, reducing both network and worker memory footprints.
12. Summary
- CoGroupByKey is the standard route for joining two massive datasets.
- Side inputs act as broadcast joins, mapping elements in memory against a loaded lookup dictionary.
- Post-CoGroupByKey mappings define Left, Right, Inner, or Full Outer behaviors.