ALMINA Plant Data Pipeline – Technical Documentation
Overview
This pipeline automates the extraction, transformation, validation, and loading (ETL) of daily plant data from ALMINA’s Excel workbooks into a BigQuery summary table. It is designed for both batch and event-driven (GCP Cloud Function) operation, with robust error handling, logging, and schema management.
Architecture & Workflow
High-level steps:
- Ingestion: Excel files are read from Google Cloud Storage (GCS), either in batch or via Cloud Function trigger.
- Mapping: The `MappingTest` worksheet defines how to extract and map data from the daily sheets.
- Extraction & Transformation: Data is extracted per shift, cleaned, and converted to the correct types.
- Validation: Data quality is checked, and metadata is attached.
- Loading: The processed DataFrame is uploaded to BigQuery, with schema alignment and idempotency.
- Logging: All steps are logged with structured, context-rich messages.
Configuration
- All configuration is centralized in the `Settings` dataclass, loaded from environment variables.
- Required: `GCP_PROJECT_ID`, `GCS_BUCKET_NAME`, `BQ_DATASET_ID`
- Optional: `BQ_TABLE_ID`, `CUSTOMER_ID`, `GOOGLE_APPLICATION_CREDENTIALS`
- Retry, partitioning, and clustering options are configurable.
Module Reference
Main Script (main.py)
- Purpose: Entry point for both local batch and GCP Cloud Function execution.
- Key Functions:
  - `main()`: Batch processing from a local Excel file to BigQuery (sketched below).
  - `almina_extra_data_mapping_trigger(cloud_event)`: Cloud Function entry point for GCS-triggered processing.
  - `prepare_dataframe_for_bigquery(df)`: Ensures the DataFrame is BigQuery-compatible (types, nulls, etc.).
- Dependencies: All core modules below.
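For orientation, here is a hedged sketch of how the batch entry point might wire these modules together. The module paths, constructor arguments, and the way the mapping sheet is read are assumptions based on the module reference below, not the exact implementation in main.py.

```python
import io
from openpyxl import load_workbook                 # assumed Excel reader

from settings import get_settings                  # assumed module layout
from gcp_storage_service import GCPStorageService
from bigquery_service import SummaryBigQueryService
from summary_data_processor import SummaryDataProcessor


def main() -> None:
    settings = get_settings()
    settings.validate()
    storage = GCPStorageService(settings.gcs_bucket_name)
    bq = SummaryBigQueryService(settings)
    processor = SummaryDataProcessor()

    for file_name in storage.list_xlsx_files():
        raw_bytes, upload_time = storage.download_file(file_name)
        workbook = load_workbook(io.BytesIO(raw_bytes), data_only=True)
        mapping_rows = list(workbook["MappingTest"].values)   # rows of the mapping sheet
        df = processor.process_workbook(raw_bytes, mapping_rows, file_name, upload_time)
        df = prepare_dataframe_for_bigquery(df)    # defined elsewhere in main.py
        bq.ensure_table_exists(list(df.columns))
        bq.upload_dataframe(df)


if __name__ == "__main__":
    main()
```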
SummaryDataProcessor
- Purpose: Orchestrates the extraction and transformation of plant data from Excel workbooks using mapping rules.
- Key Methods:
  - `process_workbook(raw_workbook_bytes, mapping_csv_rows, file_name, upload_time_to_bucket)`: Main entry point; processes a workbook and returns a DataFrame.
  - `_identify_daily_sheets(workbook)`: Finds daily sheets by name pattern.
  - `_process_sheet(...)`: Extracts data for both shifts from a sheet.
  - `_calculate_derived_fields(record, sheet_name, shift)`: Derives date, time, shift, and other fields.
  - `_extract_cell_value(...)`: Extracts and cleans a value from a cell or range.
  - `_add_metadata(df, ...)`: Adds metadata columns.
  - `_calculate_quality_score(df, error_count)`: Computes a data quality score (illustrated in the sketch after this list).
- Dependencies: Mapping rules, data transformer, logging, sanitizers.
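For illustration, a possible shape for the sheet-discovery and quality-score helpers. The sheet-name pattern and the scoring formula are assumptions, not the actual rules used by the processor.

```python
import re
import pandas as pd

# Hypothetical daily-sheet pattern (assumption: sheets named by day and month, e.g. "01.05").
DAILY_SHEET_PATTERN = re.compile(r"^\d{2}\.\d{2}$")


def identify_daily_sheets(sheet_names: list[str]) -> list[str]:
    """Return only the sheets whose names look like daily sheets."""
    return [name for name in sheet_names if DAILY_SHEET_PATTERN.match(name)]


def calculate_quality_score(df: pd.DataFrame, error_count: int) -> float:
    """Assumed scoring: share of extracted cells that produced no validation error."""
    total_cells = max(df.shape[0] * df.shape[1], 1)
    return round(max(0.0, 1.0 - error_count / total_cells), 4)
```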
Field Mapping & Mapping Rules
- Purpose: Defines how to interpret the `MappingTest` worksheet and map fields to cell references.
- Key Classes (sketched below):
  - `CellReference`: Represents a cell or range, with an optional sheet override.
  - `FieldMappingRule`: Holds mapping info for a single field (cell refs, clues, required flag, examples).
  - `MappingConfig`: Aggregates all field mapping rules.
  - `MappingTestParser`: Parses mapping rows into a `MappingConfig`.
- Key Function:
  - `parse_mapping_test_rows(rows)`: Parses mapping rows into a `MappingConfig`.
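A rough sketch of what these mapping classes might look like; any field name not listed in the reference above is an assumption for illustration.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class CellReference:
    """A single cell or range, optionally pointing at another sheet."""
    cell: str                      # e.g. "B12" or "B12:D12"
    sheet: str | None = None       # optional sheet override


@dataclass
class FieldMappingRule:
    """Mapping information for one output field."""
    field_name: str
    cell_refs: list[CellReference]
    clues: list[str] = field(default_factory=list)    # header text used to locate values (assumed)
    required: bool = False
    examples: list[str] = field(default_factory=list)


@dataclass
class MappingConfig:
    """All field mapping rules parsed from the MappingTest worksheet."""
    rules: dict[str, FieldMappingRule] = field(default_factory=dict)
```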
Data Transformation
- Purpose: Converts raw extracted values to the correct types for BigQuery.
- Key Class:
  - `DataTransformer`: Handles numeric, date, timestamp, and string parsing, with error handling and logging.
- Key Function:
  - `transform_dataframe(df, schema_types, logger)`: Applies type transformation to an entire DataFrame (sketched below).
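A minimal sketch of column-wise type coercion, assuming `schema_types` maps column names to BigQuery type strings. The real `DataTransformer` also handles locale-specific number formats and logging, which are omitted here.

```python
import pandas as pd


def transform_dataframe(df: pd.DataFrame, schema_types: dict[str, str]) -> pd.DataFrame:
    """Coerce each column to a pandas dtype compatible with its BigQuery type."""
    out = df.copy()
    for column, bq_type in schema_types.items():
        if column not in out.columns:
            continue
        if bq_type in ("FLOAT", "NUMERIC", "INTEGER"):
            out[column] = pd.to_numeric(out[column], errors="coerce")
        elif bq_type in ("TIMESTAMP", "DATETIME", "DATE"):
            out[column] = pd.to_datetime(out[column], errors="coerce")
        else:  # default to string
            out[column] = out[column].astype("string")
    return out
```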
BigQuery Schema & Utilities
- Purpose: Defines and generates BigQuery schemas based on mapping fields.
- Key Classes/Functions:
  - `SummaryTableSchema`: Canonical schema with field/type lists and utilities.
  - `build_schema_from_mapping_fields(field_names)`: Builds a schema from mapping fields, inferring types.
  - `_infer_field_type(clean_name, original_name)`: Infers the BigQuery type from a field name (an illustrative heuristic follows below).
  - `SummaryBigQuerySchema`: Generates a schema from the mapping config, using example values for type inference.
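An illustrative heuristic for name-based type inference; the keyword lists here are assumptions, not the actual rules used by the pipeline.

```python
def infer_field_type(clean_name: str, original_name: str) -> str:
    """Guess a BigQuery type from the sanitized and original field names."""
    name = f"{clean_name} {original_name}".lower()
    if "date" in name:
        return "DATE"
    if "time" in name or "timestamp" in name:
        return "TIMESTAMP"
    if any(token in name for token in ("count", "tonnes", "rate", "hours", "percent")):
        return "FLOAT"
    return "STRING"
```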
BigQuery Service
- Purpose: Handles all interactions with BigQuery: table creation, schema management, and data upload.
- Key Class: `SummaryBigQueryService`
  - `ensure_table_exists(field_names, ...)`: Ensures the summary table exists and is up to date.
  - `upload_dataframe(df, ...)`: Uploads the DataFrame, with idempotency and schema alignment.
  - `_upsert_via_merge(df)`: Upserts using a staging table and MERGE (sketched below).
  - `_prepare_dataframe_for_upload(df)`: Cleans and converts the DataFrame for upload.
  - `_final_pyarrow_safety_check(df)`: Ensures PyArrow compatibility.
  - `_align_dataframe_with_bq_schema(df)`: Aligns DataFrame types with the BigQuery schema.
  - `_truncate_datetime_columns_to_microseconds(df)`: Ensures timestamp precision.
  - `_ensure_record_id(df)`: Adds an idempotency key.
  - `query_recent_data(limit, days_back)`: Queries recent data for validation/testing.
  - `get_table_info()`: Returns table metadata.
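A sketch of an upsert via a staging table and MERGE. The key column name (`record_id`) and the staging/target table identifiers are assumptions for illustration; the actual service wraps this in its own schema alignment and retry logic.

```python
import pandas as pd
from google.cloud import bigquery


def upsert_via_merge(client: bigquery.Client, df: pd.DataFrame,
                     target: str, staging: str) -> None:
    # 1. Load the batch into a staging table (replaced on every run).
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    client.load_table_from_dataframe(df, staging, job_config=job_config).result()

    # 2. MERGE staging into the target on the idempotency key.
    columns = ", ".join(f"`{c}`" for c in df.columns)
    updates = ", ".join(f"T.`{c}` = S.`{c}`" for c in df.columns if c != "record_id")
    sql = f"""
    MERGE `{target}` T
    USING `{staging}` S
    ON T.record_id = S.record_id
    WHEN MATCHED THEN UPDATE SET {updates}
    WHEN NOT MATCHED THEN INSERT ({columns}) VALUES ({columns})
    """
    client.query(sql).result()

    # 3. Drop the staging table.
    client.delete_table(staging, not_found_ok=True)
```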
GCP Storage Service
- Purpose: Encapsulates all Google Cloud Storage operations.
- Key Class: `GCPStorageService`
  - `list_xlsx_files()`: Lists all `.xlsx` files in the bucket.
  - `download_file(file_name)`: Downloads a file and returns its bytes and upload time.
- Implements: Retry logic for robustness (a minimal sketch follows).
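A minimal sketch of the two operations using the `google-cloud-storage` client with default credentials; the real service additionally applies `retry_with_backoff` and structured logging.

```python
from google.cloud import storage


class GCPStorageService:
    def __init__(self, bucket_name: str) -> None:
        self._bucket = storage.Client().bucket(bucket_name)

    def list_xlsx_files(self) -> list[str]:
        """Return the names of all .xlsx objects in the bucket."""
        return [b.name for b in self._bucket.list_blobs() if b.name.endswith(".xlsx")]

    def download_file(self, file_name: str):
        """Return the file contents and the time it was uploaded to the bucket."""
        blob = self._bucket.blob(file_name)
        data = blob.download_as_bytes()
        blob.reload()                     # populate metadata such as time_created
        return data, blob.time_created
```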
Logging Utilities
- Purpose: Provides structured logging for both local and cloud environments.
- Key Class:
  - `StructuredLogger`: Logs messages with severity, component, timestamp, and extra metadata.
- Key Function:
  - `setup_logging(component)`: Configures and returns a `StructuredLogger` (a minimal sketch follows).
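A sketch of a structured logger that emits JSON lines to stdout, which Cloud Logging parses into structured entries; the exact field names beyond severity, component, and timestamp are assumptions.

```python
import json
import sys
from datetime import datetime, timezone


class StructuredLogger:
    def __init__(self, component: str) -> None:
        self.component = component

    def log(self, severity: str, message: str, **extra) -> None:
        entry = {
            "severity": severity,
            "message": message,
            "component": self.component,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **extra,
        }
        print(json.dumps(entry, default=str), file=sys.stdout)

    def info(self, message: str, **extra) -> None:
        self.log("INFO", message, **extra)

    def error(self, message: str, **extra) -> None:
        self.log("ERROR", message, **extra)


def setup_logging(component: str) -> StructuredLogger:
    return StructuredLogger(component)
```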
Settings & Configuration
- Purpose: Centralizes all configuration and environment variables.
- Key Class:
  - `Settings`: Immutable dataclass for all settings.
  - `from_env()`: Loads settings from the environment.
  - `validate()`: Ensures required settings are present.
- Key Functions:
  - `get_settings()`: Returns the singleton settings instance.
- Global: `settings` is the default instance (see the sketch below).
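A sketch of the `Settings` dataclass covering the variables documented in the Configuration section; field names, defaults, and the error message are assumptions.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    gcp_project_id: str
    gcs_bucket_name: str
    bq_dataset_id: str
    bq_table_id: str = "summary"          # assumed default
    customer_id: str | None = None

    @classmethod
    def from_env(cls) -> "Settings":
        return cls(
            gcp_project_id=os.environ.get("GCP_PROJECT_ID", ""),
            gcs_bucket_name=os.environ.get("GCS_BUCKET_NAME", ""),
            bq_dataset_id=os.environ.get("BQ_DATASET_ID", ""),
            bq_table_id=os.environ.get("BQ_TABLE_ID", "summary"),
            customer_id=os.environ.get("CUSTOMER_ID"),
        )

    def validate(self) -> None:
        missing = [name for name, value in (
            ("GCP_PROJECT_ID", self.gcp_project_id),
            ("GCS_BUCKET_NAME", self.gcs_bucket_name),
            ("BQ_DATASET_ID", self.bq_dataset_id),
        ) if not value]
        if missing:
            raise ValueError(f"Missing required settings: {', '.join(missing)}")
```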
Sanitizers
- Purpose: Ensures all column names are BigQuery-safe and snake_case.
- Key Function:
  - `sanitize_column_name(original_name)`: Converts any string to a valid BigQuery column name (an illustrative version follows).
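An illustrative sanitizer; the exact normalization rules in the pipeline may differ, but BigQuery column names must contain only letters, digits, and underscores and must not start with a digit.

```python
import re


def sanitize_column_name(original_name: str) -> str:
    """Convert an arbitrary header string into a BigQuery-safe snake_case name."""
    name = original_name.strip().lower()
    name = re.sub(r"[^0-9a-z]+", "_", name)      # non-alphanumerics -> underscore
    name = re.sub(r"_+", "_", name).strip("_")   # collapse and trim underscores
    if not name or name[0].isdigit():            # names cannot be empty or start with a digit
        name = f"col_{name}"
    return name[:300]                            # BigQuery column name length limit
```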
Retry Utilities
- Purpose: Provides a decorator for retrying functions with exponential backoff.
- Key Function:
  - `retry_with_backoff(max_retries, backoff_factor)`: Decorator for robust retry logic, with logging (sketched below).
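A sketch of an exponential-backoff decorator matching the documented signature; the delay formula, default values, and the caught exception type are assumptions, and logging is omitted.

```python
import functools
import time


def retry_with_backoff(max_retries: int = 3, backoff_factor: float = 2.0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise                      # out of retries: surface the error
                    time.sleep(backoff_factor ** attempt)   # 1s, 2s, 4s, ...
        return wrapper
    return decorator


# Hypothetical usage:
@retry_with_backoff(max_retries=3)
def flaky_download():
    ...
```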
Data Flow Summary
- File Ingestion: Files are listed and downloaded from GCS via `GCPStorageService`.
- Mapping Extraction: The `MappingTest` worksheet is parsed into a `MappingConfig` using `MappingTestParser`.
- Data Extraction: For each daily sheet and shift, values are extracted using the mapping rules.
- Transformation: Raw values are converted to the correct types using `DataTransformer` and `transform_dataframe`.
- Validation & Metadata: Data quality is checked, and metadata columns are added.
- BigQuery Upload: The DataFrame is cleaned, aligned, and uploaded to BigQuery using `SummaryBigQueryService`.
- Logging: All steps are logged with context for traceability.
Error Handling & Logging
- `StructuredLogger` ensures all logs are context-rich and compatible with GCP Cloud Logging.
- `retry_with_backoff` ensures transient errors are retried with exponential backoff.
- Validation errors are accumulated and attached as metadata to each record.
- Critical failures are logged and raised, preventing silent data loss.
Extensibility & Best Practices
- Mapping-driven: New fields can be added by updating the `MappingTest` worksheet and field mappings.
- Schema evolution: The pipeline can add new columns to BigQuery as needed.
- Separation of concerns: Each module handles a single responsibility.
- Cloud-native: Designed for both local and GCP Cloud Function execution.
Security & Compliance
- Authentication: Uses service accounts and environment variables for secure GCP access.
- Data Handling: All sensitive operations are logged; errors are surfaced for review.
- Compliance: Ensure GCP IAM roles and data residency requirements are met for your organization.
Appendix: Example Usage
Local Batch Processing
```bash
export GCP_PROJECT_ID=your-project
export GCS_BUCKET_NAME=your-bucket
export BQ_DATASET_ID=your-dataset
python main.py
```
Cloud Function Deployment
- Deploy `almina_extra_data_mapping_trigger` as a GCP Cloud Function (Gen2) with a GCS trigger.
- Ensure all environment variables are set in the Cloud Function configuration.
