Telecom Project (Advanced)

Project Walkthrough: Real-Time Telecom Network Monitoring

Apache Beam, GCP Pub/Sub, GCP BigQuery, Python

1. Project Overview & Objective

Build an end-to-end streaming ingestion pipeline that collects cell tower call-health metrics, computes dropped-call rates per tower, and writes the results downstream for monitoring and alerting.

This streaming pipeline consumes real-time call health events from Pub/Sub, parses each raw JSON payload in a ParDo, buckets records into 1-minute fixed windows, aggregates metrics per tower with a custom CombineFn that computes the percentage of dropped calls, and writes the results to BigQuery.

System Pipeline Architecture

Google Cloud Pub/Sub Topic → Fixed Time Windows & Custom CombineFn Aggregator → BigQuery Analytics Table

2. Get the Sample Data

To test this streaming pipeline, you need an active stream of records. Create a local file named generate_stream.py and paste in the Python script below, which publishes one mock tower event per second to a GCP Pub/Sub topic (the call-logs topic must already exist in your project):

python
import json
import time
import random
from google.cloud import pubsub_v1

# 1. Configure GCP identifiers
project_id = "your-gcp-project-id"
topic_id = "call-logs"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
# 3 of the 4 entries are COMPLETED, so roughly 25% of events are DROPPED
statuses = ["COMPLETED", "COMPLETED", "COMPLETED", "DROPPED"]

print("Starting streaming log publisher. Press Ctrl+C to exit.")
while True:
    # 2. Publish random mock telemetry records
    data = {
        "call_id": f"call_{random.randint(1000, 9999)}",
        "tower_id": f"tower_{random.choice(['A', 'B', 'C'])}",
        "status": random.choice(statuses),
        "duration": random.randint(10, 300),
        "timestamp": int(time.time())
    }
    payload = json.dumps(data).encode("utf-8")
    future = publisher.publish(topic_path, payload)
    future.result()  # Block until Pub/Sub accepts the message; raises on failure
    print(f"Published event: {data}")
    time.sleep(1.0)
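
Before wiring up the pipeline, it helps to know roughly what totals to expect. The script publishes about one event per second (slightly fewer, given publish latency), picks one of three towers uniformly, and marks about one event in four as DROPPED, so a 1-minute window should hold at most about 60 events, roughly 20 per tower, with a dropped-call rate near 25%. The sketch below simulates one window offline, without Pub/Sub; simulate_window is a hypothetical helper, not part of the original script.

```python
import random
from collections import Counter
from typing import Dict, Optional

# Same tower IDs and 1-in-4 DROPPED weighting as generate_stream.py
TOWERS = ["tower_A", "tower_B", "tower_C"]
STATUSES = ["COMPLETED", "COMPLETED", "COMPLETED", "DROPPED"]


def simulate_window(events_per_window: int = 60, seed: Optional[int] = None) -> Dict[str, Dict[str, int]]:
    """Hypothetical helper: simulate one window of generator output, counted per tower."""
    rng = random.Random(seed)
    totals: Counter = Counter()
    dropped: Counter = Counter()
    for _ in range(events_per_window):
        tower = rng.choice(TOWERS)
        totals[tower] += 1
        if rng.choice(STATUSES) == "DROPPED":
            dropped[tower] += 1
    return {
        tower: {"total_calls": totals[tower], "dropped_calls": dropped[tower]}
        for tower in sorted(totals)
    }


if __name__ == "__main__":
    for tower, counts in simulate_window(seed=7).items():
        print(tower, counts)
```

Per-window counts vary from run to run; only the long-run dropped fraction settles near 25%.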

3. Step-by-Step Implementation

Step 3.1: Initialise Pipeline Options & Enable Streaming

Start by setting up your pipeline configuration. Because this is a continuous network monitor, you must set the streaming=True option.

python
from apache_beam.options.pipeline_options import PipelineOptions

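options = PipelineOptions(
    streaming=True,  # Critical for real-time streaming from Pub/Sub
    runner="DataflowRunner",
    project="telecom-gcp-project",
    region="us-central1",  # Placeholder: the Dataflow region to run in
    temp_location="gs://your-bucket/tmp",  # Placeholder: GCS path for temp/staging files
)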

Step 3.2: Subscribe to Pub/Sub Streams & Parse Payloads

Read raw message bytes from the Pub/Sub topic with beam.io.ReadFromPubSub, then parse each JSON payload in a ParDo that emits (tower_id, record) key-value pairs.

python
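import json

import apache_beam as beam

class ParseCallLogFn(beam.DoFn):
    def process(self, element):
        # With ReadFromPubSub's defaults, element is the raw message payload (bytes)
        data = json.loads(element.decode("utf-8"))
        # Yield a key-value tuple keyed by tower: (tower_id, call record)
        yield (data["tower_id"], {
            "call_id": data["call_id"],
            "status": data["status"],  # "COMPLETED" or "DROPPED"
            "duration": int(data["duration"])
        })

# Inside the pipeline, where p = beam.Pipeline(options=options):
parsed_records = (p
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic="projects/telecom-gcp-project/topics/call-logs")
    | "Parse JSON" >> beam.ParDo(ParseCallLogFn()))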

Step 3.3: Segment Data using 1-Minute Fixed Windows

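Apply beam.WindowInto with FixedWindows(60) to bucket elements into non-overlapping 60-second windows, so that the per-tower aggregation in the next step is computed separately for each minute.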

python
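import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

# Bucket the (tower_id, record) pairs from Step 3.2 into non-overlapping 60-second windows
windowed_stream = parsed_records | "1-minute windows" >> beam.WindowInto(FixedWindows(60))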
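
FixedWindows assigns each element to a window based on its timestamp, not its arrival order. With ReadFromPubSub, element timestamps default to the Pub/Sub publish time; the "timestamp" field inside the JSON payload is ignored by this pipeline unless you publish it as a message attribute and pass that attribute name as timestamp_attribute to ReadFromPubSub (which expects milliseconds since the Unix epoch or an RFC 3339 string, whereas the generator writes seconds). The pure-Python sketch below mirrors the alignment rule FixedWindows(size, offset) uses: windows start at multiples of size from the Unix epoch, shifted by offset, and exclude their end. fixed_window_bounds and bucket_by_window are hypothetical helpers for illustration only.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def fixed_window_bounds(event_ts: float, size: float = 60.0, offset: float = 0.0) -> Tuple[float, float]:
    """Return the [start, end) bounds of the fixed window containing event_ts (seconds)."""
    start = event_ts - (event_ts - offset) % size
    return start, start + size


def bucket_by_window(events: Iterable[Tuple[str, float]], size: float = 60.0) -> Dict[Tuple[str, float], List[float]]:
    """Group (tower_id, event_ts) pairs by (tower_id, window_start), as a keyed, windowed aggregation would."""
    buckets: Dict[Tuple[str, float], List[float]] = defaultdict(list)
    for tower_id, event_ts in events:
        window_start, _ = fixed_window_bounds(event_ts, size)
        buckets[(tower_id, window_start)].append(event_ts)
    return dict(buckets)


if __name__ == "__main__":
    events = [("tower_A", 5), ("tower_A", 59.5), ("tower_A", 60), ("tower_B", 125)]
    for key, timestamps in bucket_by_window(events).items():
        print(key, timestamps)
```

An event at exactly 60 s opens the second window rather than closing the first, because each window excludes its end boundary.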

Step 3.4: Create a Custom CombineFn to Calculate Dropped Rates

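Subclass beam.CombineFn to count total and dropped calls per tower. The accumulator stores raw counts rather than a running rate, so partial accumulators from different workers can be merged by addition; the percentage is computed only in extract_output.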

python
import apache_beam as beam


class AggregateTowerMetricsFn(beam.CombineFn):
    def create_accumulator(self):
        # Start each (tower, window) with empty counters
        return {"total_calls": 0, "dropped_calls": 0}

    def add_input(self, accumulator, element):
        # Fold one call record into the running counts
        accumulator["total_calls"] += 1
        if element["status"] == "DROPPED":
            accumulator["dropped_calls"] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        # Sum partial counts produced on different workers or bundles
        merged = {"total_calls": 0, "dropped_calls": 0}
        for acc in accumulators:
            merged["total_calls"] += acc["total_calls"]
            merged["dropped_calls"] += acc["dropped_calls"]
        return merged

    def extract_output(self, accumulator):
        # Convert the final counts into a percentage, guarding against division by zero
        total = accumulator["total_calls"]
        dropped = accumulator["dropped_calls"]
        rate = (dropped / total * 100.0) if total > 0 else 0.0
        return {"total_calls": total, "dropped_calls": dropped, "dropped_call_rate": rate}
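
Written out, extract_output computes the rate below for tower t in window w, where N is total_calls and D is dropped_calls. The second line works through a hypothetical merge of two partial accumulators, (N, D) = (10, 1) and (30, 9): summing counts before dividing gives the correct 25%, whereas averaging their individual rates of 10% and 30% would give 20%.

```latex
r_{t,w} =
\begin{cases}
  100 \cdot \dfrac{D_{t,w}}{N_{t,w}} & \text{if } N_{t,w} > 0 \\
  0 & \text{if } N_{t,w} = 0
\end{cases}

r_{\text{merged}} = 100 \cdot \frac{1 + 9}{10 + 30} = 25
\qquad\text{vs.}\qquad
\frac{10 + 30}{2} = 20
```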

Step 3.5: Write Aggregated Windows to BigQuery Sinks

CombinePerKey emits (tower_id, metrics) pairs, so flatten each pair into a dictionary whose keys match the BigQuery schema, then append the rows to the table.

python
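tower_metrics = windowed_stream | "Aggregate per tower" >> beam.CombinePerKey(AggregateTowerMetricsFn())

(tower_metrics
    # Flatten (tower_id, metrics) into one row dict matching the schema below
    | "To BQ rows" >> beam.MapTuple(lambda tower_id, metrics: {"tower_id": tower_id, **metrics})
    | "Write to BQ" >> beam.io.WriteToBigQuery(
        "project:dataset.table",
        schema="tower_id:STRING, total_calls:INTEGER, dropped_calls:INTEGER, dropped_call_rate:FLOAT",
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))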

4. Complete Code Solution

Try writing the complete pipeline yourself by combining the steps above into a single script.

5. Production Deployment Safety

Deployment Safety Check
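When running on Google Cloud Dataflow, enable Streaming Engine in the pipeline options. It moves window state storage and streaming shuffle off the worker VMs' persistent disks and into the Dataflow service backend, which makes autoscaling more responsive and reduces the CPU, memory, and disk the workers need. Streaming Engine usage is billed separately, so compare total job cost rather than assuming a saving.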