Cloud Storage
1. Introduction
Apache Beam provides a unified interface for working with cloud storage providers. By hiding filesystem specifics behind URI schemes, Beam lets pipelines read and write data across local disk, Google Cloud Storage (GCS), AWS S3, and Azure Blob Storage without provider-specific code.
2. Why This Concept Exists
In cloud-native data systems, compute and storage are decoupled, and data is typically ingested into object stores before processing. Instead of rewriting a pipeline for each storage API, you change the target URI string and Beam routes the calls to the matching storage implementation.
3. Key Terminology
- URI Scheme: The path prefix that identifies the storage system (e.g., gs:// for GCS, s3:// for AWS S3, azfs:// for Azure Blob Storage).
- FileSystems: The Beam core API wrapper that resolves URIs and routes operations to storage plugins.
- Object Store: Blob-based storage that indexes files by keys instead of hierarchical directories.
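To make the Object Store definition concrete, here is a minimal sketch that uses a plain Python dict as a stand-in for a bucket. The dict, its keys, and the helpers `list_prefix` and `rename_prefix` are hypothetical and are not part of Beam or any cloud SDK. The sketch only illustrates that a "directory" is a shared key prefix, so moving one touches every object under it.

```python
# Toy stand-in for a bucket: a flat mapping from object key to content.
# Nothing here is a real cloud or Beam API.
bucket = {
    "logs/2024/app.txt": b"a",
    "logs/2025/app.txt": b"b",
    "images/cat.png": b"c",
}


def list_prefix(store, prefix):
    """Return keys starting with `prefix` (what 'listing a directory' means)."""
    return sorted(key for key in store if key.startswith(prefix))


def rename_prefix(store, old_prefix, new_prefix):
    """'Rename a directory' by copying, then deleting, every key under it."""
    moved = 0
    for key in list_prefix(store, old_prefix):
        new_key = new_prefix + key[len(old_prefix):]
        store[new_key] = store[key]  # copy the object under the new key
        del store[key]               # delete the original
        moved += 1
    return moved


print(list_prefix(bucket, "logs/"))
moved = rename_prefix(bucket, "logs/", "archive/")
print(moved, sorted(bucket))
```

The work grows with the number of objects under the prefix, which is why prefix-level moves on a real object store are far more expensive than a directory rename on a local disk.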
4. How It Works
- Beam uses dynamic registration to load filesystem handlers during runner initialization.
- When a path like s3://my-bucket/logs/ is passed, the runner invokes AWS client libraries.
- Credentials and configurations are fetched from the runner's environment (e.g., IAM, environment variables).
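The routing described above can be pictured as a lookup from URI scheme to handler. The following is a toy model written with the standard library only, not Beam's actual implementation. The names `register`, `get_scheme`, `resolve`, and the handler strings are all hypothetical.

```python
import re

# Hypothetical registry mapping a URI scheme to a handler name.
_REGISTRY = {}


def register(scheme, handler):
    """Record which handler serves a given scheme (e.g. 'gs', 's3')."""
    _REGISTRY[scheme] = handler


def get_scheme(path):
    """Return the scheme before '://', or None for plain local paths."""
    match = re.match(r"^(?P<scheme>[a-zA-Z][-a-zA-Z0-9+.]*)://", path.strip())
    return match.group("scheme") if match else None


def resolve(path):
    """Pick the handler for `path`, failing loudly for unknown schemes."""
    scheme = get_scheme(path)
    if scheme is None:
        return _REGISTRY["local"]
    if scheme not in _REGISTRY:
        raise ValueError(f"No filesystem registered for scheme {scheme!r}")
    return _REGISTRY[scheme]


# Registration happens once, before any path is resolved.
register("local", "LocalHandler")
register("gs", "GcsHandler")
register("s3", "S3Handler")

print(resolve("s3://my-bucket/logs/"))
print(resolve("/tmp/data.txt"))
```

Failing on an unregistered scheme, rather than silently falling back to local disk, surfaces a missing storage plugin at resolution time instead of as a confusing "file not found" later.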
5. Visual Diagram
Google Cloud Storage: gs://bucket/data
Amazon S3: s3://bucket/data
Azure Blob Storage: azfs://account/container/data
6. Code Example
Reading raw text files from different cloud storage services by swapping URIs:
import apache_beam as beam

# Switch the input path without changing the pipeline logic.
cloud_path = "gs://company-data-bucket/imports/*.txt"
# cloud_path = "s3://company-data-bucket/imports/*.txt"

with beam.Pipeline() as p:
    (
        p
        | "ReadCloud" >> beam.io.ReadFromText(cloud_path)
        | "Clean" >> beam.Map(lambda line: line.strip())
        # WriteToText takes a file prefix; shards are named like
        # cleaned-00000-of-00001.txt under outputs/.
        | "WriteBack" >> beam.io.WriteToText(
            "gs://company-data-bucket/outputs/cleaned", file_name_suffix=".txt")
    )
7. Code Explanation
- The path uses gs://, which represents GCS. Swapping this to s3:// makes Beam use the S3 protocol.
- Authentication is handled behind the scenes by active service accounts or environment credentials.
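Since only the path string changes between providers, one option is to take it from the command line instead of editing the source. The sketch below uses only `argparse`. The flag names `--input` and `--output` and the helper `parse_paths` are assumptions made for illustration. The returned strings would be passed to `ReadFromText` and `WriteToText` in the example above.

```python
import argparse


def parse_paths(argv):
    """Split storage paths from any remaining (runner) arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="e.g. gs://... or s3://...")
    parser.add_argument("--output", required=True, help="output file prefix")
    # parse_known_args leaves unrecognised flags untouched so they can be
    # forwarded elsewhere, for example to the pipeline's own options.
    known, remaining = parser.parse_known_args(argv)
    return known.input, known.output, remaining


input_path, output_prefix, extra = parse_paths([
    "--input", "s3://company-data-bucket/imports/*.txt",
    "--output", "s3://company-data-bucket/outputs/cleaned",
    "--runner", "DirectRunner",
])
print(input_path, output_prefix, extra)
```

With this shape, moving a job from GCS to S3 becomes a change to the launch command, and the pipeline code stays identical across environments.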
8. Real Production Example
Using the FileSystems utility directly to delete or copy files dynamically inside a pipeline:
import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class CleanupFn(beam.DoFn):
    """Deletes the files matched by each element's temp_file_path."""

    def process(self, element):
        path = element["temp_file_path"]
        # Expand the path or glob into concrete files via the FileSystems API.
        match_result = FileSystems.match([path])[0]
        paths_to_delete = [metadata.path for metadata in match_result.metadata_list]
        # Skip the delete call when nothing matched (e.g. on a retried bundle).
        if paths_to_delete:
            FileSystems.delete(paths_to_delete)
        yield f"Cleaned up {len(paths_to_delete)} files"
9. Common Mistakes
- Hardcoding credentials: Storing API keys or access tokens directly in your pipeline code. Always use IAM roles or secret managers.
- Wrong package extras: Forgetting to install the appropriate cloud SDK dependencies (e.g., apache-beam[gcp] or apache-beam[aws]).
10. Interview Perspective
- Question: How does Beam manage network failures and retries when communicating with cloud storage?
- Answer: Beam's underlying storage adapters implement backoff and retry policies. If a temporary network drop occurs, the worker retries the API request; only when those retries are exhausted does the bundle fail, at which point the runner may retry the bundle itself.
- Question: Why doesn't Beam support traditional directory renaming on cloud storage?
- Answer: Object stores are flat key-value stores in which a "directory" is only a shared key prefix. Renaming one requires a copy and a delete for every object under that prefix, which is slow and not atomic.
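The first answer above mentions backoff and retry. A generic exponential-backoff loop might look like the sketch below. This is not Beam's internal code: `TransientError`, `call_with_backoff`, `flaky_read`, and the delay values are hypothetical, and the `sleep` parameter exists only so the example runs instantly.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical marker for a retryable failure (e.g. a dropped connection)."""


def call_with_backoff(operation, max_attempts=4, base_delay=0.1, sleep=time.sleep):
    """Run `operation`, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except TransientError:
            if attempt == max_attempts:
                raise  # retries exhausted; the caller sees the failure
            # Double the delay each attempt and add jitter to avoid retry storms.
            delay = base_delay * (2 ** (attempt - 1))
            sleep(delay + random.uniform(0, delay))


# Simulated storage call that fails twice, then succeeds.
calls = {"count": 0}


def flaky_read():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientError("temporary network drop")
    return "payload"


result = call_with_backoff(flaky_read, sleep=lambda seconds: None)
print(result, calls["count"])
```

Only errors marked as transient are retried. Anything else, such as a permission error, propagates immediately, because retrying it would only delay the failure.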
11. Best Practices
- Use regional buckets near your compute instances (Dataflow workers) to minimize latency and transfer costs.
- Limit filesystem metadata checks (FileSystems.match), as each one calls an external API and can become slow at scale.
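One way to limit repeated metadata checks is to memoize them per worker. The sketch below uses `functools.lru_cache` around a stand-in function. `_remote_match`, `cached_match`, and the fake result it returns are hypothetical and only simulate a remote lookup, so that the number of round trips can be counted.

```python
from functools import lru_cache

# Counts simulated round trips to the storage API.
stats = {"remote_calls": 0}


def _remote_match(pattern):
    """Stand-in for a remote metadata lookup (hypothetical, no network I/O)."""
    stats["remote_calls"] += 1
    return (pattern.replace("*", "part-0"),)  # fake single match


@lru_cache(maxsize=1024)
def cached_match(pattern):
    """Return matches for `pattern`, hitting the remote API once per pattern."""
    return _remote_match(pattern)


patterns = ["gs://bucket/a/*", "gs://bucket/a/*", "gs://bucket/b/*", "gs://bucket/a/*"]
results = [cached_match(p) for p in patterns]
print(stats["remote_calls"], len(results))
```

A cache like this can return stale results if objects are added or removed while the job runs, so it suits inputs that do not change during the pipeline.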
12. Summary
- Beam uses URI schemes to route operations to storage providers.
- This makes pipelines portable across cloud providers.
- The FileSystems API offers utilities to match (list), copy, and delete objects.