Retail Project | Intermediate

Project Walkthrough: E-Commerce Customer Order Analytics

Apache Beam | GCS Bucket | BigQuery | Python

1. Project Overview & Objective

Build a batch analytics pipeline that produces a lifetime spend metric for each customer.

The pipeline reads transaction and customer profile data from CSV files in Cloud Storage, parses each line in custom DoFn classes, joins the two datasets on customer ID using CoGroupByKey, sums each customer's order amounts, and writes the resulting report to BigQuery.

System Pipeline Architecture

Cloud Storage CSV Files (Orders & Profiles) -> Relational CoGroupByKey Join & Map Formatters -> BigQuery Reporting Tables

2. Get the Sample Data

To run the batch join pipeline locally, create two CSV files in your workspace directory and populate them with the following mock data:

1. members.csv
text
customer_id,name,join_date
C101,Alice Smith,2026-01-15
C102,Bob Jones,2026-02-20
C103,Charlie Brown,2026-03-10
C104,Diana Prince,2026-04-05

2. orders.csv
text
order_id,customer_id,amount,date
O501,C101,89.99,2026-06-01
O502,C102,45.50,2026-06-02
O503,C101,120.00,2026-06-03
O504,C103,15.75,2026-06-04
O505,C104,250.00,2026-06-05
O506,C102,12.50,2026-06-06

3. Step-by-Step Implementation

Step 3.1: Read CSV Files & Handle Headers

Use ReadFromText to read each file line by line, then parse every line in a custom DoFn that skips the header row and emits only the fields needed for the join.

python
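import csv

import apache_beam as beam


class ParseCSVOrderFn(beam.DoFn):
    """Parse one line of orders.csv into a (customer_id, amount) pair."""

    def process(self, element):
        # csv.reader handles quoted fields; fall back to [] for an empty line.
        row = next(csv.reader([element]), [])
        if not row or row[0] == "order_id":  # Skip blank lines and the CSV header
            return
        customer_id = row[1]
        amount = float(row[2])
        yield (customer_id, amount)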

Step 3.2: Map Datasets to Key-Value Join Pairs

Each parser emits (key, value) tuples whose key is the join key, customer_id, so that both datasets can be grouped on the same field.

python
# transactions_kv elements: (customer_id, amount)
# profiles_kv elements:     (customer_id, customer_name)
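To make the two key-value shapes concrete, here is a minimal plain-Python sketch of the parsing logic for both files. The function names `parse_order_line` and `parse_member_line` are hypothetical and not part of the original walkthrough. Written as generators, they could be applied with `beam.FlatMap`, or their bodies could be moved into the `process` method of a DoFn.

```python
import csv


def parse_order_line(line):
    """Yield (customer_id, amount) for one orders.csv data row."""
    row = next(csv.reader([line]), [])
    if not row or row[0] == "order_id":  # skip blank lines and the header
        return
    yield (row[1], float(row[2]))


def parse_member_line(line):
    """Yield (customer_id, customer_name) for one members.csv data row."""
    row = next(csv.reader([line]), [])
    if not row or row[0] == "customer_id":  # skip blank lines and the header
        return
    yield (row[0], row[1])


# Two lines from each sample file: the header plus the first data row.
order_lines = ["order_id,customer_id,amount,date", "O501,C101,89.99,2026-06-01"]
member_lines = ["customer_id,name,join_date", "C101,Alice Smith,2026-01-15"]

transactions = [kv for line in order_lines for kv in parse_order_line(line)]
profiles = [kv for line in member_lines for kv in parse_member_line(line)]

print(transactions)  # [('C101', 89.99)]
print(profiles)      # [('C101', 'Alice Smith')]
```

Both outputs share `customer_id` as the first tuple element, which is what the join in the next step groups on.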

Step 3.3: Execute Joins using CoGroupByKey

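Place both keyed PCollections in a dictionary and pipe it into CoGroupByKey. The dictionary keys ("names" and "amounts") become the tags under which each customer's values are grouped in the joined output.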

python
joined_dataset = (
    {"names": profiles_kv, "amounts": transactions_kv}
    | "Join Data by Customer ID" >> beam.CoGroupByKey()
)
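The shape of the joined output is easier to see on a small example. The sketch below is a plain-Python stand-in, not Beam itself: the helper `cogroup_by_key` is hypothetical and only mimics the result structure on in-memory lists. The customer ID `C999` is an invented key without a profile, added to show that a key present in only one input still appears in the output.

```python
from collections import defaultdict


def cogroup_by_key(tagged_inputs):
    """Group (key, value) pairs from several tagged lists by key.

    Returns a sorted list of (key, {tag: [values, ...]}) tuples, with an
    empty list for any tag that has no values for that key.
    """
    tags = list(tagged_inputs)
    grouped = defaultdict(lambda: {tag: [] for tag in tags})
    for tag, pairs in tagged_inputs.items():
        for key, value in pairs:
            grouped[key][tag].append(value)
    return sorted(grouped.items())


profiles_kv = [("C101", "Alice Smith"), ("C102", "Bob Jones")]
transactions_kv = [
    ("C101", 89.99),
    ("C102", 45.50),
    ("C101", 120.00),
    ("C999", 5.00),  # hypothetical order with no matching profile
]

for key, data in cogroup_by_key({"names": profiles_kv, "amounts": transactions_kv}):
    print(key, data)
# C101 {'names': ['Alice Smith'], 'amounts': [89.99, 120.0]}
# C102 {'names': ['Bob Jones'], 'amounts': [45.5]}
# C999 {'names': [], 'amounts': [5.0]}
```

In a real pipeline the order of values within each group should not be relied on, so downstream code is safest when it only uses order-independent operations such as a sum.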

Step 3.4: Form Aggregations & Calculate LTV values

Sum each customer's list of transaction amounts to calculate the total_lifetime_spent metric.

python
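def format_joined_records(element):
    """Turn one CoGroupByKey result into a row dict for the report table."""
    # element matches: (customer_id, {'names': [name], 'amounts': [amt1, amt2, ...]})
    customer_id, data = element
    # list() keeps the indexing below safe if the grouped values arrive as a plain iterable.
    names = list(data["names"])
    amounts = list(data["amounts"])

    # Orders without a profile get a placeholder name;
    # a profile without orders gets a total of 0.0.
    name = names[0] if names else "Unknown Customer"
    total_spent = sum(amounts) if amounts else 0.0
    return {
        "customer_id": customer_id,
        "customer_name": name,
        "total_lifetime_spent": total_spent
    }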
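The fallback branches are worth exercising directly. The sketch below repeats the function from this step in condensed form so that it runs on its own, then calls it on three hand-built elements. `C102` uses the sample data; `C998` and `C999` are invented IDs used only to trigger the two fallbacks.

```python
def format_joined_records(element):
    """Condensed copy of the formatter from this step."""
    customer_id, data = element
    names = list(data["names"])
    amounts = list(data["amounts"])
    return {
        "customer_id": customer_id,
        "customer_name": names[0] if names else "Unknown Customer",
        "total_lifetime_spent": sum(amounts) if amounts else 0.0,
    }


matched = ("C102", {"names": ["Bob Jones"], "amounts": [45.50, 12.50]})
no_profile = ("C999", {"names": [], "amounts": [5.00]})        # hypothetical
no_orders = ("C998", {"names": ["Sample Member"], "amounts": []})  # hypothetical

print(format_joined_records(matched))
# {'customer_id': 'C102', 'customer_name': 'Bob Jones', 'total_lifetime_spent': 58.0}
print(format_joined_records(no_profile))
# {'customer_id': 'C999', 'customer_name': 'Unknown Customer', 'total_lifetime_spent': 5.0}
print(format_joined_records(no_orders))
# {'customer_id': 'C998', 'customer_name': 'Sample Member', 'total_lifetime_spent': 0.0}
```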

Step 3.5: Write the Final Report with WRITE_TRUNCATE

Configure your BigQuery sink with WRITE_TRUNCATE so that each run replaces the table's existing rows instead of appending to them.

python
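(
    formatted_records  # hypothetical name: the PCollection of dicts from format_joined_records
    | "Write to BQ Table" >> beam.io.WriteToBigQuery(
        "project:dataset.table",  # placeholder; substitute your own table reference
        schema="customer_id:STRING, customer_name:STRING, total_lifetime_spent:FLOAT",
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )
)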
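Because the sink expects each row's keys to match the schema's field names, a quick local check can catch a mismatch before a job is submitted. The following is a minimal sketch under that assumption. The helpers `schema_field_names` and `check_record` are hypothetical and are not part of Beam or of the original walkthrough.

```python
SCHEMA = "customer_id:STRING, customer_name:STRING, total_lifetime_spent:FLOAT"


def schema_field_names(schema):
    """Extract field names from a compact 'name:TYPE, name:TYPE' schema string."""
    return [field.split(":")[0].strip() for field in schema.split(",")]


def check_record(record, schema=SCHEMA):
    """Return a list of key mismatches between one row dict and the schema."""
    expected = set(schema_field_names(schema))
    actual = set(record)
    problems = [f"missing field: {name}" for name in sorted(expected - actual)]
    problems += [f"unexpected field: {name}" for name in sorted(actual - expected)]
    return problems


good_row = {
    "customer_id": "C102",
    "customer_name": "Bob Jones",
    "total_lifetime_spent": 58.0,
}
bad_row = {"customer_id": "C102", "name": "Bob Jones", "total_lifetime_spent": 58.0}

print(check_record(good_row))  # []
print(check_record(bad_row))   # ['missing field: customer_name', 'unexpected field: name']
```

This only compares key names. It does not validate value types against the declared column types.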

4. Complete Code Solution

Try writing the pipeline code yourself using the steps above, then compare your result with the complete solution script.
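When testing your own attempt, it helps to know what the report should contain for the sample data. The sketch below is not the Beam solution. It is a plain-Python cross-check that computes the expected rows from the two sample files with the standard library. The function name `expected_report` is hypothetical, and totals are rounded to two decimals for display, which the pipeline steps above do not do, so compare amounts with a small tolerance.

```python
import csv
import io
from collections import defaultdict

MEMBERS_CSV = """\
customer_id,name,join_date
C101,Alice Smith,2026-01-15
C102,Bob Jones,2026-02-20
C103,Charlie Brown,2026-03-10
C104,Diana Prince,2026-04-05
"""

ORDERS_CSV = """\
order_id,customer_id,amount,date
O501,C101,89.99,2026-06-01
O502,C102,45.50,2026-06-02
O503,C101,120.00,2026-06-03
O504,C103,15.75,2026-06-04
O505,C104,250.00,2026-06-05
O506,C102,12.50,2026-06-06
"""


def expected_report(members_csv, orders_csv):
    """Compute the per-customer report rows without Beam."""
    names = {
        row["customer_id"]: row["name"]
        for row in csv.DictReader(io.StringIO(members_csv))
    }
    totals = defaultdict(float)
    for row in csv.DictReader(io.StringIO(orders_csv)):
        totals[row["customer_id"]] += float(row["amount"])

    # Include every customer seen in either file, as the join does.
    return [
        {
            "customer_id": customer_id,
            "customer_name": names.get(customer_id, "Unknown Customer"),
            "total_lifetime_spent": round(totals.get(customer_id, 0.0), 2),
        }
        for customer_id in sorted(set(names) | set(totals))
    ]


for row in expected_report(MEMBERS_CSV, ORDERS_CSV):
    print(row["customer_id"], row["customer_name"], row["total_lifetime_spent"])
# C101 Alice Smith 209.99
# C102 Bob Jones 58.0
# C103 Charlie Brown 15.75
# C104 Diana Prince 250.0
```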

5. Production Deployment Safety

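Deployment Safety Check
Before running on Cloud Dataflow, verify that the worker service account holds the Dataflow Worker role (roles/dataflow.worker), which the worker VMs need to execute the job, and the Storage Object Viewer role (roles/storage.objectViewer), which authorizes reading the input CSV files. Because the pipeline also writes its report to BigQuery, the account will likely need write access to the target dataset as well, for example through the BigQuery Data Editor role.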