ALMINA Plant Data Pipeline – Technical Documentation


Overview

This pipeline automates the extraction, transformation, validation, and loading (ETL) of daily plant data from ALMINA’s Excel workbooks into a BigQuery summary table. It is designed for both batch and event-driven (GCP Cloud Function) operation, with robust error handling, logging, and schema management.


Architecture & Workflow

High-level steps:

  1. Ingestion: Excel files are read from Google Cloud Storage (GCS), either in batch or via Cloud Function trigger.
  2. Mapping: The MappingTest worksheet defines how to extract and map data from daily sheets.
  3. Extraction & Transformation: Data is extracted per shift, cleaned, and converted to the correct types.
  4. Validation: Data quality is checked, and metadata is attached.
  5. Loading: The processed DataFrame is uploaded to BigQuery, with schema alignment and idempotency.
  6. Logging: All steps are logged with structured, context-rich messages.

Configuration

  • All configuration is centralized in the Settings dataclass, loaded from environment variables.
  • Required: GCP_PROJECT_ID, GCS_BUCKET_NAME, BQ_DATASET_ID
  • Optional: BQ_TABLE_ID, CUSTOMER_ID, GOOGLE_APPLICATION_CREDENTIALS
  • Retry, partitioning, and clustering options are configurable.

Module Reference

Main Script (main.py)

  • Purpose: Entry point for both local batch and GCP Cloud Function execution.
  • Key Functions:
    • main(): Batch processing from a local Excel workbook to BigQuery.
    • almina_extra_data_mapping_trigger(cloud_event): Cloud Function for GCS-triggered processing.
    • prepare_dataframe_for_bigquery(df): Ensures DataFrame is BigQuery-compatible (types, nulls, etc.).
  • Dependencies: All core modules below.
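
For orientation, the sketch below shows roughly how a Gen2, GCS-triggered entry point can be written with the Functions Framework. It is illustrative only; the real almina_extra_data_mapping_trigger wires the event into the full pipeline rather than just logging it.

# Illustrative sketch of a GCS-triggered Cloud Function (Gen2) entry point.
# The real almina_extra_data_mapping_trigger differs in what it does with the event.
import functions_framework

@functions_framework.cloud_event
def gcs_trigger_sketch(cloud_event):
    # The CloudEvent payload for a storage "object finalized" event carries
    # the bucket and the object name.
    data = cloud_event.data
    bucket = data["bucket"]
    file_name = data["name"]

    # Only Excel workbooks are of interest; other uploads are ignored.
    if not file_name.lower().endswith(".xlsx"):
        return

    print(f"Would process gs://{bucket}/{file_name}")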

SummaryDataProcessor

  • Purpose: Orchestrates the extraction and transformation of plant data from Excel workbooks using mapping rules.
  • Key Methods:
    • process_workbook(raw_workbook_bytes, mapping_csv_rows, file_name, upload_time_to_bucket): Main entry point; processes a workbook and returns a DataFrame.
    • _identify_daily_sheets(workbook): Finds daily sheets by name pattern.
    • _process_sheet(...): Extracts data for both shifts from a sheet.
    • _calculate_derived_fields(record, sheet_name, shift): Derives date, time, shift, and other fields.
    • _extract_cell_value(...): Extracts and cleans a value from a cell or range.
    • _add_metadata(df, ...): Adds metadata columns.
    • _calculate_quality_score(df, error_count): Computes a data quality score.
  • Dependencies: Mapping rules, data transformer, logging, sanitizers.
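
As an illustration of the orchestration (not the actual implementation), a stripped-down version of the workbook loop might look like this; the daily-sheet name check and shift labels are placeholders.

# Stripped-down sketch of the workbook processing loop; sheet-name pattern and
# shift labels are placeholders, and per-field extraction is elided.
from io import BytesIO

import openpyxl
import pandas as pd

def process_workbook_sketch(raw_workbook_bytes, mapping_config, file_name, upload_time):
    workbook = openpyxl.load_workbook(BytesIO(raw_workbook_bytes), data_only=True)
    records = []
    for sheet_name in workbook.sheetnames:
        if not sheet_name[:2].isdigit():        # placeholder daily-sheet name check
            continue
        for shift in ("day", "night"):          # assumed shift labels
            record = {"source_file": file_name, "sheet": sheet_name, "shift": shift}
            # _extract_cell_value(...) would be called here for each mapping rule
            records.append(record)
    return pd.DataFrame(records)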

Field Mapping & Mapping Rules

  • Purpose: Defines how to interpret the MappingTest worksheet and map fields to cell references.
  • Key Classes:
    • CellReference: Represents a cell or range, with optional sheet override.
    • FieldMappingRule: Holds mapping info for a single field (cell refs, clues, required, examples).
    • MappingConfig: Aggregates all field mapping rules.
    • MappingTestParser: Parses mapping rows into a MappingConfig.
  • Key Function:
    • parse_mapping_test_rows(rows): Parses mapping rows into config.
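
The sketch below suggests one plausible shape for these classes; the attribute names are assumptions derived from the descriptions above, not the exact definitions.

# Plausible shapes for the mapping classes; attribute names are illustrative.
from dataclasses import dataclass, field
from typing import Optional

@dataclass(frozen=True)
class CellReferenceSketch:
    cell: str                        # e.g. "B7" or a range like "B7:D7"
    sheet: Optional[str] = None      # optional sheet override

@dataclass
class FieldMappingRuleSketch:
    field_name: str
    cell_refs: list
    required: bool = False
    clues: list = field(default_factory=list)
    example: Optional[str] = None

@dataclass
class MappingConfigSketch:
    rules: dict = field(default_factory=dict)   # field_name -> FieldMappingRuleSketch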

Data Transformation

  • Purpose: Converts raw extracted values to the correct types for BigQuery.
  • Key Class:
    • DataTransformer: Handles numeric, date, timestamp, and string parsing, with error handling and logging.
  • Key Function:
    • transform_dataframe(df, schema_types, logger): Applies type transformation to an entire DataFrame.
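
A minimal sketch of column-wise coercion with pandas is shown below; the real DataTransformer adds per-value error accumulation and logging.

# Minimal sketch of column-wise type coercion; error accumulation and logging
# from the real DataTransformer are omitted.
import pandas as pd

def transform_dataframe_sketch(df, schema_types):
    df = df.copy()
    for column, bq_type in schema_types.items():
        if column not in df.columns:
            continue
        if bq_type in ("FLOAT", "NUMERIC"):
            df[column] = pd.to_numeric(df[column], errors="coerce")
        elif bq_type == "INTEGER":
            df[column] = pd.to_numeric(df[column], errors="coerce").astype("Int64")
        elif bq_type in ("DATE", "TIMESTAMP"):
            df[column] = pd.to_datetime(df[column], errors="coerce")
        else:
            df[column] = df[column].astype("string")
    return df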

BigQuery Schema & Utilities

  • Purpose: Defines and generates BigQuery schemas based on mapping fields.
  • Key Classes/Functions:
    • SummaryTableSchema: Canonical schema with field/type lists and utilities.
    • build_schema_from_mapping_fields(field_names): Builds schema from mapping fields, inferring types.
    • _infer_field_type(clean_name, original_name): Infers BigQuery type from field name.
    • SummaryBigQuerySchema: Generates schema from mapping config, using example values for type inference.
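
For illustration, name-based inference might look like the sketch below; the keyword lists are invented for the example, and the example-value logic in SummaryBigQuerySchema is not reproduced here.

# Illustrative name-based type inference; keyword lists are invented for the example.
from google.cloud import bigquery

def infer_field_type_sketch(clean_name):
    name = clean_name.lower()
    if name.endswith("_date") or name == "date":
        return "DATE"
    if name.endswith("_at") or "timestamp" in name:
        return "TIMESTAMP"
    if any(hint in name for hint in ("tonnes", "rate", "hours", "grade", "percent")):
        return "FLOAT"
    return "STRING"

def build_schema_sketch(field_names):
    return [bigquery.SchemaField(name, infer_field_type_sketch(name)) for name in field_names]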

BigQuery Service

  • Purpose: Handles all interactions with BigQuery: table creation, schema management, and data upload.
  • Key Class:
    • SummaryBigQueryService
      • ensure_table_exists(field_names, ...): Ensures the summary table exists and is up-to-date.
      • upload_dataframe(df, ...): Uploads DataFrame, with idempotency and schema alignment.
      • _upsert_via_merge(df): Upserts using staging table and MERGE.
      • _prepare_dataframe_for_upload(df): Cleans and converts DataFrame for upload.
      • _final_pyarrow_safety_check(df): Ensures PyArrow compatibility.
      • _align_dataframe_with_bq_schema(df): Aligns DataFrame types with BQ schema.
      • _truncate_datetime_columns_to_microseconds(df): Ensures timestamp precision.
      • _ensure_record_id(df): Adds idempotency key.
      • query_recent_data(limit, days_back): Queries recent data for validation/testing.
      • get_table_info(): Returns table metadata.
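
The staging-plus-MERGE pattern behind _upsert_via_merge can be sketched as below; the table names, the record_id key, and the single updated column are placeholders, not the exact implementation.

# Simplified staging-table + MERGE upsert; table names, the record_id key and the
# updated column are placeholders.
from google.cloud import bigquery

def upsert_via_merge_sketch(client, df, target_table, staging_table):
    # Load the batch into a staging table, replacing its previous contents.
    job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
    client.load_table_from_dataframe(df, staging_table, job_config=job_config).result()

    # Merge staging rows into the target, keyed on the idempotency column.
    merge_sql = f"""
        MERGE `{target_table}` T
        USING `{staging_table}` S
        ON T.record_id = S.record_id
        WHEN MATCHED THEN UPDATE SET quality_score = S.quality_score
        WHEN NOT MATCHED THEN INSERT ROW
    """
    client.query(merge_sql).result()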

GCP Storage Service

  • Purpose: Encapsulates all Google Cloud Storage operations.
  • Key Class:
    • GCPStorageService
      • list_xlsx_files(): Lists all .xlsx files in the bucket.
      • download_file(file_name): Downloads a file and returns its bytes and upload time.
  • Implements: Retry logic for robustness.
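
A bare-bones version of the two operations (without the retry decorator) could look like this:

# Bare-bones sketch of the two storage operations; retry handling is omitted.
from google.cloud import storage

def list_xlsx_files_sketch(bucket_name):
    client = storage.Client()
    return [blob.name for blob in client.list_blobs(bucket_name)
            if blob.name.lower().endswith(".xlsx")]

def download_file_sketch(bucket_name, file_name):
    client = storage.Client()
    blob = client.bucket(bucket_name).get_blob(file_name)   # also fetches metadata
    return blob.download_as_bytes(), blob.time_created      # bytes + upload time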

Logging Utilities

  • Purpose: Provides structured logging for both local and cloud environments.
  • Key Class:
    • StructuredLogger: Logs messages with severity, component, timestamp, and extra metadata.
  • Key Function:
    • setup_logging(component): Configures and returns a StructuredLogger.
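
A reduced sketch of a structured logger that emits JSON lines (which Cloud Logging parses into structured entries) is shown below; the real StructuredLogger carries more context fields.

# Reduced sketch of JSON-line structured logging; the real StructuredLogger
# carries additional context fields.
import json
import logging
import sys
from datetime import datetime, timezone

class StructuredLoggerSketch:
    def __init__(self, component):
        self.component = component
        self._logger = logging.getLogger(component)
        if not self._logger.handlers:
            self._logger.addHandler(logging.StreamHandler(sys.stdout))
            self._logger.setLevel(logging.INFO)

    def info(self, message, **extra):
        entry = {
            "severity": "INFO",
            "component": self.component,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "message": message,
            **extra,
        }
        self._logger.info(json.dumps(entry))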

Settings & Configuration

  • Purpose: Centralizes all configuration and environment variables.
  • Key Class:
    • Settings: Immutable dataclass for all settings.
      • from_env(): Loads from environment.
      • validate(): Ensures required settings are present.
  • Key Function:
    • get_settings(): Returns singleton settings instance.
  • Global: settings is the default instance.
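
One plausible shape for the dataclass, covering only the variables documented above (defaults and field names are assumptions):

# Plausible shape of the Settings dataclass; defaults and field names are assumptions.
import os
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class SettingsSketch:
    gcp_project_id: str
    gcs_bucket_name: str
    bq_dataset_id: str
    bq_table_id: str = "summary"            # assumed default
    customer_id: Optional[str] = None

    @classmethod
    def from_env(cls):
        return cls(
            gcp_project_id=os.getenv("GCP_PROJECT_ID", ""),
            gcs_bucket_name=os.getenv("GCS_BUCKET_NAME", ""),
            bq_dataset_id=os.getenv("BQ_DATASET_ID", ""),
            bq_table_id=os.getenv("BQ_TABLE_ID", "summary"),
            customer_id=os.getenv("CUSTOMER_ID"),
        )

    def validate(self):
        if not (self.gcp_project_id and self.gcs_bucket_name and self.bq_dataset_id):
            raise ValueError("GCP_PROJECT_ID, GCS_BUCKET_NAME and BQ_DATASET_ID are required")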

Sanitizers

  • Purpose: Ensures all column names are BigQuery-safe and snake_case.
  • Key Function:
    • sanitize_column_name(original_name): Converts any string to a valid BigQuery column name.
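
An illustrative implementation is sketched below; the project's exact rules may differ.

# Illustrative column-name sanitizer; the project's exact rules may differ.
import re

def sanitize_column_name_sketch(original_name):
    name = original_name.strip().lower()
    name = re.sub(r"[^a-z0-9]+", "_", name)       # non-alphanumerics -> underscore
    name = re.sub(r"_+", "_", name).strip("_")    # collapse and trim underscores
    if not name or name[0].isdigit():
        name = f"col_{name}"                      # BigQuery names cannot start with a digit
    return name[:300]                             # BigQuery column-name length limit

For example, sanitize_column_name_sketch("Mill Feed (t/h)") returns "mill_feed_t_h".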

Retry Utilities

  • Purpose: Provides a decorator for retrying functions with exponential backoff.
  • Key Function:
    • retry_with_backoff(max_retries, backoff_factor): Decorator for robust retry logic, with logging.
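
A minimal version of such a decorator (per-attempt logging omitted) could look like this:

# Minimal retry decorator with exponential backoff; per-attempt logging is omitted.
import functools
import time

def retry_with_backoff_sketch(max_retries=3, backoff_factor=2.0):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            delay = 1.0
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    time.sleep(delay)
                    delay *= backoff_factor
        return wrapper
    return decorator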

Data Flow Summary

  1. File Ingestion:
    • Files are listed/downloaded from GCS via GCPStorageService.
  2. Mapping Extraction:
    • The MappingTest worksheet is parsed into a MappingConfig using MappingTestParser.
  3. Data Extraction:
    • For each daily sheet and shift, values are extracted using mapping rules.
  4. Transformation:
    • Raw values are converted to correct types using DataTransformer and transform_dataframe.
  5. Validation & Metadata:
    • Data quality is checked, and metadata columns are added.
  6. BigQuery Upload:
    • DataFrame is cleaned, aligned, and uploaded to BigQuery using SummaryBigQueryService.
  7. Logging:
    • All steps are logged with context for traceability.
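
Read end to end, one batch run could be sketched as below; constructor arguments and exact method signatures are simplified relative to the module reference above.

# End-to-end sketch of one batch run using the services documented above;
# arguments are simplified and mapping-row extraction is elided.
def run_pipeline_sketch(storage_service, processor, bq_service):
    for file_name in storage_service.list_xlsx_files():
        raw_bytes, upload_time = storage_service.download_file(file_name)
        mapping_rows = []        # would come from the workbook's MappingTest worksheet
        df = processor.process_workbook(raw_bytes, mapping_rows, file_name, upload_time)
        bq_service.ensure_table_exists(list(df.columns))
        bq_service.upload_dataframe(df)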

Error Handling & Logging

  • StructuredLogger ensures all logs are context-rich and compatible with GCP logging.
  • retry_with_backoff ensures transient errors are retried with exponential backoff.
  • Validation errors are accumulated and attached as metadata to each record.
  • Critical failures are logged and raised, preventing silent data loss.

Extensibility & Best Practices

  • Mapping-driven: New fields can be added by updating the MappingTest worksheet and field mappings.
  • Schema evolution: The pipeline can add new columns to BigQuery as needed.
  • Separation of concerns: Each module handles a single responsibility.
  • Cloud-native: Designed for both local and GCP Cloud Function execution.

Security & Compliance

  • Authentication: Uses service accounts and environment variables for secure GCP access.
  • Data Handling: All sensitive operations are logged; errors are surfaced for review.
  • Compliance: Ensure GCP IAM roles and data residency requirements are met for your organization.

Appendix: Example Usage

Local Batch Processing

export GCP_PROJECT_ID=your-project
export GCS_BUCKET_NAME=your-bucket
export BQ_DATASET_ID=your-dataset
python main.py

Cloud Function Deployment

  • Deploy almina_extra_data_mapping_trigger as a GCP Cloud Function (Gen2) with a GCS trigger.
  • Ensure all environment variables are set in the Cloud Function configuration.
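
A deployment command along these lines can serve as a starting point (region, bucket, and other values are placeholders to adapt):

gcloud functions deploy almina_extra_data_mapping_trigger \
  --gen2 \
  --runtime=python311 \
  --region=europe-west1 \
  --source=. \
  --entry-point=almina_extra_data_mapping_trigger \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=your-bucket" \
  --set-env-vars=GCP_PROJECT_ID=your-project,GCS_BUCKET_NAME=your-bucket,BQ_DATASET_ID=your-dataset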