<h1 style="text-align: left;"><strong>MQTT Protocol</strong></h1><h3 style="text-align: left;">1. Project Overview</h3><p style="text-align: left;"><strong>Purpose and Scope:</strong><br />This system is an MQTT subscriber designed for Freeport FMI. It connects to a Mosquitto MQTT broker (usually local), listens for sensor data on a specific topic pattern, and performs two main actions (both enabled only in the DEV, PPRD, and PRD environments):</p><ul style="text-align: left;"><li>Publishes each validated sensor reading to a Google Cloud Pub/Sub topic for downstream processing.</li><li>Saves the same data to a Google BigQuery table for audit and historical analysis.</li></ul><p style="text-align: left;"><strong>Primary Use Cases:</strong></p><ul style="text-align: left;"><li>Real-time ingestion of sensor readings from industrial equipment or IoT devices.</li><li>Forwarding sensor data to cloud-based event processing pipelines (via Pub/Sub).</li><li>Persisting sensor data for compliance, auditing, or analytics (via BigQuery).</li></ul><p style="text-align: left;"><strong>Explicitly Not Handled:</strong></p><ul style="text-align: left;"><li>Data transformation or enrichment beyond basic field extraction.</li><li>Downstream processing or analytics (handled elsewhere).</li><li>Publishing MQTT messages (this is a subscriber only).</li><li>Authentication or authorization against the MQTT broker (a local, trusted broker is assumed).</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">2. 
System Architecture</h3><p style="text-align: left;"><strong>Core Components:</strong></p><ul style="text-align: left;"><li><strong>MQTT Client:</strong><span> </span>Subscribes to a topic pattern and receives messages.</li><li><strong>Pub/Sub Publisher:</strong><span> </span>Publishes validated sensor events to a Google Pub/Sub topic.</li><li><strong>BigQuery Client:</strong><span> </span>Inserts validated sensor events into a BigQuery table.</li><li><strong>Logging:</strong><span> </span>Uses Python logging for local and cloud-based logs.</li></ul><p style="text-align: left;"><strong>Data and Control Flow:</strong></p><ol style="text-align: left;"><li>The MQTT client connects to the broker and subscribes to the configured topic pattern.</li><li>On each message, the client:<ul><li>Parses the topic and the JSON payload.</li><li>Validates that all required fields are present.</li><li>Publishes the event to Pub/Sub (if enabled).</li><li>Inserts the event into BigQuery (if enabled).</li><li>Logs every action and error.</li></ul></li></ol><p style="text-align: left;"><strong>External Services and Dependencies:</strong></p><ul style="text-align: left;"><li>Mosquitto MQTT broker (local or remote).</li><li>Google Cloud Pub/Sub (event publishing).</li><li>Google BigQuery (data persistence).</li><li>Google Cloud Logging (optional; not actively used by this script).</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">3. 
Core Concepts &amp; Domain Logic</h3><p style="text-align: left;"><strong>Key Abstractions and Domain Terms:</strong></p><ul style="text-align: left;"><li><strong>Sensor Reading:</strong><span> </span>A JSON payload containing<span> </span><code class="undefined">site_code</code>,<span> </span><code class="undefined">sensor_id</code>,<span> </span><code class="undefined">timestamp</code>, and<span> </span><code class="undefined">value</code>.</li><li><strong>KPI Event:</strong><span> </span>The structure sent to Pub/Sub, mapping sensor readings to KPI semantics.</li><li><strong>Topic Parsing:</strong><span> </span>MQTT topics are expected in the form<span> </span><code class="undefined">FCTS/FAE/&lt;site&gt;/SolvExtract/&lt;pi_tag&gt;</code>.</li></ul><p style="text-align: left;"><strong>Business/Technical Invariants:</strong></p><ul style="text-align: left;"><li>Only messages that contain all required fields are processed.</li><li>Data is published to Pub/Sub and written to BigQuery only when<span> </span><code class="undefined">APP_ENV</code><span> </span>is DEV, PPRD, or PRD.</li><li>The Pub/Sub ordering key is the site code, so events from the same site are delivered in the order they were published (this also requires message ordering to be enabled on the publisher client and on the subscription).</li></ul><p style="text-align: left;"><strong>Mental Model:</strong></p><ul style="text-align: left;"><li>The system is a stateless, real-time bridge between MQTT, Pub/Sub, and BigQuery, with features toggled by environment.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">4. 
Codebase Structure</h3><p style="text-align: left;"><strong>Single-file Script:</strong></p><ul style="text-align: left;"><li>All logic is contained in one Python file.</li><li>Major sections: configuration, client setup, message handling, and the main loop.</li></ul><p style="text-align: left;"><strong>Responsibility Boundaries:</strong></p><ul style="text-align: left;"><li>MQTT handling (connection, subscription, message receipt).</li><li>Data validation and transformation.</li><li>Pub/Sub publishing.</li><li>BigQuery insertion.</li><li>Logging and error handling.</li></ul><p style="text-align: left;"><strong>What Changes Together:</strong></p><ul style="text-align: left;"><li>Topic parsing and payload validation logic.</li><li>The Pub/Sub event structure and the BigQuery table schema.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">5. Configuration &amp; Environment</h3><p style="text-align: left;"><strong>Environment Variables:</strong></p><ul style="text-align: left;"><li><code class="undefined">APP_ENV</code>: Selects the environment (LOCAL, DEV, PPRD, or PRD).</li><li><code class="undefined">GCP_PROJECT_ID</code>: Google Cloud project ID.</li><li><code class="undefined">BIGQUERY_DATASET</code>,<span> </span><code class="undefined">BIGQUERY_TABLE</code>: BigQuery dataset and table.</li><li><code class="undefined">PUBSUB_TOPIC</code>: Pub/Sub topic name.</li><li><code class="undefined">MQTT_BROKER</code>,<span> </span><code class="undefined">MQTT_PORT</code>,<span> </span><code class="undefined">MQTT_TOPIC</code>: MQTT broker host, port, and topic pattern.</li></ul><p style="text-align: left;"><strong>Configuration Files:</strong></p><ul style="text-align: left;"><li>None; all configuration is via environment variables.</li></ul><p style="text-align: left;"><strong>Environment Differences:</strong></p><ul style="text-align: left;"><li>In LOCAL, messages are only logged (nothing is sent to Pub/Sub or BigQuery).</li><li>In DEV, PPRD, and PRD, Pub/Sub and BigQuery are enabled.</li></ul><hr style="text-align: left;" /><h3 
style="text-align: left;">6. Runtime Behavior</h3><p style="text-align: left;"><strong>Startup Sequence:</strong></p><ul style="text-align: left;"><li>Loads configuration from environment variables.</li><li>Initializes the BigQuery and Pub/Sub clients if enabled.</li><li>Connects to the MQTT broker and subscribes to the topic pattern.</li></ul><p style="text-align: left;"><strong>Normal Execution Flow:</strong></p><ul style="text-align: left;"><li>Waits for MQTT messages.</li><li>On each message:<ul><li>Parses and validates the payload.</li><li>Publishes to Pub/Sub (if enabled).</li><li>Inserts into BigQuery (if enabled).</li><li>Logs each action.</li></ul></li></ul><p style="text-align: left;"><strong>Error Handling and Logging:</strong></p><ul style="text-align: left;"><li>Uses Python logging for all actions and errors.</li><li>Catches and logs JSON decode errors, Pub/Sub and BigQuery errors, and connection issues.</li><li>Keeps running after recoverable errors; some failures are only logged, not retried.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">7. 
Deployment &amp; Operations</h3><p style="text-align: left;"><strong>Build Process:</strong></p><ul style="text-align: left;"><li>No explicit build; run as a Python script.</li><li>Requires Python 3 and the dependencies<span> </span><code class="undefined">paho-mqtt</code>,<span> </span><code class="undefined">google-cloud-bigquery</code>, and<span> </span><code class="undefined">google-cloud-pubsub</code>.</li></ul><p style="text-align: left;"><strong>Deployment Method:</strong></p><ul style="text-align: left;"><li>Typically deployed as a container or systemd service on a VM or cloud instance.</li><li>The environment variables listed in section 5 must be set.</li></ul><p style="text-align: left;"><strong>Runtime Dependencies:</strong></p><ul style="text-align: left;"><li>Network access to the MQTT broker.</li><li>Google Cloud credentials (for BigQuery and Pub/Sub).</li></ul><p style="text-align: left;"><strong>Scaling and Rollback:</strong></p><ul style="text-align: left;"><li>The service is stateless, but every instance holding an ordinary subscription to the topic receives its own copy of each message, so running several instances this way duplicates data in Pub/Sub and BigQuery. To spread load across instances, the subscription would need to use an MQTT shared subscription (<code class="undefined">$share/&lt;group&gt;/&lt;topic&gt;</code>, supported by recent Mosquitto versions), and each instance needs a unique MQTT client ID.</li><li>Rollback is as simple as redeploying a previous version.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">8. 
Extending the System</h3><p style="text-align: left;"><strong>Where and How to Add New Features:</strong></p><ul style="text-align: left;"><li>Add new message processing logic in<span> </span><code class="undefined">on_message</code>.</li><li>Extend the Pub/Sub or BigQuery schemas as needed, updating both the code and the cloud resources.</li><li>Add new output sinks (e.g., HTTP, file) by extending the message handler.</li></ul><p style="text-align: left;"><strong>Recommended Patterns:</strong></p><ul style="text-align: left;"><li>Keep message validation strict.</li><li>Use environment variables for all configuration.</li><li>Implement each new output integration as a separate function or module.</li></ul><p style="text-align: left;"><strong>Anti-patterns and Risk Areas:</strong></p><ul style="text-align: left;"><li>Avoid hardcoding credentials or configuration.</li><li>Do not relax payload validation; doing so could let malformed data into Pub/Sub and BigQuery.</li><li>Avoid blocking operations in the message handler; they delay processing of the messages that follow.</li></ul><p style="text-align: left;"><strong>Testing Strategy:</strong></p><ul style="text-align: left;"><li>Unit test message parsing and validation logic.</li><li>Use mocks for Pub/Sub and BigQuery in tests.</li><li>Run integration tests against a test MQTT broker and non-production cloud resources.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">9. 
Security &amp; Compliance</h3><p style="text-align: left;"><strong>Authentication and Authorization:</strong></p><ul style="text-align: left;"><li>Assumes trusted access to the MQTT broker (the code performs no MQTT authentication).</li><li>Uses Google Cloud credentials for Pub/Sub and BigQuery; these should be provided through a service account.</li></ul><p style="text-align: left;"><strong>Secrets Handling:</strong></p><ul style="text-align: left;"><li>No secrets in code; relies on the environment and Google Cloud IAM.</li></ul><p style="text-align: left;"><strong>Data Sensitivity:</strong></p><ul style="text-align: left;"><li>Sensor data may be sensitive; keep BigQuery and Pub/Sub permissions tightly scoped.</li></ul><hr style="text-align: left;" /><h3 style="text-align: left;">10. Common Pitfalls &amp; Gotchas</h3><ul style="text-align: left;"><li><strong>Topic Structure:</strong><span> </span>Expects exactly five segments in the MQTT topic; messages on malformed topics are ignored.</li><li><strong>Environment Toggling:</strong><span> </span>Features are enabled or disabled solely by the<span> </span><code class="undefined">APP_ENV</code><span> </span>variable; misconfiguration can lead to data loss or to test data in production.</li><li><strong>BigQuery Schema Drift:</strong><span> </span>Changes to the table schema require matching code and cloud resource updates.</li><li><strong>Pub/Sub Ordering:</strong><span> </span>Uses the site code as the ordering key; if the same site is reported under inconsistent codes, its events are split across ordering keys and their relative order is lost.</li><li><strong>Error Handling:</strong><span> </span>Some errors are only logged, not retried; transient failures may result in data loss unless handled externally.</li><li><strong>Single-threaded:</strong><span> </span>Blocking operations in the message handler can delay processing of subsequent messages.</li></ul><h3>11. 
Other Components of the Protocol</h3><ol><li><img class="editor-inline-macro" src="/plugins/servlet/confluence/placeholder/macro?definition=e2FkZC1wYWdlOm5hbWU9TW9zcXVpdHRvIGNvbmZpZ3VyYXRpb258bGlua1RleHQ9TW9zcXVpdHRvIGNvbmZpZ3VyYXRpb259&locale=en_GB&version=2" data-macro-name="add-page" data-macro-id="7ce2df84-677f-4c53-9cdf-bb1420c5057a" role="button" tabindex="0" aria-haspopup="true" aria-label="add-page macro" data-macro-parameters="linkText=Mosquitto configuration|name=Mosquitto configuration" data-macro-schema-version="1"></li><li><img class="editor-inline-macro" src="/plugins/servlet/confluence/placeholder/macro?definition=e2FkZC1wYWdlOm5hbWU9RGVwbG95bWVudHxsaW5rVGV4dD1EZXBsb3ltZW50fQ&locale=en_GB&version=2" data-macro-name="add-page" data-macro-id="e5fd81c6-d361-4b40-8b13-be08a62fc057" role="button" tabindex="0" aria-haspopup="true" aria-label="add-page macro" data-macro-parameters="linkText=Deployment|name=Deployment" data-macro-schema-version="1"></li></ol>
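<p style="text-align: left;">The environment toggling described in the Configuration and Common Pitfalls sections can be made fail-fast at startup. The following is a minimal Python sketch, not the script's actual code: the names <code class="undefined">Config</code>, <code class="undefined">load_config</code>, and <code class="undefined">cloud_sinks_enabled</code> are hypothetical, and rejecting unknown <code class="undefined">APP_ENV</code> values or missing cloud settings is an assumed design choice rather than documented behavior.</p>

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

# Environments named on this page; only DEV, PPRD and PRD enable the cloud sinks.
KNOWN_ENVIRONMENTS = ("LOCAL", "DEV", "PPRD", "PRD")
CLOUD_ENVIRONMENTS = ("DEV", "PPRD", "PRD")

MQTT_KEYS = ("MQTT_BROKER", "MQTT_PORT", "MQTT_TOPIC")
CLOUD_KEYS = ("GCP_PROJECT_ID", "BIGQUERY_DATASET", "BIGQUERY_TABLE", "PUBSUB_TOPIC")


@dataclass(frozen=True)
class Config:
    app_env: str
    mqtt_broker: str
    mqtt_port: int
    mqtt_topic: str
    # Cloud settings may be absent in LOCAL, where nothing is sent to Google Cloud.
    gcp_project_id: Optional[str] = None
    bigquery_dataset: Optional[str] = None
    bigquery_table: Optional[str] = None
    pubsub_topic: Optional[str] = None

    @property
    def cloud_sinks_enabled(self) -> bool:
        """True when Pub/Sub publishing and BigQuery inserts should run."""
        return self.app_env in CLOUD_ENVIRONMENTS


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Build a Config from environment variables, failing fast on bad values."""
    app_env = env.get("APP_ENV", "")
    if app_env not in KNOWN_ENVIRONMENTS:
        # An unknown value (e.g. "PROD") is rejected instead of guessed.
        raise ValueError(f"APP_ENV must be one of {KNOWN_ENVIRONMENTS}, got {app_env!r}")

    # Cloud settings are only required when the cloud sinks will be used.
    required = MQTT_KEYS + (CLOUD_KEYS if app_env in CLOUD_ENVIRONMENTS else ())
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise ValueError("missing environment variables: " + ", ".join(missing))

    return Config(
        app_env=app_env,
        mqtt_broker=env["MQTT_BROKER"],
        mqtt_port=int(env["MQTT_PORT"]),  # raises ValueError if not numeric
        mqtt_topic=env["MQTT_TOPIC"],
        gcp_project_id=env.get("GCP_PROJECT_ID"),
        bigquery_dataset=env.get("BIGQUERY_DATASET"),
        bigquery_table=env.get("BIGQUERY_TABLE"),
        pubsub_topic=env.get("PUBSUB_TOPIC"),
    )
```

<p style="text-align: left;">At startup the script would call <code class="undefined">load_config()</code> with no arguments to read <code class="undefined">os.environ</code>; passing a plain dictionary, as in unit tests, avoids touching the real environment. A typo such as <code class="undefined">APP_ENV=PROD</code> then stops the process immediately instead of, for example, letting it run without Pub/Sub and BigQuery.</p>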
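<p style="text-align: left;">The per-message logic from the System Architecture, Core Concepts, and Runtime Behavior sections (topic parsing, required-field validation, environment toggling, and forwarding with the site code as ordering key) can be sketched as a pure function that is easy to unit test with mocked sinks, as the Testing Strategy recommends. This is an illustrative Python sketch under stated assumptions, not the script's <code class="undefined">on_message</code>: <code class="undefined">parse_topic</code>, <code class="undefined">handle_message</code>, and the <code class="undefined">publish</code>/<code class="undefined">insert_row</code> callables are hypothetical, and the KPI event fields shown are assumed, since the real Pub/Sub and BigQuery schemas are not documented here.</p>

```python
import json
import logging
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger("mqtt_subscriber")

# Fields every sensor reading must carry.
REQUIRED_FIELDS = ("site_code", "sensor_id", "timestamp", "value")


def parse_topic(topic: str) -> Optional[Dict[str, str]]:
    """Split FCTS/FAE/{site}/SolvExtract/{pi_tag}; None unless exactly 5 segments."""
    parts = topic.split("/")
    if len(parts) != 5:
        return None
    return {"site": parts[2], "pi_tag": parts[4]}


def handle_message(
    topic: str,
    payload: bytes,
    cloud_sinks_enabled: bool,
    publish: Callable[[bytes, str], None],
    insert_row: Callable[[Dict[str, Any]], None],
) -> Optional[Dict[str, Any]]:
    """Validate one MQTT message and forward it; return the event, or None if dropped."""
    topic_parts = parse_topic(topic)
    if topic_parts is None:
        logger.warning("Ignoring malformed topic %r", topic)
        return None

    try:
        reading = json.loads(payload)
    except ValueError:  # covers JSONDecodeError and undecodable bytes
        logger.error("Invalid JSON payload on %s", topic)
        return None
    if not isinstance(reading, dict):
        logger.error("Payload on %s is not a JSON object", topic)
        return None

    missing = [field for field in REQUIRED_FIELDS if field not in reading]
    if missing:
        logger.warning("Dropping message on %s, missing fields: %s", topic, missing)
        return None

    # Hypothetical KPI event layout; the real schema is owned by downstream consumers.
    event = {field: reading[field] for field in REQUIRED_FIELDS}
    event["pi_tag"] = topic_parts["pi_tag"]

    if not cloud_sinks_enabled:
        logger.info("Cloud sinks disabled; event only logged: %s", event)
        return event

    # A failing sink is logged and does not block the other sink (no retries).
    try:
        publish(json.dumps(event).encode("utf-8"), str(event["site_code"]))
    except Exception:
        logger.exception("Pub/Sub publish failed for %s", topic)
    try:
        insert_row(event)
    except Exception:
        logger.exception("BigQuery insert failed for %s", topic)
    return event
```

<p style="text-align: left;">In a real <code class="undefined">on_message(client, userdata, msg)</code> callback this would be called with <code class="undefined">msg.topic</code> and <code class="undefined">msg.payload</code>. With the Google client libraries, <code class="undefined">publish</code> could wrap <code class="undefined">PublisherClient.publish(topic_path, data, ordering_key=key)</code> and <code class="undefined">insert_row</code> could wrap <code class="undefined">bigquery.Client.insert_rows_json(table_id, [row])</code>. Note that <code class="undefined">publish</code> returns a future and <code class="undefined">insert_rows_json</code> returns a list of row errors instead of raising, so both results would need to be checked for failures to be logged.</p>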