beginner

Project Structure

7 min read | Last updated: 2026-07-01

1. Introduction

When building production data pipelines, writing code inside a single script file is not maintainable. A production-grade Apache Beam project requires a structured directory tree that isolates pipeline logic, configuration parameters, reusable transformations, and unit tests, and packages dependencies for remote worker distribution.

2. Why This Concept Exists

When a pipeline runs on a distributed cluster (such as Apache Flink or Google Cloud Dataflow), the runner serializes the pipeline and ships it to multiple worker machines. Code that lives in separate local modules is not shipped automatically, and third-party libraries are not installed on the workers by default. A pipeline that imports local helper modules or external libraries can therefore crash on the workers with a ModuleNotFoundError.

Creating a standardized project structure with a setup.py configuration solves this by packaging your project code as a redistributable module that the runner installs on all remote workers.

3. Key Terminology

  • setup.py: A Python metadata script used to package the pipeline and distribute custom modules to cluster worker nodes.
  • requirements.txt: Lists the external Python libraries (such as pandas or requests) that must be installed on worker environments. It is passed to the runner with the --requirements_file option.
  • TestPipeline: A Pipeline subclass provided by the Beam SDK for testing transforms. It is typically combined with assert_that to verify outputs.

4. How It Works

  • Divide Code: Separate entry points (executables) from transformation modules.
  • Write setup.py: Configure packages using Python's standard setuptools.
  • Build Tests: Write unit tests in a dedicated folder using standard testing frameworks (like pytest or unittest).
  • Package for Execution: When submitting the pipeline, pass the --setup_file option to tell the runner to package and install the custom code on worker VMs before processing data.

5. Visual Diagram

A typical production-ready directory structure:

📂 my_beam_project/
    📄 setup.py
    📄 requirements.txt
    📄 run_pipeline.py
    📂 pipelines/
        📄 __init__.py
        📄 main_pipeline.py
    📂 transforms/
        📄 __init__.py
        📄 cleaning.py
    📂 tests/
        📄 __init__.py
        📄 test_cleaning.py

6. Code Example

Here is a sample setup.py script configuration:

python
# setup.py
import setuptools

setuptools.setup(
    name="my_beam_project",
    version="1.0.0",
    install_requires=[
        "pandas>=2.0.0",
        "requests>=2.28.0"
    ],
    packages=setuptools.find_packages(),
)

And here is a unit test in tests/test_cleaning.py that utilizes TestPipeline:

python
# tests/test_cleaning.py
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

def clean_name(name):
    return name.strip().lower()

class TestCleaningTransforms(unittest.TestCase):
    def test_clean_name(self):
        # Create a TestPipeline context specifically designed for assertions
        with TestPipeline() as p:
            inputs = p | beam.Create([" ALICE ", "Bob   ", "  charlie"])
            outputs = inputs | beam.Map(clean_name)
            
            # Verify outputs match expectations
            assert_that(outputs, equal_to(["alice", "bob", "charlie"]))

if __name__ == "__main__":
    unittest.main()

7. Code Explanation

  • The setup.py uses setuptools.setup to automatically package the subfolders (pipelines/, transforms/) and declare external dependencies.
  • In the test code, we import TestPipeline instead of the standard beam.Pipeline.
  • We use the assert_that utility and the equal_to matcher. assert_that adds a verification step to the pipeline graph, so the check runs when the pipeline executes and the test fails if the output elements do not match. equal_to compares the elements without regard to their order.
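The package discovery that setup.py relies on can be observed directly. The sketch below assumes setuptools is installed and builds a throwaway transforms/ folder in a temporary directory (the discover helper is a hypothetical name), then calls find_packages() with and without an __init__.py present.

```python
import pathlib
import tempfile

import setuptools


def discover(with_init):
    """Return the packages find_packages() sees in a temporary project root."""
    with tempfile.TemporaryDirectory() as root:
        pkg = pathlib.Path(root) / "transforms"
        pkg.mkdir()
        (pkg / "cleaning.py").write_text(
            "def clean_name(name):\n    return name.strip().lower()\n"
        )
        if with_init:
            # An empty __init__.py marks the folder as a regular package.
            (pkg / "__init__.py").touch()
        return setuptools.find_packages(where=root)


print(discover(with_init=True))   # ['transforms']
print(discover(with_init=False))  # []
```

The second call returns an empty list without raising any error, which is why a missing __init__.py tends to surface only later, as an import failure on the workers.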

8. Real Production Example

In a CI/CD deployment pipeline, a CI job (for example, a GitHub Actions workflow) runs pytest on the tests/ directory to execute all pipeline unit tests. If the tests pass, the deployment script executes the entry script:

bash
python run_pipeline.py \
  --runner=DataflowRunner \
  --project=my-gcp-project \
  --setup_file=./setup.py \
  --staging_location=gs://my-bucket/staging

The --setup_file=./setup.py flag tells the runner to build a package from the project, stage it, and install it on every worker, so that imports of the custom transformation modules resolve during cluster execution.

9. Common Mistakes

  • Missing __init__.py files: Forgetting to add empty __init__.py files to subdirectories. find_packages() only collects directories that contain an __init__.py, so a folder without one is silently left out of the package and its modules never reach the workers.
  • Using standard assert statements: Using standard Python assertions (e.g. assert value == "expected") inside unit tests to verify PCollection values. A PCollection is a deferred, distributed collection rather than an in-memory list; its elements do not exist until the pipeline runs, so a plain assert has nothing meaningful to compare against. Use assert_that(pcoll, equal_to(...)) instead.

10. Interview Perspective

  • Question: Why is the --setup_file flag needed when submitting a job to a runner like Dataflow?
  • Answer: At submission time the pipeline is built on the local machine, where the project's packages are importable. The workers that execute the job do not have those local packages. --setup_file builds a source distribution (a tarball) from the local project, uploads it to the staging location, and installs it on each worker machine.
  • Question: What is the difference between TestPipeline and a regular Pipeline?
  • Answer: TestPipeline is a subclass of Pipeline intended for tests. It can read pipeline options supplied by the test harness through --test-pipeline-options, blocks until the run finishes by default, and is typically paired with assert_that so that results are verified inside the pipeline without writing outputs to external storage.

11. Best Practices

  • Never hard-code raw database credentials or API keys in the project's source folders; keep them in a secret manager and pass a reference to the pipeline through pipeline options.
  • Write unit tests for every custom DoFn and composite PTransform using TestPipeline.

12. Summary

  • Production pipelines require packaging logic into modules.
  • setup.py declares the project's packages and the dependencies that remote workers must install.
  • __init__.py files are required for find_packages() to pick up a folder as a Python package.
  • Use TestPipeline and assert_that to execute logic assertions.

13. Interactive Challenges

Challenge 1: Identify Package Errors (Beginner)

Explain why a remote pipeline execution fails with ModuleNotFoundError: No module named 'transforms' when running on Dataflow, even though it works locally on the developer's laptop.

Challenge 2: Declare Dependencies in setup.py (Intermediate)

Write a Python setup.py block configuring a package named "beam_ingest" with dependency configurations requiring numpy (version 1.22 or higher) and google-cloud-storage.

Challenge 3: Write a Test Asserting Filtered Output (Advanced)

Write a complete Python unit test using TestPipeline that creates a PCollection containing names ["Tom", "Jerry"], filters out names with length less than 4, and asserts the output contains only ["Jerry"].
