7.1 - Dataiku Project Architecture

The Energy Optimization Product is iterative by nature.
As a self-service product, it is designed and developed by its end-users.

Each iteration represents an optimizer for a specific Solvay industrial site. Currently the following sites are covered:

  • Bernburg
  • Dombasle
  • Devnya
  • Rheinberg
  • Rosignano
  • Torrelavega

Additionally, a dedicated Dataiku project is designed for orchestration and monitoring.

The folder contains 7 projects.

Each site has a similar structure, with slightly different requirements.

7.2 - Dataflow

The dataflow in each Dataiku project is the same:

  1. Price_forecast zone:
    • DPL.V_FACT_meteo_data meteo data is resampled.  UPD 13/01/2025: After the Vendohm dataset removal, the meteo data was never replaced, hence the dataset is removed.
    • DPL.V_FACT_energy_price_forecast price forecast is imported
    • DPL.V_FACT_energy_price_hourly energy price data is imported
    • DPL.V_FACT_solid_fuel_wap solid fuel price data is resampled
    • Energy price forecast is stored in dss-prod-design-sql Postgres database
  2. Optimization zone:
    • DPL.V_FACT_{site}_queue optimization queue is synced into dss-prod-design-sql Postgres database
    • Optimization is performed for every observation in the queue
    • Processed observations and the output of optimization are synced into dedicated GCS bucket
    • Data from GCS bucket is synced into BigQuery DS_APP_dataiku database

Since each site is different, the flow includes some changes to accommodate each site's process.

7.2.1 - Bernburg & Dombasle

Optimization in Bernburg and Dombasle does not use the meteo data yet:
no data from DPL.V_FACT_meteo_data is used in the Price_forecast zone. In the Optimization zone, the price forecast data is reused directly to feed the optimization.


Dombasle flow

7.2.2 - Devnya

Optimization in Devnya does not take any price changes into account.
The only zone in the flow is the Optimization zone, with a single source of data: DPL.V_FACT_{site}_queue.
Optimization is performed using the WebApp input only.

Devnya flow

7.2.3 - Rheinberg

Optimization in Rheinberg does not use the meteo data yet:
no data from DPL.V_FACT_meteo_data is used in the Price_forecast zone.

Rheinberg flow

7.2.4 - Rosignano

The specificity of this project is that the Vendohm data in the Price_forecast zone contains only the gas prices, while in the Optimization zone it contains the electricity prices. These datasets are independent and connected to BQ separately.

Rosignano flow

7.2.5 - Torrelavega

The specificity of this project is that the Vendohm data is also synced into the Optimization zone to feed the optimization.

Torrelavega flow

7.3 - Data Preprocessing

7.3.1 Vendohm price

    SELECT
        date,
        {electricity_index},
        {gas_index}
    FROM
        `prj-data-robustify-dev.DPL.V_FACT_energy_price_hourly`

The following indices are used:

site          electricity_index          gas_index
Bernburg      elec_DE_EPEX               gas_DE_THE
Dombasle      elec_FR_EPEX               gas_FR_PEG
Rheinberg     elec_DE_EPEX               gas_DE_THE
Rosignano     elec_IT_MGP, elec_IT_PUN   gas_IT_PSV
Torrelavega   ---                        gas_TTF, gas_SP_PVB

It is already preprocessed in Talend and BigQuery, so no additional processing is required in Dataiku.
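
For illustration only, a minimal sketch of how the per-site placeholders could be filled and the query executed from Python, reusing the credential pattern from the Base Scenario trigger (7.5.1) and the project key pattern from section 7.6; in the actual projects the query is configured directly in the dataset definition, and the SITE_INDICES mapping below just reproduces the table above:

import dataiku
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery

# per-site indices, reproduced from the table above (sites with a single index per commodity)
SITE_INDICES = {
    "Bernburg":  ("elec_DE_EPEX", "gas_DE_THE"),
    "Dombasle":  ("elec_FR_EPEX", "gas_FR_PEG"),
    "Rheinberg": ("elec_DE_EPEX", "gas_DE_THE"),
}

def vendohm_query(site):
    electricity_index, gas_index = SITE_INDICES[site]
    return (f"SELECT date, {electricity_index}, {gas_index} "
            "FROM `prj-data-robustify-dev.DPL.V_FACT_energy_price_hourly`")

# BigQuery credentials are stored in the project variables, as in the Base Scenario trigger;
# the project key follows the pattern described in section 7.6
client = dataiku.api_client()
variables = client.get_project("ROBUSTIFY_ENERGY_OPTIMIZATION_DOMBASLE_ECO").get_variables()
credentials = service_account.Credentials.from_service_account_info(variables['standard']['bigquery'])
bqclient = bigquery.Client(project='prj-data-robustify-dev', credentials=credentials)

rows = bqclient.query(vendohm_query("Dombasle")).result()
df = pd.DataFrame([dict(r) for r in rows])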

7.3.2 Meteologica price forecast

    SELECT
        datetime_from AS date,
        price AS elec_price
    FROM
        `prj-data-robustify-dev.DPL.V_FACT_energy_price_forecast`
    WHERE country = {country}
    ORDER BY
        datetime_from DESC;
        

It is already preprocessed in Talend and BigQuery, so no additional processing is required in Dataiku.

7.3.3 Solid fuel price

For each site (except for Devnya), the query is adapted to import only the parameters required by that site.

    SELECT
        date,
        {site}_metcoke AS coke_price,
        {site}_anthracite AS anthracite_price
    FROM `prj-data-robustify-dev.DPL.V_FACT_solid_fuel_wap`
    ORDER BY date
        

It is partially preprocessed in Talend and BigQuery. The following additional processing is performed in Dataiku (a minimal pandas sketch follows the list):

  • Drop complete duplicates
  • Resample to hourly data to ensure the correct granularity during the join with the energy data.
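
A minimal sketch of these two steps in pandas, using a toy stand-in for the query output above; the forward-fill is an assumption about how prices are carried between observations, and the actual recipes in the flow may resample differently:

import pandas as pd

# toy stand-in for the solid fuel query output (columns as in the query above)
df = pd.DataFrame({
    "date": ["2025-01-01", "2025-01-01", "2025-01-02"],
    "coke_price": [210.0, 210.0, 212.5],
    "anthracite_price": [180.0, 180.0, 181.0],
})

df = df.drop_duplicates()             # drop complete duplicates
df["date"] = pd.to_datetime(df["date"])
hourly = (df.set_index("date")
            .resample("1H")           # hourly granularity for the join with the energy data
            .ffill()                  # assumption: carry the last known price forward
            .reset_index())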

7.3.4 Meteo data  UPD 13/01/2025: After the Vendohm dataset removal, the meteo data was never replaced, hence the dataset is removed.  

This query could be simplified to extract only the columns used in the following processing.

    SELECT
        (CASE
            WHEN raw_curve_id = {index1} THEN "ambient_temperature"
            WHEN raw_curve_id = {index2} THEN "RH"
        END) AS parameter,
        published_date,
        delivery_date,
        meteo_value
    FROM `prj-data-robustify-dev.DPL.V_FACT_meteo_data`
    WHERE raw_curve_id IN ({index1}, {index2})

The following indices are used:

site          relative humidity (RH)   ambient temperature
Bernburg      438000035                438000034
Rosignano     438000005                438000004
Torrelavega   438000059                438000058

This data is not preprocessed upstream, so the following transformations are applied:

  • Replace index ids by their names
  • For each index, select the latest available information for every datetime
  • Create a pivot table

    # for each parameter, keep only the most recently published value per delivery_date
    df2 = (df.groupby('parameter')
             .apply(lambda x: x.sort_values('published_date')
                              .drop_duplicates(subset='delivery_date', keep='last'))
             .reset_index(level=0, drop=True)[['parameter', 'delivery_date', 'meteo_value']])
    # pivot so that each parameter (RH, ambient_temperature) becomes a column indexed by delivery_date
    df3 = pd.pivot_table(df2, index='delivery_date', columns='parameter', values='meteo_value')

Finally, using the Time Series Preparation plugin, the dataset is resampled to hourly data.
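
For reference, a rough pandas equivalent of that resampling step, continuing from df3 above; the interpolation method is an assumption, the actual settings are configured in the plugin recipe:

# df3: pivot table from the step above, indexed by delivery_date
df3.index = pd.to_datetime(df3.index)
hourly = df3.resample("1H").mean().interpolate(method="linear")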

7.4 - Optimization

The optimization step is defined independently for each site due to the specificities of each plant's energy system.
For each site the main optimization code is defined by the Energy team.

The structure of the optimizer code is as follows (a simplified, illustrative sketch of steps 6 and 7 follows the list):

  1. Import libraries
  2. Import the queue and additional datasets
  3. Define the names_output
  4. Get the list of IDs in the queue
  5. Get the token
  6. Definition of the optimize_run function
    • Update the status to 'Running' in the WebApp
    • Select the data for the current run
    • Get the fixed variables (from the Dataiku custom variables)
    • Define the variables, constraints and optimization model
    • Try to solve the problem. If solvable, postprocess and store the result. Otherwise, define a row of 0s except for Period.
    • Add OptimizationId, mode and User to the output
  7. Map the function to the queue. If any exception occurs, define a row of 0s except for Period.
  8. Change the column names according to the GCP schema
  9. Determine the status of each OptimizationId in the queue. If the status of any iteration is optimal, the status of the whole ID is optimal; otherwise it is infeasible or failed.
  10. Push the status of each iteration to the WebApp 
  11. Store the optimization output and the statuses
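
A heavily simplified, illustrative sketch of steps 6 and 7; the column names, solver and postprocessing below are stand-ins, not the site-specific implementations maintained by the Energy team (WebApp calls and GCP-schema renaming are omitted):

import pandas as pd

# illustrative output schema only
OUTPUT_COLUMNS = ["Period", "value"]

def solve_and_postprocess(run_data):
    # stand-in for defining the variables, constraints and model, solving it and postprocessing (step 6)
    return pd.DataFrame({"Period": run_data["Period"], "value": 1.0})

def optimize_run(optimization_id, queue_df):
    # select the data for the current run
    run_data = queue_df[queue_df["OptimizationId"] == optimization_id]
    try:
        rows = solve_and_postprocess(run_data)
    except Exception:
        # failed or infeasible run: a single row of 0s except Period
        rows = pd.DataFrame([{c: 0 for c in OUTPUT_COLUMNS}])
        rows["Period"] = run_data["Period"].iloc[0]
    rows["OptimizationId"] = optimization_id
    return rows

# step 7: map the function over every ID in the queue
queue_df = pd.DataFrame({"OptimizationId": [1, 1, 2], "Period": [1, 2, 1]})
outputs = pd.concat(optimize_run(i, queue_df) for i in queue_df["OptimizationId"].unique())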


Maintenance of the Python recipes is split:

  1. Steps 6-7 are maintained by Alba and DT Data Science team.
  2. Steps 1-5 and 8-11 are maintained by DataOps team.

7.5 - Scenarios

7.5.1 - Base Scenario

UPD 23/04/2024: This scenario is deprecated 

Trigger

  • Name: Dataset Updated on GCP
  • Runs every: 120 seconds
  • Code:
from dataiku.scenario import Trigger
import dataiku
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery

# read the BigQuery service-account credentials stored in the project variables
client = dataiku.api_client()
current_project = client.get_project({project_id})  # {project_id}: Dataiku project key of the site
variables = current_project.get_variables()
credentials = variables['standard']['bigquery']

credentials = service_account.Credentials.from_service_account_info(credentials)

project_id = 'prj-data-robustify-dev'
bqclient = bigquery.Client(project=project_id, credentials=credentials)

# fetch the current content of the monitored table
query_job = bqclient.query(
    """
    SELECT *
    FROM `prj-data-robustify-dev.DPL.{table_name}`"""
)
query_response = query_job.result()
results = [dict(r) for r in query_response]
df = pd.DataFrame(results)
output = df.loc[0].map(str).to_dict()
t = Trigger()

# fire the trigger only when the first row differs from the snapshot stored in last_update,
# then store the new snapshot in the project variables
if not (pd.Series(variables['standard']['last_update'])==df.loc[0].map(str)).min():
    t.fire()
    variables['standard']['last_update'] = output
current_project.set_variables(variables)

Reporters

  • Email when failed
  • Receivers:
    • Eldiias Dzhamankulov

Steps

  • Build optimization_queue_copy [Build just these items]
  • Output [Build just these items]
  • Clear jobs macros
  • Clear scenario logs macros

7.5.2 - Energy prices

Devnya doesn't need this scenario.

Trigger

  • Run daily at a given time (project dependent)

Reporters

  • Email when failed
  • Receivers:
    • Rafael Truan Cacho
    • Alba Carrero
    • Eldiias Dzhamankulov

Steps

  • Build resampled data [Force build dependencies]
  • Build Energy dataset [Build required dependencies]
  • Clear jobs macros
  • Clear scenario logs macros

7.5.3 - Optimizer

UPD 23/04/2024: Steps are changed

Trigger

No triggers are required, since the scenario is launched from the orchestrator.

Reporters

  • When the scenario fails, the API response fails as well, so no additional reporting is needed.

Steps

  • Build optimization_queue_copy [Build just these items]
  • Build Output [Build just these items]
  • Run scenario "Sync BQ"
  • Clear jobs macros
  • Clear scenario logs macros

7.5.4 - Sync BQ

UPD 23/04/2024: Trigger and Steps are changed

UPD 23/02/2024: Disabled Trigger 

  • Dataset modification: Output
  • Run every: 45 seconds
  • Grace delay: 20 seconds

Since the scenario is launched from the "Optimizer" scenario, no explicit trigger is required.

Reporters

No reporting is needed, since no failures are expected.

Steps

  • Build GCS tables [Build just these items]
  • Build BQ tables [Build just these items]
  • Refresh Tableau [custom python function below]
  • Send status update to the WebApp [custom python function below]
  • Clear jobs macros
  • Clear scenario logs macros
Refresh Tableau
import requests as r
from bs4 import BeautifulSoup
#authenticate yourself
s = r.Session()
url = "https://eu-west-1a.online.tableau.com/api/3.22/auth/signin"

mapping = {
    "Devnia_prod":"53b9800f-4e93-4970-9829-2101b3f61c51",
    "Dombasle_prod":"443443ca-394a-432c-bdc9-1f094655ba3c",
    "Torrelavega_prod":"da214835-658f-4bd2-bb3f-7d569488ecd1",
    "Benburg_dev":"12ed7f34-03ff-44f8-a659-13bf38585353",
    "Devnia_dev":"a823cd38-28b6-488d-92b7-20236f262043",
    "Dombasle_dev":"7c3a9d3d-0944-4615-a53c-f2f8ee33090f",
    "Rheinberg_dev":"b764d3b9-65eb-4525-899f-bb27bc45c8b0",
    "Rosignano_dev":"be6aacbc-5a1d-434f-8ec7-0e6261f2a28e",
    "Torrelavega_dev":"0be2b2b0-fcbd-4cd7-a4f9-129d8c30b643",
    "Benburg_UAT":"023eedd2-ea51-4563-a0f4-b8993893312d",
    "Devnia_UAT":"c143af21-e53a-4b51-aaed-dcda78ef204d",
    "Dombasle_UAT":"727310be-8ba3-4284-b244-ce55a626015d",
    "Rheinberg_UAT":"094ff065-3569-4ed6-8643-3d82c1f0b31a",
    "Rosignano_UAT":"83a5876f-f1f7-4dc8-883a-18bf811c7cfe",
    "Torrelavega_UAT":"1b5151b6-59c1-4a4a-829b-06fb9f7e4abe"
}

data = """<tsRequest>
    <credentials personalAccessTokenName="GSC_Token" personalAccessTokenSecret="nHxKWmiUTneSl5MWmXynfA==:reP8DcZH0rFrjgQg4NZVnkfpIvhzz0Ga">
        <site contentUrl=""/>
    </credentials>
</tsRequest>"""

request1 = s.post(url, data=data)
if request1:
    soup = BeautifulSoup(request1.content)
    token = soup.select_one('tsresponse').select_one('credentials').get('token')
else:
    raise ValueError('Tableau Auth API is failing')

#launch the refresh
url = f"https://eu-west-1a.online.tableau.com/api/3.22/sites/17eb3f27-6850-4932-a4b1-6e6019488f02/workbooks/{mapping[{site}]}/refresh"
data = """ """
request2 = s.post(url, data=data, headers = {"X-Tableau-Auth":token})
if request2:
    print(request2.content)
else:
    raise ValueError('Tableau refresh API failed')
Send status update to the WebApp
from robustify_rest_api.api import get_token, put_status
import dataiku
import time
time.sleep(10)
# authenticate
token = get_token()
# get data
outputs = dataiku.Dataset("queue_status").get_dataframe()
#convert into mapping
status = outputs.groupby('OptimizationId').Termination.apply(lambda x: "Completed - Optimal" if 'optimal' in x.unique() else
    "Completed - Infeasible" if 'infeasible' in x.unique() else
    "Failed").to_dict()
#send requests
for id_,st in status.items():
    put_status(id_,st,token, {site})
    print(id_, 'done')

7.6 - Orchestrator

The Orchestrator project serves two purposes:

  • API Deployer: launch optimization scenario
  • Monitoring of the project

API Deployer

UPDATE 23/04/2024: API is finalized

Defines a function run_scenario that accepts two arguments: site and hostname.

import dataiku
import socket

class ConfigError(Exception):
    pass

def run_scenario(site, hostname):

    if site not in ('Devnya','Dombasle','Bernburg','Torrelavega','Rheinberg','Rosignano'):
        raise ConfigError(f'Site {site} is not mapped to a DSS config')
    
    #hostname = socket.gethostname()

    # Initialization
    # ----------------------------------------------------------------------------------------------------------------
    if hostname == "Design":  # api in prod_design, ex: "run test queries" directly from project
        # PROD DESIGN
        dataiku.set_remote_dss("http://ce-dataops-dss-design-ew1-prod:10000", "tWyyCXduOtrajhG8sVS1569dHzlc0ZcH")
    elif hostname == "Prod":  # api deployed from prod
        # PROD AUTOMATION
        dataiku.set_remote_dss("http://ce-dataops-dss-automation-ew1-prod:10000", "UH5GTQX8XOzXFUL4tVjydC5mNHtykbtS")
    elif hostname=='UAT':
        # UAT
        dataiku.set_remote_dss("http://ce-dataops-dss-automation-ci-ew1-prod:10000", "tYD7IhRziwgskbi1EgVYcN5CKh2oz2At")
    else:
        raise ConfigError(f"Hostname '{hostname}' is not mapped to a DSS config.")

    project_id = f'ROBUSTIFY_ENERGY_OPTIMIZATION_{site.upper()}_ECO'
    
    client = dataiku.api_client()
    project = client.get_project(project_id)
    scenario = project.get_scenario("OPTIMIZER")
    scenario.run()
    return "Completed"
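
An illustrative call, with values accepted by the checks above:

# start the OPTIMIZER scenario of the Dombasle project on the production automation node
run_scenario("Dombasle", "Prod")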


Monitoring of the project  UPD 13/01/2025: This functionality was removed to allow each site to be moved to prod independently.

For monitoring purposes, scenario results and metrics from the status tables are imported from all 6 projects.

Scenario analysis makes it possible to monitor the failure status of all 6 projects from a single dashboard. It also allows analyzing the temporal performance to identify the bottlenecks of the process.

The metrics review shows the results of the optimization. Statistical analysis of the optimization results provides information on the performance of the solver and on the requests made by the users.



Responsible & contact points:

  • Alessandro Mainardi - Project Owner
  • Simon Bourguignon - Delivery Manager
  • Alba Carrero / Gaetan Frenoy - Product Owner
  • Rui Ferraz - Project Manager