advanced

Best Practices

7 min readLast updated: 2026-06-30

1. Introduction

Best Practices in Apache Beam focus on performance tuning, cost reduction, memory management, and deployment safety for production-grade pipelines running on Google Cloud Dataflow.

2. Why This Concept Exists

Data engineering pipelines often process millions of events per second. Poorly designed logic that runs fine on small local datasets can result in giant cloud bills, slow throughput, out-of-memory worker crashes, or database lockouts when deployed at scale. Implementing optimized design patterns ensures stability and efficiency.

3. Key Terminology

  • Hot Key: A key in a key-value collection that contains significantly more elements than other keys, causing a single worker to bottleneck.
  • Side Input Size: Auxiliary data passed to transforms that must be small enough to fit within worker memory.
  • API Bottleneck: Making individual external database calls per element inside process(), slowing throughput.

4. Core Design Patterns

  • Filter Early: Apply filtering steps as early as possible in your pipeline graph to avoid carrying garbage records through downstream transforms.
  • Batching API calls: Instead of querying external database API endpoints for every record individually, buffer elements locally and make bulk queries.
  • Combiner Lift: Prefer using CombinePerKey over GroupByKey + Map(sum) to enable workers to aggregate records locally before network shuffles.

5. Visual Diagram

Stateless API Calls
1 network request per element (Slow!):

Row 1 ➔ API | Row 2 ➔ API | Row 3 ➔ API

Batched Bundle Pattern
1 network connection per bundle (Fast!):

[Row 1, Row 2, Row 3] ➔ Batch API

6. Code Example

Implementing Bundle Batching inside a DoFn to perform bulk database updates:

python
import apache_beam as beam

class BulkDatabaseInsertDoFn(beam.DoFn):
    def __init__(self):
        self.batch_size = 50

    def start_bundle(self):
        # Initialize an empty buffer for this worker batch
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        # Flush if buffer reaches batch threshold limit
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def finish_bundle(self):
        # Flush remaining elements at the end of the worker bundle
        if self.buffer:
            self.flush()

    def flush(self):
        # Insert records into database in bulk (1 call instead of 50)
        db_client.bulk_insert(self.buffer)
        self.buffer.clear()

7. Code Explanation

  • start_bundle() initializes self.buffer as an empty list.
  • process() appends elements to the buffer, checking if the count reaches batch_size.
  • If the limit is met, flush() executes a single bulk write, clearing the container.
  • finish_bundle() processes any remaining items left in the buffer before the bundle closes.

8. Real Production Example: Dealing with Hot Keys

If your pipeline groups transactions by country, a country like "US" might contain 100x more elements than other keys. This creates a Hot Key, forcing one worker VM to process all "US" elements, running out of memory. To resolve this, you append a random integer to keys to distribute them (salt keys) before grouping, and strip the salt after aggregating:

python
salted_keys = records | beam.Map(lambda x: (f"{x['country']}_{random.randint(0, 9)}", x['amount']))

9. Common Mistakes

  • Connecting to databases in __init__: Constructors execute on the submission client machine, not the cloud worker VMs. Initialize connections inside the setup() method.
  • Passing giant side inputs: Side inputs are stored in worker RAM. If you pass a 10GB dictionary as a side input to a worker with 4GB of RAM, the worker VM will crash. Use Co-Grouping instead.

10. Interview Perspective

  • Question: How do you optimize external API calls inside a pipeline?
  • Answer: Use bundle-level batching inside start_bundle and finish_bundle to group records into arrays, calling batch APIs (like BigQuery Write API or bulk DynamoDB writes) once per bundle instead of once per element.
  • Question: What is Fusion Optimization in Dataflow?
  • Answer: The Dataflow Runner merges adjacent stateless steps (like a Map followed by a Filter) into a single execution stage to avoid network and disk overhead. If you need to force a step partition, insert a Reshuffle transform.

11. Best Practices

  • Use PipelineOptions to set autoscaling limits (--max_num_workers).
  • Pin dependencies in requirements.txt to prevent VM boot failures during worker provisioning.

12. Summary

  • Optimize performance using early filters and bulk operations.
  • Mitigate Hot Keys via key salting techniques.
  • Buffer records locally within bundle lifecycles (start_bundle/finish_bundle).

13. Interactive Challenges

Challenge 1: Setup Connection Resource (Beginner)

Write a DoFn class named CachedLookupDoFn that initializes an API client once during worker setup and sets it to self.client, using it in the process method to query values.

Challenge 2: Batch Insert DoFn (Intermediate)

Define a DoFn class named BatchLoggerDoFn that buffers string logs in a list and flushes them by printing the list when it accumulates exactly 5 logs. Ensure any remaining logs are flushed when the bundle finishes.

Challenge 3: Key Salting Pattern (Advanced)

Write a mapping transform statement that takes a tuple (key, value) and salts the key by appending a random integer between 0 and 4 in the format "key_integer" to distribute elements across workers.

14. Related Content

Advertisement
AdSense Slot #000001Leaderboard Banner (728x90)