MQTT Protocol

1. Project Overview

Purpose and Scope:
This system is an MQTT subscriber designed for Freeport FMI. It connects to a local Mosquitto MQTT broker, listens for sensor data on a specific topic pattern, and performs two main actions:

  • Publishes received sensor data directly to a Google Cloud Pub/Sub topic for downstream processing.
  • Saves the data to a Google BigQuery table for audit and historical analysis.

Primary Use Cases:

  • Real-time ingestion of sensor readings from industrial equipment or IoT devices.
  • Forwarding sensor data to cloud-based event processing pipelines (via Pub/Sub).
  • Persisting sensor data for compliance, auditing, or analytics (via BigQuery).

Explicitly Not Handled:

  • Data transformation or enrichment beyond basic field extraction.
  • Downstream processing or analytics (handled elsewhere).
  • MQTT message publishing (this is a subscriber only).
  • Authentication/authorization for the MQTT broker (a local, trusted broker is assumed).

2. System Architecture

Core Components:

  • MQTT Client: Subscribes to a topic pattern and receives messages.
  • Pub/Sub Publisher: Publishes validated sensor events to a Google Pub/Sub topic.
  • BigQuery Client: Inserts validated sensor events into a BigQuery table.
  • Logging: Uses Python's standard logging module for all actions and errors (Google Cloud Logging integration is optional and not actively used).

Data and Control Flow:

  1. The MQTT client connects to the broker and subscribes to the configured topic pattern (MQTT_TOPIC).
  2. On message receipt:
    • Parses topic and payload.
    • Validates required fields.
    • Publishes event to Pub/Sub (if enabled).
    • Inserts event into BigQuery (if enabled).
    • Logs all actions and errors.
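
The flow above can be sketched as a single message handler. This is a minimal sketch, not the script's actual on_message: handle_message, its publish/insert parameters, and the added pi_tag field are hypothetical. The two sinks are passed in as plain callables so the flow runs without a broker or Google Cloud credentials; passing None disables a sink, as in LOCAL.

```python
import json
import logging

logger = logging.getLogger("mqtt_subscriber")

REQUIRED_FIELDS = ("site_code", "sensor_id", "timestamp", "value")


def handle_message(topic, payload, publish=None, insert=None):
    """Hypothetical handler: parse, validate, fan out; return the reading or None."""
    # Parse the topic: FCTS/FAE/<site>/SolvExtract/<pi_tag>
    parts = topic.split("/")
    if len(parts) != 5:
        logger.warning("Ignoring malformed topic %r", topic)
        return None

    # Parse the JSON payload (bytes or str).
    try:
        reading = json.loads(payload)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        logger.error("Invalid JSON on %s: %s", topic, exc)
        return None
    if not isinstance(reading, dict):
        logger.error("Payload on %s is not a JSON object", topic)
        return None

    # Validate required fields; incomplete messages are dropped.
    missing = [f for f in REQUIRED_FIELDS if f not in reading]
    if missing:
        logger.error("Dropping message on %s; missing fields %s", topic, missing)
        return None
    reading["pi_tag"] = parts[4]  # hypothetical: carry the tag from the topic

    # Fan out to each enabled sink; a failing sink is logged, not fatal.
    for name, sink in (("pubsub", publish), ("bigquery", insert)):
        if sink is None:
            continue
        try:
            sink(reading)
        except Exception:
            logger.exception("%s sink failed for %s", name, topic)
    return reading
```

In a paho-mqtt callback, this would be called as handle_message(message.topic, message.payload, ...).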

External Services and Dependencies:

  • Mosquitto MQTT broker (local or remote).
  • Google Cloud Pub/Sub (for event publishing).
  • Google BigQuery (for data persistence).
  • Google Cloud Logging (optional, not actively used in this script).

3. Core Concepts & Domain Logic

Key Abstractions and Domain Terms:

  • Sensor Reading: A JSON payload containing site_code, sensor_id, timestamp, and value.
  • KPI Event: The structure sent to Pub/Sub, mapping sensor readings to KPI semantics.
  • Topic Parsing: MQTT topics are expected in the form FCTS/FAE/<site>/SolvExtract/<pi_tag>.
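
A stricter version of the topic parsing rule might look like the sketch below. parse_topic and TopicParts are hypothetical names, and checking the fixed FCTS, FAE, and SolvExtract levels goes beyond the documented "exactly five parts" rule. A matching subscription (an assumed MQTT_TOPIC value) would be FCTS/FAE/+/SolvExtract/+, where + is the MQTT single-level wildcard.

```python
from typing import NamedTuple, Optional


class TopicParts(NamedTuple):
    """Hypothetical container for the variable topic levels."""
    site: str
    pi_tag: str


def parse_topic(topic: str) -> Optional[TopicParts]:
    """Split FCTS/FAE/<site>/SolvExtract/<pi_tag>; return None if malformed."""
    parts = topic.split("/")
    if len(parts) != 5:
        return None
    root, area, site, app, pi_tag = parts
    # Fixed levels must match exactly (an assumption); variable levels must be non-empty.
    if (root, area, app) != ("FCTS", "FAE", "SolvExtract") or not site or not pi_tag:
        return None
    return TopicParts(site=site, pi_tag=pi_tag)
```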

Business/Technical Invariants:

  • Only messages with all required fields are processed.
  • Data is published to Pub/Sub and inserted into BigQuery only when APP_ENV is DEV, PPRD, or PRD.
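  • The site code is the Pub/Sub ordering key, so events from the same site are delivered in publish order (this requires message ordering to be enabled on both the publisher client and the subscription).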
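
The required-field invariant and the ordering key can be expressed as two small helpers. This sketch assumes the four fields listed under Sensor Reading are the complete required set; treating null or an empty string as missing is an added assumption, and missing_fields and pubsub_message are hypothetical names.

```python
import json
from typing import Any, List, Mapping, Tuple

REQUIRED_FIELDS = ("site_code", "sensor_id", "timestamp", "value")


def missing_fields(reading: Mapping[str, Any]) -> List[str]:
    """Hypothetical check: required fields that are absent, null, or empty strings."""
    return [f for f in REQUIRED_FIELDS if reading.get(f) in (None, "")]


def pubsub_message(reading: Mapping[str, Any]) -> Tuple[bytes, str]:
    """Hypothetical helper: return (data, ordering_key) with the site code as key."""
    data = json.dumps(reading, sort_keys=True).encode("utf-8")
    return data, str(reading["site_code"])
```

With google-cloud-pubsub, the pair would be passed as publisher.publish(topic_path, data, ordering_key=key) on a PublisherClient created with PublisherOptions(enable_message_ordering=True). Note that a value of 0 is still accepted as present.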

Mental Model:

  • The system is a stateless, real-time bridge between MQTT, Pub/Sub, and BigQuery, with environment-driven feature toggling.

4. Codebase Structure

Single-file Script:

  • All logic is contained in one Python file.
  • Major sections: configuration, client setup, message handling, main loop.

Responsibility Boundaries:

  • MQTT handling (connection, subscription, message receipt).
  • Data validation and transformation.
  • Pub/Sub publishing.
  • BigQuery insertion.
  • Logging and error handling.

What Changes Together:

  • Topic parsing and payload validation logic.
  • Pub/Sub and BigQuery schema changes.
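
One way to keep the Pub/Sub and BigQuery schemas changing together is to build both outputs from a single record definition. The column names, the float conversion of value, and the helper names below are hypothetical; the real KPI event and table schema are not specified in this document.

```python
import json
from typing import Any, Dict, Mapping

# Hypothetical single source of truth for the outbound record.
RECORD_FIELDS = ("site_code", "sensor_id", "pi_tag", "event_timestamp", "value")


def to_record(reading: Mapping[str, Any], pi_tag: str) -> Dict[str, Any]:
    """Map a validated reading to the shared record used by both sinks."""
    return {
        "site_code": str(reading["site_code"]),
        "sensor_id": str(reading["sensor_id"]),
        "pi_tag": pi_tag,
        "event_timestamp": reading["timestamp"],
        "value": float(reading["value"]),  # raises ValueError if not numeric
    }


def pubsub_data(record: Dict[str, Any]) -> bytes:
    """Pub/Sub message body: the record serialized as UTF-8 JSON."""
    return json.dumps(record).encode("utf-8")


def bigquery_row(record: Dict[str, Any]) -> Dict[str, Any]:
    """BigQuery row: the same record, as a JSON-compatible dict."""
    return dict(record)
```

A unit test asserting that tuple(to_record(...)) equals RECORD_FIELDS, and comparing RECORD_FIELDS with the field names of the live table schema (bigquery.Client.get_table(...).schema), would catch drift early.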

5. Configuration & Environment

Environment Variables:

  • APP_ENV: Controls environment (LOCAL, DEV, PPRD, PRD).
  • GCP_PROJECT_ID: Google Cloud project.
  • BIGQUERY_DATASET, BIGQUERY_TABLE: BigQuery dataset/table.
  • PUBSUB_TOPIC: Pub/Sub topic name.
  • MQTT_BROKER, MQTT_PORT, MQTT_TOPIC: MQTT connection details.

Configuration Files:

  • None; all configuration is via environment variables.

Environment Differences:

  • In LOCAL, only logs are written (no Pub/Sub or BigQuery).
  • In DEV/PPRD/PRD, Pub/Sub and BigQuery are enabled.
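
A sketch of environment-driven configuration, assuming the variable names listed above. The Config class, load_config, the localhost:1883 defaults, and which variables are mandatory are hypothetical choices. Rejecting a missing or unknown APP_ENV is an added safeguard against the misconfiguration risk noted under Common Pitfalls & Gotchas.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

CLOUD_ENVS = frozenset({"DEV", "PPRD", "PRD"})
VALID_ENVS = CLOUD_ENVS | {"LOCAL"}


@dataclass(frozen=True)
class Config:
    """Hypothetical typed view of the environment variables."""
    app_env: str
    gcp_project_id: str
    bigquery_dataset: str
    bigquery_table: str
    pubsub_topic: str
    mqtt_broker: str
    mqtt_port: int
    mqtt_topic: str

    @property
    def cloud_enabled(self) -> bool:
        """Pub/Sub and BigQuery are enabled only in DEV, PPRD, and PRD."""
        return self.app_env in CLOUD_ENVS


def load_config(env: Optional[Mapping[str, str]] = None) -> Config:
    env = os.environ if env is None else env
    app_env = env.get("APP_ENV", "").strip().upper()
    # Fail fast instead of silently falling back to LOCAL (data loss)
    # or to a cloud environment (test data in production).
    if app_env not in VALID_ENVS:
        raise ValueError(f"APP_ENV must be one of {sorted(VALID_ENVS)}, got {app_env!r}")
    cfg = Config(
        app_env=app_env,
        gcp_project_id=env.get("GCP_PROJECT_ID", ""),
        bigquery_dataset=env.get("BIGQUERY_DATASET", ""),
        bigquery_table=env.get("BIGQUERY_TABLE", ""),
        pubsub_topic=env.get("PUBSUB_TOPIC", ""),
        mqtt_broker=env.get("MQTT_BROKER", "localhost"),  # assumed default
        mqtt_port=int(env.get("MQTT_PORT", "1883")),      # assumed default
        mqtt_topic=env.get("MQTT_TOPIC", ""),
    )
    required = {"MQTT_TOPIC": cfg.mqtt_topic}
    if cfg.cloud_enabled:
        required.update({
            "GCP_PROJECT_ID": cfg.gcp_project_id,
            "BIGQUERY_DATASET": cfg.bigquery_dataset,
            "BIGQUERY_TABLE": cfg.bigquery_table,
            "PUBSUB_TOPIC": cfg.pubsub_topic,
        })
    missing = sorted(name for name, value in required.items() if not value)
    if missing:
        raise ValueError(f"Missing environment variables for {app_env}: {missing}")
    return cfg
```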

6. Runtime Behavior

Startup Sequence:

  • Loads configuration from environment.
  • Initializes BigQuery and Pub/Sub clients if enabled.
  • Connects to MQTT broker and subscribes to topic.
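
The startup sequence might look like the following sketch. It assumes paho-mqtt 2.x (CallbackAPIVersion.VERSION2 callback signatures) and the google-cloud-pubsub and google-cloud-bigquery client libraries; init_cloud_clients, make_on_connect, run, and the QoS 1 default are hypothetical. The libraries are imported only where needed, so a LOCAL run does not require the Google packages.

```python
import logging

logger = logging.getLogger("mqtt_subscriber")
CLOUD_ENVS = {"DEV", "PPRD", "PRD"}


def init_cloud_clients(app_env, project_id):
    """Hypothetical: return (publisher, bigquery_client), or (None, None) outside cloud envs."""
    if app_env not in CLOUD_ENVS:
        logger.info("APP_ENV=%s: Pub/Sub and BigQuery disabled", app_env)
        return None, None
    # Imported lazily so LOCAL runs do not need the Google libraries.
    from google.cloud import bigquery, pubsub_v1

    options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
    publisher = pubsub_v1.PublisherClient(publisher_options=options)
    return publisher, bigquery.Client(project=project_id)


def make_on_connect(topic, qos=1):
    """Hypothetical: build a paho-mqtt VERSION2 on_connect callback that (re)subscribes."""
    def on_connect(client, userdata, flags, reason_code, properties):
        if reason_code.is_failure:
            logger.error("MQTT connect failed: %s", reason_code)
            return
        # Subscribing here, not once after connect(), restores the
        # subscription every time paho reconnects.
        client.subscribe(topic, qos=qos)
        logger.info("Subscribed to %s", topic)
    return on_connect


def run(broker, port, topic, client_id, on_message):
    """Hypothetical entry point: connect and process messages until stopped."""
    import paho.mqtt.client as mqtt  # assumes paho-mqtt >= 2.0

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id)
    client.on_connect = make_on_connect(topic)
    client.on_message = on_message
    client.connect(broker, port, keepalive=60)
    client.loop_forever()  # blocks; reconnects automatically on connection loss
```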

Normal Execution Flow:

  • Waits for MQTT messages.
  • On message:
    • Parses the topic and validates the payload.
    • Publishes to Pub/Sub (if enabled).
    • Inserts into BigQuery (if enabled).
    • Logs all actions.

Error Handling and Logging:

  • Uses Python logging for all actions and errors.
  • Catches and logs JSON decode errors, Pub/Sub/BigQuery errors, and connection issues.
  • Continues running on recoverable errors.
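
For BigQuery, catching errors has two parts: Client.insert_rows_json() raises on API or network failures, but reports per-row rejections (for example, schema mismatches) in its return value, which must be checked explicitly. A minimal sketch, where insert_row is a hypothetical helper and the broad except is a deliberate choice to keep the subscriber running:

```python
import logging

logger = logging.getLogger("mqtt_subscriber")


def insert_row(bq_client, table_id, row):
    """Hypothetical helper: stream one row; return True on success, False otherwise."""
    try:
        errors = bq_client.insert_rows_json(table_id, [row])
    except Exception:  # network/API errors: log and keep the subscriber alive
        logger.exception("BigQuery insert failed for %s", table_id)
        return False
    if errors:  # per-row rejections are returned, not raised
        logger.error("BigQuery rejected row for %s: %s", table_id, errors)
        return False
    return True
```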

7. Deployment & Operations

Build Process:

  • No explicit build; run as a Python script.
  • Requires Python 3 and dependencies (paho-mqtt, google-cloud-bigquery, google-cloud-pubsub).

Deployment Method:

  • Typically deployed as a container or systemd service on a VM or cloud instance.
  • Environment variables must be set appropriately.

Runtime Dependencies:

  • Access to MQTT broker.
  • Google Cloud credentials (for BigQuery and Pub/Sub).

Scaling and Rollback:

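  • Stateless, so multiple instances can run, but each needs a unique MQTT client ID. With a plain subscription every instance receives every message, so extra instances duplicate writes rather than add throughput; splitting the load requires an MQTT shared subscription ($share/<group>/<topic>).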
  • Rollback is as simple as redeploying a previous version.
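
Running more than one instance safely needs distinct client IDs (the broker disconnects an existing client when another connects with the same ID) and, if instances should split the load rather than each receive every message, an MQTT shared subscription. The sketch below assumes the broker supports shared subscriptions (Mosquitto does from version 1.6) and accepts client IDs longer than the 23 characters MQTT 3.1.1 guarantees; the helper names and default prefix are hypothetical.

```python
import os
import socket
from typing import Optional


def unique_client_id(prefix: str = "fmi-mqtt-sub") -> str:
    """Hypothetical: derive a per-instance client ID from host name and PID."""
    return f"{prefix}-{socket.gethostname()}-{os.getpid()}"


def subscription_topic(topic: str, share_group: Optional[str] = None) -> str:
    """Hypothetical: wrap a topic filter in a shared subscription when a group is given."""
    return f"$share/{share_group}/{topic}" if share_group else topic
```

Because a shared subscription spreads one site's messages across instances, per-site ordering into Pub/Sub is no longer guaranteed end to end, even with the site code as the ordering key.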

8. Extending the System

Where and How to Add New Features:

  • Add new message processing logic in the on_message callback.
  • Extend Pub/Sub or BigQuery schemas as needed (update both code and cloud resources).
  • Add new output sinks (e.g., HTTP, file) by extending the message handler.
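
Modularizing outputs could mean giving every sink the same small interface, so adding an HTTP or file output does not touch parsing or validation. The Sink protocol, JsonLinesFileSink, and fan_out below are hypothetical; Pub/Sub and BigQuery would become two more implementations of write().

```python
import json
import logging
from typing import Any, Dict, Iterable, List, Protocol

logger = logging.getLogger("mqtt_subscriber")


class Sink(Protocol):
    """Hypothetical interface for any output a validated record can go to."""
    name: str

    def write(self, record: Dict[str, Any]) -> None: ...


class JsonLinesFileSink:
    """Hypothetical example sink: append each record as one JSON line."""
    name = "jsonl-file"

    def __init__(self, path: str) -> None:
        self.path = path

    def write(self, record: Dict[str, Any]) -> None:
        with open(self.path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record) + "\n")


def fan_out(record: Dict[str, Any], sinks: Iterable[Sink]) -> List[str]:
    """Write to every sink; return the names of sinks that failed."""
    failed = []
    for sink in sinks:
        try:
            sink.write(record)
        except Exception:
            logger.exception("Sink %s failed", sink.name)
            failed.append(sink.name)
    return failed
```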

Recommended Patterns:

  • Keep message validation strict.
  • Use environment variables for all configuration.
  • Modularize new output integrations.

Anti-patterns and Risk Areas:

  • Avoid hardcoding credentials or configuration.
  • Do not relax payload validation (could lead to data quality issues).
  • Avoid blocking operations in the message handler (could delay message processing).

Testing Strategy:

  • Unit test message parsing and validation logic.
  • Use mocks for Pub/Sub and BigQuery in tests.
  • Integration test with test MQTT broker and cloud resources.
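
A unit test with a mocked publisher might look like this. publish_reading is a hypothetical stand-in for the script's Pub/Sub step; unittest.mock.Mock replaces the PublisherClient so no credentials or network are needed. When the library is installed, mock.create_autospec(pubsub_v1.PublisherClient, instance=True) would also check call signatures.

```python
import json
import unittest
from unittest import mock


def publish_reading(publisher, topic_path, reading):
    """Hypothetical function under test: publish with the site code as ordering key."""
    data = json.dumps(reading).encode("utf-8")
    return publisher.publish(topic_path, data, ordering_key=reading["site_code"])


class PublishReadingTest(unittest.TestCase):
    def test_uses_site_code_as_ordering_key(self):
        publisher = mock.Mock()  # stands in for pubsub_v1.PublisherClient
        reading = {"site_code": "MOR", "sensor_id": "s1", "timestamp": "t", "value": 1.0}
        publish_reading(publisher, "projects/p/topics/t", reading)

        publisher.publish.assert_called_once()
        args, kwargs = publisher.publish.call_args
        self.assertEqual(args[0], "projects/p/topics/t")
        self.assertEqual(json.loads(args[1]), reading)
        self.assertEqual(kwargs["ordering_key"], "MOR")
```

Saved as a test module, this runs with python -m unittest.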

9. Security & Compliance

Authentication and Authorization:

  • Assumes trusted access to MQTT broker (no explicit auth in code).
  • Uses Google Cloud credentials for Pub/Sub and BigQuery (should be managed via service accounts).

Secrets Handling:

  • No secrets in code; relies on environment and Google Cloud IAM.

Data Sensitivity:

  • Sensor data may be sensitive; ensure BigQuery and Pub/Sub permissions are tightly scoped.

10. Common Pitfalls & Gotchas

  • Topic Structure: Expects exactly five slash-separated levels in the MQTT topic; malformed topics are ignored.
  • Environment Toggling: Features are enabled or disabled strictly by the APP_ENV variable; misconfiguration can lead to data loss or to test data in production.
  • BigQuery Schema Drift: Changes to table schema require code and cloud resource updates.
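  • Pub/Sub Ordering: Uses the site code as the ordering key; if one site is reported under inconsistent codes, its events are split across keys and lose relative ordering, while distinct sites that share a code are serialized together.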
  • Error Handling: Some errors are only logged, not retried; transient failures may result in data loss unless handled externally.
  • Single-threaded: Blocking operations in the message handler can delay processing of subsequent messages.
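
One Pub/Sub detail makes logged-but-not-retried errors worse when ordering keys are used: after a publish fails, the client pauses that ordering key and fails later publishes for it until resume_publish() is called. A minimal sketch, assuming a google-cloud-pubsub PublisherClient with message ordering enabled; publish_with_resume is a hypothetical helper, and the failed message itself is still only logged, not retried.

```python
import logging

logger = logging.getLogger("mqtt_subscriber")


def publish_with_resume(publisher, topic_path, data, ordering_key):
    """Hypothetical helper: publish; on failure, log and un-pause the ordering key."""
    future = publisher.publish(topic_path, data, ordering_key=ordering_key)

    def _on_done(fut):
        try:
            fut.result()
        except Exception:
            logger.exception("Pub/Sub publish failed (ordering key %s)", ordering_key)
            # Without this, later messages for the same site keep failing.
            publisher.resume_publish(topic_path, ordering_key)

    future.add_done_callback(_on_done)
    return future
```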

11. Other Components of the Protocol

  1. Mosquitto configuration
  2. Deployment