Build an advanced batch compliance audit pipeline that reads transaction logs, filters them against a merchant blacklist provided as a side input, and routes errors to a failed table while writing clean data to a good table.
This batch compliance pipeline reads raw transaction logs, validates each record, checks its merchant ID against a blacklist loaded as a set-valued side input, and splits the data between two BigQuery targets: clean records go to good_bq, while records that fail parsing or validation are routed to failed_bq through a tagged side output. Records from blacklisted merchants are discarded and reach neither table.
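To run this audit compliance pipeline locally, create two data files in your workspace folder: a merchant blacklist with one merchant ID per line, and a CSV transaction log with a header row. Populate them with this mock data: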
Blacklist file (blacklist.txt):

```text
M501
M504
```

Transaction log (CSV):

```csv
transaction_id,merchant_id,amount,timestamp
T101,M501,450.00,2026-06-01T10:00:00Z
T102,M502,-12.00,2026-06-01T10:05:00Z
T103,M503,75.25,2026-06-01T10:10:00Z
T104,M501,INVALID,2026-06-01T10:15:00Z
T105,M504,8900.00,2026-06-01T10:20:00Z
T106,M502,150.00,2026-06-01T10:25:00Z
```

Read the blacklist text file and convert it into a set-valued side input. Because Python sets are hash-based, each membership check inside the main transform takes O(1) time on average, instead of scanning a list for every transaction.
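Stripped of Beam, this step just turns the file's lines into a Python set. The sketch below shows that value using a hypothetical `build_blacklist` helper, with the two mock IDs inlined as strings rather than read from disk:

```python
def build_blacklist(lines):
    """Hypothetical helper mirroring the Beam step: strip each line, collect into a set."""
    return {line.strip() for line in lines}


blacklist = build_blacklist(["M501\n", "M504\n"])
print(sorted(blacklist))    # ['M501', 'M504']
print("M504" in blacklist)  # True, via a hash lookup rather than a list scan
print("M503" in blacklist)  # False
```

In the pipeline, `AsSingleton` hands one value to every worker, which is why the `CombineGlobally` step first collapses all blacklist lines into a single element before it is converted to a set.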
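```python
import apache_beam as beam

# `p` is the beam.Pipeline object. For a local run, point ReadFromText at the
# local blacklist.txt instead of the GCS path.
blacklist_side = (
    p
    | "Read Blacklist File" >> beam.io.ReadFromText("gs://bucket/blacklist.txt")
    | "Clean Whitespace" >> beam.Map(lambda x: x.strip())
    | "Collect List" >> beam.CombineGlobally(beam.combiners.ToListCombineFn())
    | "Convert to Set" >> beam.Map(set)  # one element: the full blacklist set
)
```

Subclass `beam.DoFn`. Inside `process()`, wrap the parsing logic in a try/except block. If a record fails validation (for example, a non-numeric string in the amount column or a non-positive amount), yield a `TaggedOutput` to route it to a dead-letter output. Otherwise, check the merchant ID against the blacklist side input before yielding the clean record.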
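Before writing the DoFn, it can help to trace where each mock row should end up. The sketch below is a plain-Python stand-in for the validation and blacklist logic (no Beam involved); `classify`, `BLACKLIST`, and `LINES` are hypothetical names, and the data is the mock data inlined:

```python
import csv

# Mock data inlined so the sketch runs on its own.
BLACKLIST = {"M501", "M504"}
LINES = [
    "transaction_id,merchant_id,amount,timestamp",
    "T101,M501,450.00,2026-06-01T10:00:00Z",
    "T102,M502,-12.00,2026-06-01T10:05:00Z",
    "T103,M503,75.25,2026-06-01T10:10:00Z",
    "T104,M501,INVALID,2026-06-01T10:15:00Z",
    "T105,M504,8900.00,2026-06-01T10:20:00Z",
    "T106,M502,150.00,2026-06-01T10:25:00Z",
]


def classify(line, blacklist):
    """Hypothetical stand-in for the DoFn's process() logic.

    Returns ("good", record), ("failed", error_dict), or None when the line is
    the header or comes from a blacklisted merchant.
    """
    try:
        row = next(csv.reader([line]))
        if row[0] == "transaction_id":  # header row
            return None
        amount = float(row[2])  # raises ValueError for non-numeric amounts
        if amount <= 0:
            raise ValueError("Amount must be positive")
        if row[1] in blacklist:  # blacklisted merchant: dropped, not failed
            return None
        return ("good", {"transaction_id": row[0], "merchant_id": row[1],
                         "amount": amount, "timestamp": row[3]})
    except Exception as e:
        return ("failed", {"raw_line": line, "error_reason": str(e)})


good, failed = [], []
for line in LINES:
    result = classify(line, BLACKLIST)
    if result is None:
        continue
    tag, payload = result
    (good if tag == "good" else failed).append(payload)

print([r["transaction_id"] for r in good])            # ['T103', 'T106']
print([f["raw_line"].split(",")[0] for f in failed])  # ['T102', 'T104']
```

Note that T104 belongs to blacklisted merchant M501 yet lands in the failed branch: the amount is validated before the blacklist is consulted, so malformed rows are reported even for blacklisted merchants. T101 and T105 are silently dropped.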
```python
import csv

import apache_beam as beam
from apache_beam.pvalue import TaggedOutput


class ValidateAndFilterFn(beam.DoFn):
    def process(self, element, blacklist):
        # `blacklist` arrives as a side input: the set built in the previous step
        try:
            row = list(csv.reader([element]))[0]
            if row[0] == "transaction_id":  # Skip header row
                return
            tx_id = row[0]
            merchant_id = row[1]
            amount_str = row[2]
            timestamp = row[3]

            # 1. Validation check: float() raises ValueError on non-numeric input
            amount = float(amount_str)
            if amount <= 0:
                raise ValueError("Amount must be positive")

            record = {
                "transaction_id": tx_id,
                "merchant_id": merchant_id,
                "amount": amount,
                "timestamp": timestamp,
            }

            # 2. Side input check
            if merchant_id in blacklist:
                return  # Discard blacklisted merchant records
            yield record  # Main output: good records
        except Exception as e:
            # Route the raw line to the tagged side output 'failed_records'
            yield TaggedOutput("failed_records", {
                "raw_line": element,
                "error_reason": str(e),
            })
```

Apply the ParDo with `with_outputs()` to split the pipeline flow. Instead of a single PCollection, this returns a `DoOutputsTuple` from which you can read the good and failed streams by tag name.
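The routing rule behind `with_outputs()` is simple: plain yielded values go to the main branch, and `TaggedOutput` values go to the branch named by their tag. The toy model below illustrates that rule with lists; the `TaggedOutput` namedtuple, `split_outputs`, and `parse_amount` are hypothetical stand-ins, not Beam's classes or API:

```python
from collections import defaultdict, namedtuple

# Hypothetical stand-in for apache_beam.pvalue.TaggedOutput; NOT the real Beam class.
TaggedOutput = namedtuple("TaggedOutput", ["tag", "value"])


def split_outputs(process, elements, main):
    """Toy model of ParDo(...).with_outputs(..., main=main) routing.

    Plain yielded values go to the `main` branch; TaggedOutput values go to the
    branch named by their tag.
    """
    branches = defaultdict(list)
    for element in elements:
        for out in process(element):
            if isinstance(out, TaggedOutput):
                branches[out.tag].append(out.value)
            else:
                branches[main].append(out)
    return dict(branches)


def parse_amount(amount_str):
    # Hypothetical DoFn body: emit parsed amounts, tag unparseable ones as failures.
    try:
        value = float(amount_str)
    except ValueError as e:
        yield TaggedOutput("failed_records", str(e))
        return
    yield value


branches = split_outputs(parse_amount, ["450.00", "INVALID", "75.25"], main="good_records")
print(branches["good_records"])         # [450.0, 75.25]
print(len(branches["failed_records"]))  # 1
```

In Beam itself the branches are PCollections rather than lists, and the `DoOutputsTuple` exposes them either as attributes (`validated_branches.failed_records`) or by indexing with the tag (`validated_branches["failed_records"]`).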
```python
# raw_transactions: a PCollection of CSV lines, e.g. read with beam.io.ReadFromText
validated_branches = (
    raw_transactions
    | "Validate & Filter" >> beam.ParDo(
        ValidateAndFilterFn(),
        blacklist=beam.pvalue.AsSingleton(blacklist_side),  # Side input: the blacklist set
    ).with_outputs("failed_records", main="good_records")
)

good_records = validated_branches.good_records      # main output
failed_records = validated_branches.failed_records  # tagged side output
```

Route the main output branch to the BigQuery compliance target table using the WRITE_APPEND write disposition, so each run adds rows instead of replacing the table's contents.
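The `schema=` string packs each column as `name:TYPE`. Beam's `WriteToBigQuery` also accepts the schema as a dictionary with a `fields` list. The hypothetical helper below expands the string into that shape and checks that a record produced by the DoFn carries exactly those columns, a cheap way to catch a mismatch before a write fails. Marking every field `NULLABLE` is an assumption that matches BigQuery's default column mode:

```python
def schema_string_to_dict(schema):
    """Hypothetical helper: turn 'name:TYPE, name:TYPE' into {'fields': [...]}.

    Each field is marked NULLABLE, BigQuery's default column mode.
    """
    fields = []
    for part in schema.split(","):
        name, field_type = part.strip().split(":")
        fields.append({"name": name, "type": field_type, "mode": "NULLABLE"})
    return {"fields": fields}


good_schema = schema_string_to_dict(
    "transaction_id:STRING, merchant_id:STRING, amount:FLOAT, timestamp:STRING"
)
print([f["name"] for f in good_schema["fields"]])
# ['transaction_id', 'merchant_id', 'amount', 'timestamp']

# A good record as the DoFn yields it (T103 from the mock data).
record = {"transaction_id": "T103", "merchant_id": "M503",
          "amount": 75.25, "timestamp": "2026-06-01T10:10:00Z"}
print(set(record) == {f["name"] for f in good_schema["fields"]})  # True
```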
```python
(
    good_records
    # Batch writes run BigQuery load jobs, which stage files under the
    # pipeline's --temp_location (a GCS path).
    | "Write to Good BQ" >> beam.io.WriteToBigQuery(
        "project:dataset.good_bq",
        schema="transaction_id:STRING, merchant_id:STRING, amount:FLOAT, timestamp:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)
```

Route the tagged error branch to a separate dead-letter BigQuery table so that compliance engineers can inspect the records that failed parsing or validation.
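Because each dead-letter row keeps the original CSV text in `raw_line`, triage can recover the transaction ID even for rows that never parsed into a record. The sketch below groups failed transactions by `error_reason` in plain Python; the two rows are hand-written to match what the mock data produces (T102 and T104), whereas in practice you would query failed_bq:

```python
import csv
from collections import defaultdict

# Dead-letter rows shaped like the DoFn's TaggedOutput payload.
failed_rows = [
    {"raw_line": "T102,M502,-12.00,2026-06-01T10:05:00Z",
     "error_reason": "Amount must be positive"},
    {"raw_line": "T104,M501,INVALID,2026-06-01T10:15:00Z",
     "error_reason": "could not convert string to float: 'INVALID'"},
]

# Group transaction IDs by failure reason, reading the ID back from raw_line.
by_reason = defaultdict(list)
for row in failed_rows:
    tx_id = next(csv.reader([row["raw_line"]]))[0]
    by_reason[row["error_reason"]].append(tx_id)

for reason, tx_ids in by_reason.items():
    print(f"{reason}: {tx_ids}")
```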
```python
(
    failed_records
    | "Write to Failed BQ" >> beam.io.WriteToBigQuery(
        "project:dataset.failed_bq",
        schema="raw_line:STRING, error_reason:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    )
)
```

Try writing the pipeline code yourself using the steps above! Once you are finished, compare your work with the complete, production-ready solution script below.