MQTT Protocol
1. Project Overview
Purpose and Scope:
This system is an MQTT subscriber designed for Freeport FMI. It connects to a local Mosquitto MQTT broker, listens for sensor data on a specific topic pattern, and performs two main actions:
- Publishes received sensor data directly to a Google Cloud Pub/Sub topic for downstream processing.
- Saves the data to a Google BigQuery table for audit and historical analysis.
Primary Use Cases:
- Real-time ingestion of sensor readings from industrial equipment or IoT devices.
- Forwarding sensor data to cloud-based event processing pipelines (via Pub/Sub).
- Persisting sensor data for compliance, auditing, or analytics (via BigQuery).
Explicitly Not Handled:
- Data transformation or enrichment beyond basic field extraction.
- Downstream processing or analytics (handled elsewhere).
- MQTT message publishing (this is a subscriber only).
- Authentication/authorization for MQTT broker (assumes local/trusted broker).
2. System Architecture
Core Components:
- MQTT Client: Subscribes to a topic pattern and receives messages.
- Pub/Sub Publisher: Publishes validated sensor events to a Google Pub/Sub topic.
- BigQuery Client: Inserts validated sensor events into a BigQuery table.
- Logging: Uses Python logging for local and cloud-based logs.
Data and Control Flow:
- MQTT client connects to broker and subscribes to a topic.
- On message receipt:
- Parses topic and payload.
- Validates required fields.
- Publishes event to Pub/Sub (if enabled).
- Inserts event into BigQuery (if enabled).
- Logs all actions and errors.
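The flow above can be sketched as the core of a paho-mqtt `on_message` callback. This is a minimal sketch, not the actual implementation: the injected sink callables (`publish_to_pubsub`, `insert_into_bigquery`) are hypothetical stand-ins for the real clients.

```python
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

REQUIRED_FIELDS = {"site_code", "sensor_id", "timestamp", "value"}

def handle_message(topic: str, payload: bytes,
                   publish_to_pubsub=None, insert_into_bigquery=None):
    """Core of the on_message callback: parse, validate, fan out.

    The sinks are injected so they can be disabled (LOCAL) or mocked in tests.
    Returns the validated event dict, or None if the message was dropped.
    """
    parts = topic.split("/")
    if len(parts) != 5:  # expected: FCTS/FAE/<site>/SolvExtract/<pi_tag>
        logger.warning("Ignoring malformed topic: %s", topic)
        return None
    try:
        data = json.loads(payload)
    except json.JSONDecodeError:
        logger.error("Invalid JSON payload on %s", topic)
        return None
    missing = REQUIRED_FIELDS - data.keys()
    if missing:
        logger.error("Dropping message, missing fields: %s", sorted(missing))
        return None
    if publish_to_pubsub:
        publish_to_pubsub(data)
    if insert_into_bigquery:
        insert_into_bigquery(data)
    return data
```

Dependency injection of the sinks keeps the validation logic unit-testable without any cloud access.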
External Services and Dependencies:
- Mosquitto MQTT broker (local or remote).
- Google Cloud Pub/Sub (for event publishing).
- Google BigQuery (for data persistence).
- Google Cloud Logging (optional, not actively used in this script).
3. Core Concepts & Domain Logic
Key Abstractions and Domain Terms:
- Sensor Reading: A JSON payload containing site_code, sensor_id, timestamp, and value.
- KPI Event: The structure sent to Pub/Sub, mapping sensor readings to KPI semantics.
- Topic Parsing: MQTT topics are expected in the form FCTS/FAE/<site>/SolvExtract/<pi_tag>.
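A possible parser for this pattern (the function name is illustrative, and the literal-segment checks are an assumption — the script may only check the segment count):

```python
def parse_topic(topic: str):
    """Extract <site> and <pi_tag> from FCTS/FAE/<site>/SolvExtract/<pi_tag>.

    Returns None for topics that do not match, mirroring the system's
    behavior of ignoring malformed topics.
    """
    parts = topic.split("/")
    if len(parts) != 5 or parts[0] != "FCTS" or parts[3] != "SolvExtract":
        return None
    return {"site": parts[2], "pi_tag": parts[4]}
```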
Business/Technical Invariants:
- Only messages with all required fields are processed.
- Data is published to Pub/Sub and/or BigQuery only in certain environments (DEV, PPRD, PRD).
- Ordering key for Pub/Sub is the site code, ensuring per-site event ordering.
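Per-site ordering hinges on passing the site code as the `ordering_key` and enabling message ordering on the publisher. A sketch under those assumptions (the GCP import is deferred to keep the module importable without the client libraries installed):

```python
import json

def ordering_key_for(event: dict) -> str:
    """All events for one site share an ordering key, so Pub/Sub
    delivers them to ordered subscribers in publish order per site."""
    return event["site_code"]

def publish_kpi_event(project_id: str, topic: str, event: dict):
    """Publish one event with an ordering key.

    Note: ordering also requires a regional endpoint and an ordered
    subscription; this sketch only shows the publisher side.
    """
    from google.cloud import pubsub_v1  # deferred import
    publisher = pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True
        )
    )
    topic_path = publisher.topic_path(project_id, topic)
    data = json.dumps(event).encode("utf-8")
    future = publisher.publish(topic_path, data,
                               ordering_key=ordering_key_for(event))
    return future.result()
```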
Mental Model:
- The system is a stateless, real-time bridge between MQTT, Pub/Sub, and BigQuery, with environment-driven feature toggling.
4. Codebase Structure
Single-file Script:
- All logic is contained in one Python file.
- Major sections: configuration, client setup, message handling, main loop.
Responsibility Boundaries:
- MQTT handling (connection, subscription, message receipt).
- Data validation and transformation.
- Pub/Sub publishing.
- BigQuery insertion.
- Logging and error handling.
What Changes Together:
- Topic parsing and payload validation logic.
- Pub/Sub and BigQuery schema changes.
5. Configuration & Environment
Environment Variables:
- APP_ENV: Controls environment (LOCAL, DEV, PPRD, PRD).
- GCP_PROJECT_ID: Google Cloud project.
- BIGQUERY_DATASET, BIGQUERY_TABLE: BigQuery dataset/table.
- PUBSUB_TOPIC: Pub/Sub topic name.
- MQTT_BROKER, MQTT_PORT, MQTT_TOPIC: MQTT connection details.
Configuration Files:
- None; all configuration is via environment variables.
Environment Differences:
- In LOCAL, only logs are written (no Pub/Sub or BigQuery).
- In DEV/PPRD/PRD, Pub/Sub and BigQuery are enabled.
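One way the toggling might look — a sketch assuming boolean flags derived from APP_ENV, with LOCAL as the safe default:

```python
import os

CLOUD_ENVS = {"DEV", "PPRD", "PRD"}

def load_feature_flags(environ=os.environ):
    """Derive sink toggles from APP_ENV; in LOCAL only logging is active.

    The environment mapping is injectable so tests need not mutate
    os.environ.
    """
    app_env = environ.get("APP_ENV", "LOCAL").upper()
    enabled = app_env in CLOUD_ENVS
    return {
        "app_env": app_env,
        "pubsub_enabled": enabled,
        "bigquery_enabled": enabled,
    }
```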
6. Runtime Behavior
Startup Sequence:
- Loads configuration from environment.
- Initializes BigQuery and Pub/Sub clients if enabled.
- Connects to MQTT broker and subscribes to topic.
Normal Execution Flow:
- Waits for MQTT messages.
- On message:
- Parses and validates payload.
- Publishes to Pub/Sub (if enabled).
- Inserts into BigQuery (if enabled).
- Logs all actions.
Error Handling and Logging:
- Uses Python logging for all actions and errors.
- Catches and logs JSON decode errors, Pub/Sub/BigQuery errors, and connection issues.
- Continues running on recoverable errors.
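The "log and keep running" policy can be expressed as a small wrapper around each sink call (the wrapper itself is illustrative, not taken from the script):

```python
import logging

logger = logging.getLogger(__name__)

def try_sink(name: str, fn, *args, **kwargs):
    """Run one sink call (Pub/Sub publish, BigQuery insert).

    Failures are logged with a traceback instead of raised, so one bad
    message or transient outage cannot stop the subscriber loop.
    """
    try:
        return fn(*args, **kwargs)
    except Exception:
        logger.exception("%s failed; continuing", name)
        return None
```

Note the trade-off called out in the pitfalls section: swallowing errors this way means transient failures are not retried and the affected data is lost unless handled externally.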
7. Deployment & Operations
Build Process:
- No explicit build; run as a Python script.
- Requires Python 3 and dependencies (paho-mqtt, google-cloud-bigquery, google-cloud-pubsub).
Deployment Method:
- Typically deployed as a container or systemd service on a VM or cloud instance.
- Environment variables must be set appropriately.
Runtime Dependencies:
- Access to MQTT broker.
- Google Cloud credentials (for BigQuery and Pub/Sub).
Scaling and Rollback:
- Stateless; can run multiple instances for higher throughput (ensure unique MQTT client IDs).
- Rollback is as simple as redeploying a previous version.
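When running multiple instances, each needs a distinct MQTT client ID, otherwise the broker treats them as one client and disconnects the duplicates. A simple scheme (the prefix is arbitrary):

```python
import uuid

def make_client_id(prefix: str = "fmi-subscriber") -> str:
    """Append a random suffix so concurrent instances never collide."""
    return f"{prefix}-{uuid.uuid4().hex[:8]}"
```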
8. Extending the System
Where and How to Add New Features:
- Add new message processing logic in on_message.
- Extend Pub/Sub or BigQuery schemas as needed (update both code and cloud resources).
- Add new output sinks (e.g., HTTP, file) by extending the message handler.
Recommended Patterns:
- Keep message validation strict.
- Use environment variables for all configuration.
- Modularize new output integrations.
Anti-patterns and Risk Areas:
- Avoid hardcoding credentials or configuration.
- Do not relax payload validation (could lead to data quality issues).
- Avoid blocking operations in the message handler (could delay message processing).
Testing Strategy:
- Unit test message parsing and validation logic.
- Use mocks for Pub/Sub and BigQuery in tests.
- Integration test with test MQTT broker and cloud resources.
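Injecting the cloud clients as parameters (or patching them) makes the handler testable without any cloud access. A sketch using unittest.mock, with an illustrative `fan_out` helper:

```python
import json
from unittest.mock import Mock

def fan_out(event: dict, publisher, topic_path: str):
    """Publish one event; the publisher is a parameter so tests can
    pass a Mock instead of a real Pub/Sub client."""
    publisher.publish(topic_path, json.dumps(event).encode("utf-8"),
                      ordering_key=event["site_code"])

# In a test, the Pub/Sub client is replaced with a Mock and the call
# is asserted on, with no network involved:
mock_publisher = Mock()
fan_out({"site_code": "S1", "value": 3}, mock_publisher, "projects/p/topics/t")
```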
9. Security & Compliance
Authentication and Authorization:
- Assumes trusted access to MQTT broker (no explicit auth in code).
- Uses Google Cloud credentials for Pub/Sub and BigQuery (should be managed via service accounts).
Secrets Handling:
- No secrets in code; relies on environment and Google Cloud IAM.
Data Sensitivity:
- Sensor data may be sensitive; ensure BigQuery and Pub/Sub permissions are tightly scoped.
10. Common Pitfalls & Gotchas
- Topic Structure: Expects exactly 5 parts in MQTT topic; malformed topics are ignored.
- Environment Toggling: Features are enabled/disabled strictly by the APP_ENV variable; misconfiguration can lead to data loss or test data in production.
- BigQuery Schema Drift: Changes to table schema require code and cloud resource updates.
- Pub/Sub Ordering: Uses site code as ordering key; if site codes are not unique or consistent, ordering may break.
- Error Handling: Some errors are only logged, not retried; transient failures may result in data loss unless handled externally.
- Single-threaded: Blocking operations in message handler can delay processing of subsequent messages.
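One mitigation for the single-threaded pitfall, assuming paho-mqtt's usual model of invoking callbacks on its network loop thread, is to hand slow sink calls to a worker pool so the loop stays responsive:

```python
from concurrent.futures import ThreadPoolExecutor

# Slow Pub/Sub or BigQuery calls made directly in on_message delay
# every subsequent message; a small pool decouples them from the loop.
executor = ThreadPoolExecutor(max_workers=4)

def on_message_nonblocking(handle, topic: str, payload: bytes):
    """Queue the real handler and return immediately to the MQTT loop.

    `handle` is whatever callable does the parse/validate/fan-out work.
    """
    return executor.submit(handle, topic, payload)
```

The trade-off is that per-topic message ordering inside the process is no longer guaranteed once handlers run concurrently, which matters if the Pub/Sub ordering key is meant to reflect arrival order.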
11. Other Components of the Protocol

