advanced

Join Patterns

7 min read | Last updated: 2026-07-01

1. Introduction

In Apache Beam, Join Patterns refer to standard design recipes for combining two or more PCollections. The two primary strategies are:

  1. CoGroupByKey Joins (Shuffle-based): Used when both datasets are large.
  2. Side Input Joins (Broadcast/Map-side): Used when joining a large dataset with a small lookup table that fits in worker memory.

2. Why This Concept Exists

Relational joins in distributed data systems are complex because data is split across multiple worker machines. Performing a standard join requires shuffling (sending) matching keys across the network, which is compute and network intensive. Understanding join patterns allows you to choose the most efficient approach, preventing worker out-of-memory errors and lowering cloud compute costs.

3. Key Terminology

  • CoGroupByKey Join: Relational join where all input PCollections are shuffled and grouped by key.
  • Side Input / Broadcast Join: An optimization where a small dataset is sent to all workers as a side input (e.g., as a dictionary), allowing workers to perform joins in-memory without shuffling the main large dataset.
  • Left Outer Join: Returns every record from the left dataset, paired with its matching records from the right. Where no match exists, the right-hand fields are filled with defaults.
  • Inner Join: Returns only the records that have matching keys in both datasets.

4. How It Works

  • CoGroupByKey Join: Both datasets must be key-value collections. Beam shuffles them so that all values for a given key land on the same worker. After the shuffle, each element is a (key, result dictionary) pair in which every input tag maps to the values found for that key. A custom mapping function unpacks this dictionary to implement Left, Right, or Inner join logic.
  • Side Input Join: The small lookup PCollection of key-value pairs is converted into a side input view (for example a dictionary, using beam.pvalue.AsDict(small_pcoll)). A ParDo or Map transform then processes the large PCollection and receives the dictionary as an extra argument, so each element is matched with an in-memory lookup.
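The unpacking step after the shuffle can be sketched as a plain generator function. This is a minimal illustration of inner-join logic, not a definitive implementation: the tags "name" and "email" and the sample values are assumptions chosen for the example, and the elements are built by hand in the shape CoGroupByKey produces so the function can be tried without a runner.

```python
def inner_join(element):
    """Emit one (key, (name, email)) pair per match; unmatched keys emit nothing."""
    user_id, grouped = element
    # Materialize the grouped values in case the runner supplies lazy iterables.
    names = list(grouped["name"])    # "name" is an assumed tag
    emails = list(grouped["email"])  # "email" is an assumed tag
    # If either side is empty the nested loop yields nothing: inner-join semantics.
    for name in names:
        for email in emails:
            yield (user_id, (name, email))


# Hand-built elements in the (key, {tag: values}) shape, for illustration only.
matched = ("user-1", {"name": ["Alice"], "email": ["alice@example.com"]})
left_only = ("user-3", {"name": ["Carol"], "email": []})

print(list(inner_join(matched)))    # one joined row
print(list(inner_join(left_only)))  # empty: no email to match
```

In a pipeline, a function like this would typically be applied with beam.FlatMap directly after the CoGroupByKey step, since it can emit zero, one, or many rows per key.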

5. Visual Diagram

CoGroupByKey Join (Large + Large)
Network shuffle groups keys across workers:

PColl 1 & PColl 2 ➔ Shuffle ➔ Match Key A

Side Input Join (Large + Small)
Broadcast small collection to all workers:

Large PColl ➔ Workers (Local Map Lookup)

6. Code Example

Implementing a Left Outer Join with CoGroupByKey and a Broadcast Join with Side Inputs:

python
import apache_beam as beam

# Pattern 1: CoGroupByKey Left Outer Join
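def left_outer_join(element):
    user_id, data = element
    # Materialize the grouped values: depending on the runner they may be
    # lazy iterables, which are unsafe to truth-test or iterate twice
    names = list(data["name"])
    emails = list(data["email"])

    # The left dataset (names) drives the output: keys without a name emit nothing
    for name in names:
        # If right dataset (emails) is empty, yield default
        if not emails:
            yield (user_id, (name, "No Email"))
        else:
            for email in emails:
                yield (user_id, (name, email))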

# Pattern 2: Side Input (Broadcast) Join
with beam.Pipeline() as p:
    # Small lookup dataset
    user_emails_pcoll = p | "Emails" >> beam.Create([
        ("user-1", "alice@gmail.com"),
        ("user-2", "bob@gmail.com")
    ])
    # Convert small lookup to dict side input
    emails_dict = beam.pvalue.AsDict(user_emails_pcoll)

    # Large event stream PCollection
    click_events = p | "Clicks" >> beam.Create([
        {"user_id": "user-1", "click": "button_a"},
        {"user_id": "user-2", "click": "button_b"},
        {"user_id": "user-3", "click": "button_a"} # Not in emails
    ])

    enriched_clicks = (
        click_events
        | "EnrichWithEmail" >> beam.Map(
            lambda event, emails: {
                **event,
                "email": emails.get(event["user_id"], "unknown@mail.com")
            },
            emails=emails_dict
        )
    )

7. Code Explanation

  • Left Outer Join: Unpacks the lists in CoGroupByKey's output. For each name it emits one record per matching email. If the emails list is empty, it still emits the record, with "No Email" in place of the missing value, instead of skipping it.
  • Side Input Join: Maps over click_events. The lambda takes the event and the dictionary emails. It looks up user_id inside the memory-resident emails side input, defaulting to "unknown@mail.com" if missing.
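To check the left outer join logic without running a pipeline, the grouping step can be imitated locally. The sketch below is an assumption-laden stand-in: co_group_locally is a hypothetical helper, not a Beam API, and the names "Alice" and "Carol" are made-up sample data. It only mimics the (key, {tag: [values]}) shape that the join function expects.

```python
from collections import defaultdict


def left_outer_join(element):
    """Left outer join over a grouped element, as described above."""
    user_id, data = element
    names = list(data["name"])
    emails = list(data["email"])
    for name in names:
        if not emails:
            yield (user_id, (name, "No Email"))
        else:
            for email in emails:
                yield (user_id, (name, email))


def co_group_locally(tagged_inputs):
    """Hypothetical local stand-in for the grouping step (not a Beam API).

    Takes {tag: [(key, value), ...]} and returns [(key, {tag: [values]}), ...],
    with an empty list for any tag that has no value for a key.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return sorted(grouped.items())


# Illustrative inputs: user-3 has no email, user-2 has no name.
names = [("user-1", "Alice"), ("user-3", "Carol")]
emails = [("user-1", "alice@gmail.com"), ("user-2", "bob@gmail.com")]

joined = [
    row
    for element in co_group_locally({"name": names, "email": emails})
    for row in left_outer_join(element)
]
print(joined)
```

Here user-3 keeps its row with the "No Email" default, while user-2 is dropped because it has no record on the left side.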

8. Real Production Example

When building advertising metric pipelines, a streaming PCollection of impressions (billions of events) is enriched with user segment details. The user segments PCollection is small and updated periodically. A Side Input join is a good fit here because it avoids shuffling billions of impression events.

9. Common Mistakes

  • Side Input size too large: If the PCollection passed as a side input is too large (e.g. gigabytes of data), workers can run out of memory and crash, because the lookup table is loaded into the RAM of every worker process.
  • Side Input updates in Streaming: Side inputs are cached on the workers. If your lookup data changes, re-emit it periodically and apply windowing to the side input PCollection, so that workers read the newer windows instead of reusing the first cached copy.

10. Interview Perspective

  • Question: When would you prefer a Side Input join over a CoGroupByKey join?
  • Answer: When one of the datasets is small enough to fit comfortably inside a single worker's memory. This removes the need to shuffle the large dataset across the network, which usually makes the join faster and cheaper.
  • Question: How do you perform a Right Outer Join using CoGroupByKey?
  • Answer: In the mapping function after CoGroupByKey, ensure you iterate through all elements of the right collection. If the left collection is empty, output defaults for the left fields.
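The right outer join answer can be sketched as the mirror image of the left outer join function. This is a minimal sketch under the same assumptions as the earlier example: the tags "name" (left) and "email" (right) come from that example, while the left_default parameter and its "No Name" value are hypothetical additions for illustration.

```python
def right_outer_join(element, left_default="No Name"):
    """Emit every right-side record, filling in a default when the left is empty."""
    user_id, data = element
    # Materialize the grouped values in case they arrive as lazy iterables.
    names = list(data["name"])    # left collection
    emails = list(data["email"])  # right collection

    # The right dataset (emails) drives the output.
    for email in emails:
        if not names:
            # No left record for this key: use the default for the left field.
            yield (user_id, (left_default, email))
        else:
            for name in names:
                yield (user_id, (name, email))


# Hand-built grouped elements, for illustration only.
right_only = ("user-2", {"name": [], "email": ["bob@gmail.com"]})
left_only = ("user-3", {"name": ["Carol"], "email": []})

print(list(right_outer_join(right_only)))  # kept, with the left default
print(list(right_outer_join(left_only)))   # dropped: nothing on the right
```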

11. Best Practices

  • Use Side Inputs when the lookup table is small, roughly under 100-200 MB as a rule of thumb. The practical limit depends on how much memory each worker has.
  • Pre-filter both collections before joining to discard unused keys or values, reducing both network and worker memory footprints.

12. Summary

  • CoGroupByKey is the standard route for joining two massive datasets.
  • Side Inputs act as broadcast joins, mapping elements in-memory against a loaded lookup dictionary.
  • Post-CoGroupByKey mappings define Left, Right, Inner, or Full Outer behaviors.

13. Interactive Challenges

Challenge 1: Left Outer Join post CoGroupByKey (Beginner)

Write a mapping function left_join_formatter that takes a CoGroupByKey output tuple (user_id, {"profile": [...], "activity": [...]}) and outputs (user_id, {"name": name, "has_activity": flag}). If a user profile has no matching activity, has_activity should be False; otherwise True. Assume profile lists always contain exactly 1 element.

Challenge 2: Side Input Category Enrichment (Intermediate)

Write a pipeline segment that takes a PCollection of transaction dicts {"item_id": "item10", "price": 12.5} and enriches them with a product category using a small dictionary side input mapping item_id to category names.

Challenge 3: Inner Join Side Input Filter (Advanced)

Write a pipeline segment that uses a side input dictionary of allowed user IDs: {"user1": True, "user3": True}. If an incoming event dictionary has a "user_id" that is NOT present in the side input, filter it out completely (an Inner Join filtering pattern).
