Assemble a batch analytics pipeline that generates financial purchase metrics. It reads transaction and profile logs, joins them using CoGroupByKey, and computes customer lifetime value metrics.
This batch pipeline reads transaction and customer profile data from CSV files in Cloud Storage, parses each line inside custom DoFn classes, joins the two datasets on customer ID using CoGroupByKey, sums each customer's order amounts, and writes the resulting report to BigQuery.
To run the batch join pipeline locally, create two CSV text files in your workspace directory, one for customer profiles and one for transactions, and populate them with these mock datasets:
customer_id,name,join_date
C101,Alice Smith,2026-01-15
C102,Bob Jones,2026-02-20
C103,Charlie Brown,2026-03-10
C104,Diana Prince,2026-04-05

order_id,customer_id,amount,date
O501,C101,89.99,2026-06-01
O502,C102,45.50,2026-06-02
O503,C101,120.00,2026-06-03
O504,C103,15.75,2026-06-04
O505,C104,250.00,2026-06-05
O506,C102,12.50,2026-06-06

Use ReadFromText to read files and implement custom DoFn extractors.
import csv

import apache_beam as beam


class ParseCSVOrderFn(beam.DoFn):
    """Parses one CSV line of the orders file into (customer_id, amount)."""

    def process(self, element):
        row = next(csv.reader([element]), [])
        # Skip blank lines and the CSV header.
        if not row or row[0] == "order_id":
            return
        customer_id = row[1]
        amount = float(row[2])
        yield (customer_id, amount)

Output key-value tuples containing the matching join key (customer_id).
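The orders parser has a counterpart on the profile side, which this section describes but does not show. Here is a minimal sketch of it, written as a plain generator so it can be tried without a runner. The name `parse_profile_line` is hypothetical, and the column order (customer_id, name, join_date) is taken from the mock profile file.

```python
import csv


def parse_profile_line(line):
    """Yield (customer_id, customer_name) for one line of the profile CSV.

    Hypothetical helper: mirrors the order parser, keyed on customer_id.
    """
    row = next(csv.reader([line]), [])
    # Skip blank lines and the header row.
    if not row or row[0] == "customer_id":
        return
    yield (row[0], row[1])


lines = [
    "customer_id,name,join_date",
    "C101,Alice Smith,2026-01-15",
    "C102,Bob Jones,2026-02-20",
]
profiles = [kv for line in lines for kv in parse_profile_line(line)]
print(profiles)
```

Inside the pipeline, the same body could sit in a DoFn's process method, as in ParseCSVOrderFn, or the generator could be passed to beam.FlatMap.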
# Transactions KV PCollection format: (customer_id, amount)
# Member Profiles KV PCollection format: (customer_id, customer_name)

Combine both collections inside a dictionary and pass to CoGroupByKey.
joined_dataset = (
    {"names": profiles_kv, "amounts": transactions_kv}
    | "Join Data by Customer ID" >> beam.CoGroupByKey()
)

Sum the transaction amounts list to calculate the Lifetime Spent metric.
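To make the input of this step concrete, the grouping can be emulated in plain Python. This is only a stand-in for the shape CoGroupByKey produces (per key, one list of values per tag), not Beam code. The helper name `emulate_cogroup` is hypothetical, and the rows come from the mock files.

```python
from collections import defaultdict


def emulate_cogroup(tagged_inputs):
    """Group several lists of (key, value) pairs by key, one list per tag.

    Plain-Python stand-in for the shape CoGroupByKey emits:
    (key, {tag: [values...], ...}), with an empty list for a missing side.
    """
    grouped = defaultdict(lambda: {tag: [] for tag in tagged_inputs})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return sorted(grouped.items())


profiles_kv = [("C101", "Alice Smith"), ("C102", "Bob Jones"),
               ("C103", "Charlie Brown"), ("C104", "Diana Prince")]
transactions_kv = [("C101", 89.99), ("C102", 45.50), ("C101", 120.00),
                   ("C103", 15.75), ("C104", 250.00), ("C102", 12.50)]

joined = emulate_cogroup({"names": profiles_kv, "amounts": transactions_kv})
for customer_id, data in joined:
    # Round for display only; float sums can carry binary rounding error.
    print(customer_id, data["names"], round(sum(data["amounts"]), 2))
```

A key present on only one side still appears, with an empty list under the other tag, so the summing step needs a fallback for a missing name or missing amounts.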
def format_joined_records(element):
    # element matches: (customer_id, {'names': [name], 'amounts': [amt1, amt2, ...]})
    customer_id, data = element
    # Materialize the grouped values so they can be indexed and summed.
    names = list(data["names"])
    amounts = list(data["amounts"])
    name = names[0] if names else "Unknown Customer"
    total_spent = sum(amounts) if amounts else 0.0
    return {
        "customer_id": customer_id,
        "customer_name": name,
        "total_lifetime_spent": total_spent,
    }

Configure your BigQuery sink with WRITE_TRUNCATE so that each run replaces the table's existing contents.
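Before wiring up the sink, it can help to check locally that each output dictionary matches the schema string. Here is a minimal sketch under stated assumptions: the helper names `parse_schema` and `row_matches_schema` are hypothetical, and the type mapping covers only the two BigQuery types used in this schema.

```python
# Schema string copied from the sink configuration in this section.
SCHEMA = "customer_id:STRING, customer_name:STRING, total_lifetime_spent:FLOAT"

# Assumed mapping from the two BigQuery types used here to Python types.
PYTHON_TYPES = {"STRING": str, "FLOAT": (int, float)}


def parse_schema(schema):
    """Split 'name:TYPE, name:TYPE' into a list of (name, TYPE) pairs."""
    fields = []
    for part in schema.split(","):
        name, bq_type = part.strip().split(":")
        fields.append((name, bq_type))
    return fields


def row_matches_schema(row, schema=SCHEMA):
    """Return True when the row has exactly the schema's fields with compatible types."""
    fields = parse_schema(schema)
    if set(row) != {name for name, _ in fields}:
        return False
    return all(isinstance(row[name], PYTHON_TYPES[bq_type])
               for name, bq_type in fields)


sample_row = {
    "customer_id": "C104",
    "customer_name": "Diana Prince",
    "total_lifetime_spent": 250.0,
}
print(row_matches_schema(sample_row))
```

A check like this only catches missing fields and obvious type mismatches; it does not replace the validation BigQuery performs when rows are loaded.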
| "Write to BQ Table" >> beam.io.WriteToBigQuery(
    "project:dataset.table",
    schema="customer_id:STRING, customer_name:STRING, total_lifetime_spent:FLOAT",
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
)

Try writing the pipeline code yourself using the steps above! Once you are finished, open the collapsible container below to review the complete, production-ready solution script.