This pipeline automates the extraction, transformation, validation, and loading (ETL) of daily plant data from ALMINA’s Excel workbooks into a BigQuery summary table. It is designed for both batch and event-driven (GCP Cloud Function) operation, with robust error handling, logging, and schema management.
High-level steps:
- The MappingTest worksheet defines how to extract and map data from daily sheets.
- Configuration is held in the Settings dataclass, loaded from environment variables:
  GCP_PROJECT_ID, GCS_BUCKET_NAME, BQ_DATASET_ID
  BQ_TABLE_ID, CUSTOMER_ID, GOOGLE_APPLICATION_CREDENTIALS

Entry points (main.py):
- main(): Batch processing from a local Excel file to BigQuery.
- almina_extra_data_mapping_trigger(cloud_event): Cloud Function for GCS-triggered processing.
- prepare_dataframe_for_bigquery(df): Ensures the DataFrame is BigQuery-compatible (types, nulls, etc.).

Workbook processing:
- process_workbook(raw_workbook_bytes, mapping_csv_rows, file_name, upload_time_to_bucket): Main entry point; processes a workbook and returns a DataFrame.
- _identify_daily_sheets(workbook): Finds daily sheets by name pattern.
- _process_sheet(...): Extracts data for both shifts from a sheet.
- _calculate_derived_fields(record, sheet_name, shift): Derives date, time, shift, and other fields.
- _extract_cell_value(...): Extracts and cleans a value from a cell or range.
- _add_metadata(df, ...): Adds metadata columns.
- _calculate_quality_score(df, error_count): Computes a data quality score.

Mapping parser (reads the MappingTest worksheet and maps fields to cell references):
- CellReference: Represents a cell or range, with an optional sheet override.
- FieldMappingRule: Holds the mapping info for a single field (cell refs, clues, required, examples).
- MappingConfig: Aggregates all field mapping rules.
- MappingTestParser: Parses mapping rows into a MappingConfig.
- parse_mapping_test_rows(rows): Parses mapping rows into a config.

Data transformation:
- DataTransformer: Handles numeric, date, timestamp, and string parsing, with error handling and logging.
- transform_dataframe(df, schema_types, logger): Applies type transformation to an entire DataFrame.

Schema:
- SummaryTableSchema: Canonical schema with field/type lists and utilities.
- build_schema_from_mapping_fields(field_names): Builds the schema from mapping fields, inferring types.
- _infer_field_type(clean_name, original_name): Infers the BigQuery type from the field name.
- SummaryBigQuerySchema: Generates the schema from the mapping config, using example values for type inference.

SummaryBigQueryService:
- ensure_table_exists(field_names, ...): Ensures the summary table exists and is up-to-date.
- upload_dataframe(df, ...): Uploads a DataFrame, with idempotency and schema alignment.
- _upsert_via_merge(df): Upserts using a staging table and MERGE.
- _prepare_dataframe_for_upload(df): Cleans and converts the DataFrame for upload.
- _final_pyarrow_safety_check(df): Ensures PyArrow compatibility.
- _align_dataframe_with_bq_schema(df): Aligns DataFrame types with the BQ schema.
- _truncate_datetime_columns_to_microseconds(df): Truncates datetime columns to microsecond precision.
- _ensure_record_id(df): Adds an idempotency key.
- query_recent_data(limit, days_back): Queries recent data for validation/testing.
- get_table_info(): Returns table metadata.

GCPStorageService:
- list_xlsx_files(): Lists all .xlsx files in the bucket.
- download_file(file_name): Downloads a file and returns its bytes and upload time.

Logging:
- StructuredLogger: Logs messages with severity, component, timestamp, and extra metadata.
- setup_logging(component): Configures and returns a StructuredLogger.

Settings:
- Settings: Immutable dataclass for all settings.
- from_env(): Loads settings from the environment.
- validate(): Ensures required settings are present.
- get_settings(): Returns the singleton settings instance.
- settings is the default instance.

Utilities:
- sanitize_column_name(original_name): Converts any string to a valid BigQuery column name.
- retry_with_backoff(max_retries, backoff_factor): Decorator that retries with backoff and logs each attempt.

Data flow:
- Files are read from the bucket through GCPStorageService.
- The MappingTest worksheet is parsed into a MappingConfig using MappingTestParser.
- Values are type-converted with DataTransformer and transform_dataframe.
- Results are loaded into BigQuery through SummaryBigQueryService.

Usage:
Prepare a workbook that contains the MappingTest worksheet and field mappings, then set the environment variables and run the batch entry point:
export GCP_PROJECT_ID=your-project
export GCS_BUCKET_NAME=your-bucket
export BQ_DATASET_ID=your-dataset
python main.py
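
The exported variables are read into an immutable settings object before the batch run starts. The following is a minimal sketch of that pattern, not the project's actual code: the class name SettingsSketch, its field names, and the choice of which three variables count as required are all assumptions made for illustration.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)  # frozen=True makes instances immutable
class SettingsSketch:
    """Hypothetical stand-in for the Settings dataclass described above."""

    gcp_project_id: str
    gcs_bucket_name: str
    bq_dataset_id: str
    bq_table_id: Optional[str] = None   # assumed optional
    customer_id: Optional[str] = None   # assumed optional

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "SettingsSketch":
        # Accept an explicit mapping so the loader can be tested without
        # touching the real process environment.
        env = os.environ if env is None else env
        return cls(
            gcp_project_id=env.get("GCP_PROJECT_ID", ""),
            gcs_bucket_name=env.get("GCS_BUCKET_NAME", ""),
            bq_dataset_id=env.get("BQ_DATASET_ID", ""),
            bq_table_id=env.get("BQ_TABLE_ID"),
            customer_id=env.get("CUSTOMER_ID"),
        )

    def validate(self) -> None:
        # Assumption: only these three variables are mandatory.
        required = {
            "GCP_PROJECT_ID": self.gcp_project_id,
            "GCS_BUCKET_NAME": self.gcs_bucket_name,
            "BQ_DATASET_ID": self.bq_dataset_id,
        }
        missing = [name for name, value in required.items() if not value]
        if missing:
            raise ValueError("Missing required settings: " + ", ".join(missing))
```

Calling SettingsSketch.from_env().validate() at startup fails fast with the names of any unset variables, rather than failing later inside a BigQuery or GCS call.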
For event-driven operation, deploy almina_extra_data_mapping_trigger as a GCP Cloud Function (Gen2) with a GCS trigger.
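
A Gen2 function with a GCS trigger receives a CloudEvent whose data payload is a dictionary describing the uploaded object, including the keys bucket, name, and timeCreated. The sketch below shows one way a handler might pull out the values it needs before downloading the workbook. The helper name parse_gcs_event is hypothetical, and skipping non-.xlsx objects is an assumption rather than documented behavior of the real trigger.

```python
from datetime import datetime
from typing import Any, Mapping, Optional, Tuple


def parse_gcs_event(
    data: Mapping[str, Any],
) -> Optional[Tuple[str, str, Optional[datetime]]]:
    """Return (bucket, object name, upload time), or None if the event
    should be ignored. Hypothetical helper, not part of the pipeline."""
    bucket = data.get("bucket")
    name = data.get("name")
    if not bucket or not name:
        return None
    # Assumption: only Excel workbooks are processed.
    if not name.lower().endswith(".xlsx"):
        return None

    upload_time = None
    created = data.get("timeCreated")
    if created:
        # GCS reports RFC 3339 timestamps ending in "Z"; older Python
        # versions need an explicit UTC offset for fromisoformat().
        upload_time = datetime.fromisoformat(created.replace("Z", "+00:00"))
    return bucket, name, upload_time
```

Inside the deployed function, the payload would be passed as parse_gcs_event(cloud_event.data), and a None result would end the invocation without touching BigQuery.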