Finance Project · Advanced

Project Walkthrough: Financial Transaction Compliance Audit

Apache Beam · Side Inputs · BigQuery · Python

1. Project Overview & Objective

Build an advanced batch compliance audit pipeline that reads raw transaction logs, validates each record's schema, and checks its merchant ID against a blacklist loaded as a set side input. The pipeline splits the data three ways:

- Valid records from non-blacklisted merchants are written to the good_bq table.
- Records that fail parsing or validation are routed to the failed_bq table through a tagged side output.
- Records from blacklisted merchants are discarded.

System Pipeline Architecture

Cloud Storage CSV Files (Transactions & Blacklist) → Merchant Blacklist Set Side Input & Tagged Side Output Branching → BigQuery Tables (good_bq & failed_bq)

2. Get the Sample Data

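To try this compliance audit pipeline locally, create two data files in your workspace folder containing the mock data below. Then point the ReadFromText paths at these files instead of the gs://bucket/ paths used in the code.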

1. blacklist.txt
text
M501
M504
2. transactions.csv
text
transaction_id,merchant_id,amount,timestamp
T101,M501,450.00,2026-06-01T10:00:00Z
T102,M502,-12.00,2026-06-01T10:05:00Z
T103,M503,75.25,2026-06-01T10:10:00Z
T104,M501,INVALID,2026-06-01T10:15:00Z
T105,M504,8900.00,2026-06-01T10:20:00Z
T106,M502,150.00,2026-06-01T10:25:00Z

3. Step-by-Step Implementation

Step 3.1: Read Blacklist & Convert to Set Side Input

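Read the blacklist text file and collapse it into a single Python set that the main transform receives as a side input. A set gives average O(1) membership checks, so each transaction lookup stays cheap as the blacklist grows. Because CombineGlobally emits exactly one value, the result can be passed with beam.pvalue.AsSingleton in Step 3.3. The whole set is held in memory on every worker, so this pattern suits blacklists of modest size.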

python
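import apache_beam as beam

blacklist_side = (
    p  # the beam.Pipeline object
    | "Read Blacklist File" >> beam.io.ReadFromText("gs://bucket/blacklist.txt")
    | "Clean Whitespace" >> beam.Map(lambda x: x.strip())
    | "Drop Empty Lines" >> beam.Filter(bool)  # ignore blank lines in the file
    | "Collect List" >> beam.CombineGlobally(beam.combiners.ToListCombineFn())
    | "Convert to Set" >> beam.Map(set)  # one element: the full blacklist set
)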
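CombineGlobally can produce a single value because a combiner builds partial results on each worker and then merges them. The plain-Python sketch below models that contract. SetCombineFn and combine_shards are hypothetical names; SetCombineFn only borrows the four method names of beam.CombineFn (create_accumulator, add_input, merge_accumulators, extract_output). The shard split is simulated by hand. In a real pipeline you would subclass beam.CombineFn and pass an instance to beam.CombineGlobally.

```python
class SetCombineFn:
    """Hypothetical combiner mirroring beam.CombineFn's method names (not Beam itself)."""

    def create_accumulator(self):
        # Each worker starts from an empty partial set.
        return set()

    def add_input(self, accumulator, element):
        # Strip whitespace and ignore blank lines before adding.
        merchant_id = element.strip()
        if merchant_id:
            accumulator.add(merchant_id)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Union the partial sets produced on different workers.
        merged = set()
        for acc in accumulators:
            merged |= acc
        return merged

    def extract_output(self, accumulator):
        # Freeze the result so downstream code cannot mutate the side input.
        return frozenset(accumulator)


def combine_shards(fn, shards):
    """Simulate a runner: accumulate each shard separately, then merge once."""
    partials = []
    for shard in shards:
        acc = fn.create_accumulator()
        for element in shard:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


blacklist = combine_shards(SetCombineFn(), [["M501\n", ""], ["M504 ", "M501"]])
print(sorted(blacklist))  # ['M501', 'M504']
print("M504" in blacklist, "M502" in blacklist)  # True False
```

Set union gives the same result however the runner splits the file. That is why the final value is a single, well-defined set, which is exactly what AsSingleton expects.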

Step 3.2: Create a ParDo implementing Schema Checks and Side Input Filtering

Subclass beam.DoFn. Inside `process()`, wrap parsing and validation in a try/except block. If a record fails validation (for example, a non-numeric string in the amount column), yield a TaggedOutput to route it to a dead-letter output. Otherwise, check the merchant ID against the blacklist side input and yield only records from merchants that are not blacklisted.

python
import csv

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class ValidateAndFilterFn(beam.DoFn):
    def process(self, element, blacklist):
        # blacklist arrives as a side input: a single Python set of merchant IDs
        try:
            row = list(csv.reader([element]))[0]
            if row[0] == "transaction_id":  # Skip header row
                return
            if len(row) != 4:
                raise ValueError(f"Expected 4 columns, got {len(row)}")

            tx_id, merchant_id, amount_str, timestamp = row

            # 1. Validation check: amount must parse as a positive number
            amount = float(amount_str)
            if amount <= 0:
                raise ValueError("Amount must be positive")
        except Exception as e:
            # Route unparseable or invalid lines to the 'failed_records' side output
            yield TaggedOutput("failed_records", {
                "raw_line": element,
                "error_reason": str(e),
            })
            return

        # 2. Side input check: discard records from blacklisted merchants
        if merchant_id in blacklist:
            return

        # Main output: clean, non-blacklisted records
        yield {
            "transaction_id": tx_id,
            "merchant_id": merchant_id,
            "amount": amount,
            "timestamp": timestamp,
        }
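The DoFn's routing rules can be checked without a Beam runner. The sketch below uses a hypothetical helper, classify_line, that applies the same rules as ValidateAndFilterFn and returns a tag instead of yielding. It runs the helper over the sample transactions.csv data. This stand-in is written for illustration only; it is not part of the pipeline.

```python
import csv

EXPECTED_COLUMNS = 4  # transaction_id, merchant_id, amount, timestamp


def classify_line(line, blacklist):
    """Return (tag, payload) using the same rules as ValidateAndFilterFn.

    Tags: "header", "good", "failed" or "blacklisted".
    """
    try:
        rows = list(csv.reader([line]))
        if not rows or len(rows[0]) != EXPECTED_COLUMNS:
            raise ValueError(f"Expected {EXPECTED_COLUMNS} columns")
        tx_id, merchant_id, amount_str, timestamp = rows[0]
        if tx_id == "transaction_id":
            return ("header", None)
        amount = float(amount_str)
        if amount <= 0:
            raise ValueError("Amount must be positive")
    except Exception as e:
        # Any parsing or validation failure becomes a dead-letter row.
        return ("failed", {"raw_line": line, "error_reason": str(e)})
    if merchant_id in blacklist:
        return ("blacklisted", None)
    return ("good", {"transaction_id": tx_id, "merchant_id": merchant_id,
                     "amount": amount, "timestamp": timestamp})


SAMPLE = [
    "transaction_id,merchant_id,amount,timestamp",
    "T101,M501,450.00,2026-06-01T10:00:00Z",
    "T102,M502,-12.00,2026-06-01T10:05:00Z",
    "T103,M503,75.25,2026-06-01T10:10:00Z",
    "T104,M501,INVALID,2026-06-01T10:15:00Z",
    "T105,M504,8900.00,2026-06-01T10:20:00Z",
    "T106,M502,150.00,2026-06-01T10:25:00Z",
]

routes = {}
for line in SAMPLE:
    tag, _ = classify_line(line, {"M501", "M504"})
    routes.setdefault(tag, []).append(line.split(",")[0])

for tag in ("good", "failed", "blacklisted"):
    print(tag, routes.get(tag, []))
# good ['T103', 'T106']
# failed ['T102', 'T104']
# blacklisted ['T101', 'T105']
```

T104 belongs to blacklisted merchant M501, yet it lands in the failed branch. This is because validation runs before the blacklist check, so malformed rows are always captured for review.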

Step 3.3: Branch the Pipeline Outputs

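Apply the ParDo and call with_outputs() to split the pipeline flow. It returns a DoOutputsTuple with one PCollection per output:

- The main output, named good_records here, holds records yielded normally.
- The failed_records output holds records yielded with TaggedOutput.

Each branch is accessible as an attribute or by indexing with its tag name.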

python
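# raw_transactions is the PCollection of CSV lines (path assumed), e.g.:
raw_transactions = p | "Read Transactions" >> beam.io.ReadFromText("gs://bucket/transactions.csv")

validated_branches = (
    raw_transactions
    | "Validate & Filter" >> beam.ParDo(
        ValidateAndFilterFn(),
        blacklist=beam.pvalue.AsSingleton(blacklist_side)  # Side Input
    ).with_outputs("failed_records", main="good_records")
)

good_records = validated_branches.good_records
failed_records = validated_branches.failed_records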
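Beam handles this routing for you. A small plain-Python model makes the contract concrete: anything yielded plainly lands in the main output, and anything wrapped in TaggedOutput lands in the branch named by its tag.

Tagged and run_with_outputs below are hypothetical names for illustration, not Beam APIs. The model also ignores parallelism entirely.

```python
class Tagged:
    """Hypothetical stand-in for beam.pvalue.TaggedOutput: a (tag, value) pair."""

    def __init__(self, tag, value):
        self.tag = tag
        self.value = value


def run_with_outputs(process, elements, *tags, main):
    """Toy model of ParDo(...).with_outputs(*tags, main=main).

    Untagged results go to the main output; Tagged results go to their tag.
    Returns a plain dict keyed by output name (Beam returns a DoOutputsTuple).
    An undeclared tag raises KeyError in this model.
    """
    outputs = {main: [], **{tag: [] for tag in tags}}
    for element in elements:
        for result in process(element) or ():
            if isinstance(result, Tagged):
                outputs[result.tag].append(result.value)
            else:
                outputs[main].append(result)
    return outputs


def parse_amount(line):
    # Mirrors the DoFn pattern: yield good values, tag failures.
    try:
        yield float(line)
    except ValueError as e:
        yield Tagged("failed_records", {"raw_line": line, "error_reason": str(e)})


branches = run_with_outputs(parse_amount, ["450.00", "INVALID", "75.25"],
                            "failed_records", main="good_records")
print(branches["good_records"])         # [450.0, 75.25]
print(len(branches["failed_records"]))  # 1
```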

Step 3.4: Write Correct Records to Good BQ Table

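Write the main good_records branch to the BigQuery compliance table using the WRITE_APPEND write disposition, so each run adds rows instead of replacing the table. WriteToBigQuery's default create disposition, CREATE_IF_NEEDED, creates the table from the given schema if it does not already exist. In batch pipelines the sink loads data through files staged in Cloud Storage, so the pipeline also needs a GCS temp_location.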

python
(good_records 
 | "Write to Good BQ" >> beam.io.WriteToBigQuery(
     "project:dataset.good_bq",
     schema="transaction_id:STRING, merchant_id:STRING, amount:FLOAT, timestamp:STRING",
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
 ))
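Before a job reaches BigQuery, it can help to unit-test that the dicts your DoFn yields match the schema string. The helpers below parse the "name:TYPE" format passed to WriteToBigQuery and report mismatches.

The names parse_schema_string, schema_problems and the PY_TYPES mapping are hypothetical. They cover only the STRING and FLOAT types used in this walkthrough. The check is deliberately stricter than BigQuery, which also accepts missing NULLABLE fields and integer values for FLOAT columns.

```python
# Assumed mapping, limited to the BigQuery types used in this walkthrough.
PY_TYPES = {"STRING": str, "FLOAT": float}


def parse_schema_string(schema):
    """Turn 'name:TYPE, name:TYPE' into {'name': 'TYPE'}."""
    fields = {}
    for part in schema.split(","):
        name, bq_type = part.strip().split(":")
        fields[name.strip()] = bq_type.strip().upper()
    return fields


def schema_problems(record, schema):
    """List the ways a record dict disagrees with the schema (empty list means OK)."""
    fields = parse_schema_string(schema)
    problems = [f"unexpected field {key}" for key in record if key not in fields]
    for name, bq_type in fields.items():
        if name not in record:
            problems.append(f"missing field {name}")
        elif not isinstance(record[name], PY_TYPES[bq_type]):
            problems.append(f"{name} should be {bq_type}")
    return problems


GOOD_SCHEMA = "transaction_id:STRING, merchant_id:STRING, amount:FLOAT, timestamp:STRING"
record = {"transaction_id": "T103", "merchant_id": "M503",
          "amount": 75.25, "timestamp": "2026-06-01T10:10:00Z"}
print(schema_problems(record, GOOD_SCHEMA))                      # []
print(schema_problems({**record, "amount": "75.25"}, GOOD_SCHEMA))  # ['amount should be FLOAT']
```

The same helper can check dead-letter rows against the failed_bq schema, "raw_line:STRING, error_reason:STRING".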

Step 3.5: Write Corrupted Records to Failed BQ Table

Route the tagged failed_records branch to a separate dead-letter BigQuery table. Compliance engineers can then inspect both parsing errors and validation failures alongside the raw line that caused each one.

python
(failed_records
 | "Write to Failed BQ" >> beam.io.WriteToBigQuery(
     "project:dataset.failed_bq",
     schema="raw_line:STRING, error_reason:STRING",
     write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
 ))

4. Complete Code Solution

Try writing the complete pipeline yourself by chaining the steps above:

1. Create the pipeline.
2. Read both files.
3. Build the blacklist side input.
4. Apply the validating ParDo.
5. Write each branch to its BigQuery table.

5. Production Deployment Safety

Deployment Safety Check
Routing bad records to a tagged side output implements a Dead Letter Queue (DLQ). A single corrupted line, such as a word in the amount column, is captured in failed_bq instead of raising an unhandled exception. Without the DLQ, that exception would fail the entire batch audit run over millions of historical rows.