- Created by TRAJKOVIC-ext, Nikola, last modified by Eldiias DZHAMANKULOV on Jan 13, 2025
7.1 - Dataiku Project Architecture
The Energy Optimization Product is iterative in nature.
As a self-service product, it is designed and developed by the end users.
Each iteration represents an optimizer for a certain industrial site of Solvay. Currently the following sites are covered: Bernburg, Devnya, Dombasle, Rheinberg, Rosignano and Torrelavega.
Additionally, a separate Dataiku project is designed for orchestration and monitoring.

Each site has a similar structure, with slightly different requirements.
7.2 - Dataflow
The dataflow in each Dataiku project is the same:
- Price_forecast zone:
- DPL.V_FACT_meteo_data meteo data is resampled. UPD 13/01/2025: After the Vendohm dataset removal, the meteo data was never replaced, hence the dataset is removed.
- DPL.V_FACT_energy_price_forecast price forecast is imported
- DPL.V_FACT_energy_price_hourly energy price data is imported
- DPL.V_FACT_solid_fuel_wap solid fuel price data is resampled
- Energy price forecast is stored in dss-prod-design-sql Postgres database
- Optimization zone:
- DPL.V_FACT_{site}_queue optimization queue is synced into dss-prod-design-sql Postgres database
- Optimization is performed for every observation in the queue
- Processed observations and the output of optimization are synced into dedicated GCS bucket
- Data from GCS bucket is synced into BigQuery DS_APP_dataiku database
Since each site is different, the flow is adjusted to accommodate each site's process.
7.2.1 - Bernburg & Dombasle
Optimization in Bernburg and Dombasle does not use the meteo data yet.
In the Price_forecast zone no data from DPL.V_FACT_meteo_data is used. In the Optimization zone the price forecast data is reused to feed the optimization directly.


7.2.2 - Devnya
Optimization in Devnya does not take any price changes into account.
The only zone in the flow is the Optimization zone, with a single data source: DPL.V_FACT_{site}_queue.
Optimization is performed using the WebApp input only.

7.2.3 - Rheinberg
Optimization in Rheinberg does not use the meteo data yet.
In the Price_forecast zone no data from DPL.V_FACT_meteo_data is used.

7.2.4 - Rosignano
The specificity of this project is that the Vendohm data in the Price_forecast zone contains only gas prices, while in the Optimization zone it contains electricity prices. These datasets are independent and connected to BQ separately.

7.2.5 - Torrelavega
The specificity of this project is that the Vendohm data is synced into the Optimization zone as well to feed the optimization.

7.3 - Data Preprocessing
7.3.1 Vendohm price
select date, {electricity_index}, {gas_index}
from prj-data-robustify-dev.DPL.V_FACT_energy_price_hourly
The following indices are used:
| site | electricity_index | gas_index |
|---|---|---|
| Bernburg | elec_DE_EPEX | gas_DE_THE |
| Dombasle | elec_FR_EPEX | gas_FR_PEG |
| Rheinberg | elec_DE_EPEX | gas_DE_THE |
| Rosignano | elec_IT_MGP, elec_IT_PUN | gas_IT_PSV |
| Torrelavega | --- | gas_TTF, gas_SP_PVB |
It is already preprocessed in Talend and BigQuery, so no additional processing is required in Dataiku.
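The per-site query is obtained by substituting the indices from the table above into the query template. A minimal sketch (index names taken from the table; only the single-index sites are shown, and the helper name is illustrative):

```python
# Electricity/gas index names per site, copied from the table above
# (Rosignano and Torrelavega use multiple/partial indices and are omitted).
INDICES = {
    "Bernburg": ("elec_DE_EPEX", "gas_DE_THE"),
    "Dombasle": ("elec_FR_EPEX", "gas_FR_PEG"),
    "Rheinberg": ("elec_DE_EPEX", "gas_DE_THE"),
}

QUERY_TEMPLATE = (
    "select date, {electricity_index}, {gas_index}\n"
    "from prj-data-robustify-dev.DPL.V_FACT_energy_price_hourly"
)

def build_price_query(site: str) -> str:
    """Fill the query template with the site's electricity and gas indices."""
    elec, gas = INDICES[site]
    return QUERY_TEMPLATE.format(electricity_index=elec, gas_index=gas)

print(build_price_query("Bernburg"))
```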
7.3.2 Meteologica price forecast
SELECT
datetime_from as date,
price as elec_price
FROM
`prj-data-robustify-dev.DPL.V_FACT_energy_price_forecast`
where country={country}
ORDER BY
datetime_from desc;
It is already preprocessed in Talend and BigQuery, so no additional processing is required in Dataiku.
7.3.3 Solid fuel price
For each site (except for Devnya) the query is personalized to import only the required parameters.
SELECT
date,
{site}_metcoke as coke_price,
{site}_anthracite as anthracite_price
FROM `prj-data-robustify-dev.DPL.V_FACT_solid_fuel_wap`
order by date
It is partially preprocessed in Talend and BigQuery. Additional processing is performed:
- Drop complete duplicates
- Resample to get hourly data to ensure the correct granularity during join with energy data.
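The two additional processing steps can be sketched in plain pandas; the toy data and column names below mirror the query above but are illustrative:

```python
import pandas as pd

# Toy daily solid-fuel prices with one fully duplicated row.
df = pd.DataFrame({
    "date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),
    "coke_price": [100.0, 100.0, 101.0],
    "anthracite_price": [80.0, 80.0, 82.0],
})

# 1) Drop complete duplicates.
df = df.drop_duplicates()

# 2) Resample to hourly granularity, forward-filling the daily price so it
#    lines up with hourly energy data during the join.
hourly = df.set_index("date").resample("1h").ffill()
print(len(hourly))  # 25 hourly rows from 2024-01-01 00:00 to 2024-01-02 00:00
```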
7.3.4 Meteo data
UPD 13/01/2025: After the Vendohm dataset removal, the meteo data was never replaced, hence the dataset is removed.
This query could be simplified to extract only the columns used in the following processing.
SELECT
(case when raw_curve_id={index1} then "ambient_temperature"
      when raw_curve_id={index2} then "RH" end) as parameter,
published_date, delivery_date, meteo_value
FROM `prj-data-robustify-dev.DPL.V_FACT_meteo_data`
where raw_curve_id in ({index1}, {index2})
The following indices are used:
This data is not processed, so the following transformations are applied:
- Replace index ids by their names
- For each index select the latest available information for every datetime
- Create a pivot table
df2 = (df.groupby('parameter')
         .apply(lambda x: x.sort_values('published_date')
                           .drop_duplicates(subset='delivery_date', keep='last'))
         .reset_index(level=0, drop=True)[['parameter', 'delivery_date', 'meteo_value']])
df3 = pd.pivot_table(df2, index='delivery_date', columns='parameter', values='meteo_value')
Finally, the dataset is resampled to hourly data using the Time Series Preparation plugin.
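The plugin's resampling step can be approximated in plain pandas. A minimal sketch with toy 3-hourly values (column names taken from the pivot above; the linear interpolation method is an assumption):

```python
import pandas as pd

# Toy pivoted meteo table at 3-hourly granularity.
df3 = pd.DataFrame(
    {"RH": [60.0, 66.0], "ambient_temperature": [10.0, 13.0]},
    index=pd.to_datetime(["2024-01-01 00:00", "2024-01-01 03:00"]),
)

# Upsample to hourly and linearly interpolate between known observations;
# the Time Series Preparation plugin offers equivalent interpolation modes.
hourly = df3.resample("1h").asfreq().interpolate("linear")
print(hourly["ambient_temperature"].tolist())  # [10.0, 11.0, 12.0, 13.0]
```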
7.4 - Optimization
The optimization step is defined independently for each site due to the specifics of each plant's energy system.
For each site the main optimization code is defined by Energy team.
The structure of the code in optimizer follows:
- Import libraries
- Import queue and additional datasets
- Define the names_output
- Get the list of IDs in the queue
- Get the token
- Definition of the optimize_run function
- Update the status to 'Running' in the WebApp
- Select the data for the current run
- Get the fixed variables (from the Dataiku custom variables)
- Define the variables, constraints and optimization model
- Try to solve the problem. If solvable, then postprocess and store. Else, define a row with 0s except Period.
- Add OptimizationId, mode and User to the output
- Map the function to the queue. If any exception occurs, define a row with 0s except Period.
- Change the column names according to the GCP schema
- Determine the status of each OptimizationId in the queue. If for any iteration the status is optimal, the status of the whole ID is optimal. Else infeasible or failed.
- Push the status of each iteration to the WebApp
- Store the optimization output and the statuses
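The per-observation solve/fallback structure described above can be sketched with a toy example; solve_model, the column names, and the fallback row are hypothetical stand-ins for the Energy team's model code:

```python
import pandas as pd

def solve_model(row):
    """Hypothetical stand-in for the site's optimization model."""
    if row["demand"] < 0:
        raise ValueError("infeasible input")
    return {"Period": row["Period"], "output": row["demand"] * 2,
            "Termination": "optimal"}

def optimize_run(row):
    """Solve one queue observation; on failure return a zero row except Period."""
    try:
        return solve_model(row)
    except Exception:
        return {"Period": row["Period"], "output": 0, "Termination": "failed"}

# Toy queue with one solvable and one failing observation.
queue = pd.DataFrame({"Period": [1, 2], "demand": [5.0, -1.0]})
results = pd.DataFrame([optimize_run(r) for _, r in queue.iterrows()])
print(results["Termination"].tolist())  # ['optimal', 'failed']
```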
Maintenance of the Python recipes is split:
- Steps 6-7 are maintained by Alba and DT Data Science team.
- Steps 1-5 and 8-11 are maintained by DataOps team.
7.5 - Scenarios
7.5.1 - Base Scenario
UPD 23/04/2024: This scenario is deprecated
Trigger
- Name: Dataset Updated on GCP
- Runs every: 120 seconds
- Code:
from dataiku.scenarios import Trigger
import dataiku
import pandas as pd
from google.oauth2 import service_account
from google.cloud import bigquery
client = dataiku.api_client()
current_project = client.get_project({project_id})
variables = current_project.get_variables()
credentials = variables['standard']['bigquery']
credentials = service_account.Credentials.from_service_account_info(credentials)
project_id = 'prj-data-robustify-dev'
bqclient = bigquery.Client(project=project_id, credentials=credentials)
query_job = bqclient.query(
"""
SELECT *
from `prj-data-robustify-dev.DPL.{table_name}`"""
)
query_response = query_job.result()
results = [dict(r) for r in query_response]
df = pd.DataFrame(results)
output = df.loc[0].map(str).to_dict()
t = Trigger()
if not (pd.Series(variables['standard']['last_update']) == df.loc[0].map(str)).min():
    t.fire()
    variables['standard']['last_update'] = output
    current_project.set_variables(variables)
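The firing condition compares the first BigQuery row against the stored last_update snapshot: `.min()` over the boolean comparison is True only when every field matches, so any difference fires the trigger. A toy illustration (field names hypothetical):

```python
import pandas as pd

# Snapshot stored in the project variables vs. the latest BigQuery row.
last_update = {"max_date": "2024-01-01", "row_count": "100"}
latest_row = pd.Series({"max_date": "2024-01-02", "row_count": "101"})

# .min() of the elementwise comparison is True only if all fields are equal;
# any changed field makes it False, i.e. the scenario trigger should fire.
unchanged = (pd.Series(last_update) == latest_row.map(str)).min()
print(not unchanged)  # prints True -> the trigger would fire
```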
Reporters
- Email when failed
- Receivers:
- Eldiias Dzhamankulov
Steps
- Build optimization_queue_copy [Build just these items]
- Output [Build just these items]
- Clear jobs macros
- Clear scenario logs macros
7.5.2 - Energy prices
Devnya doesn't need this scenario.
Trigger
- Run daily at a given time (project dependent)
Reporters
- Email when failed
- Receivers:
- Rafael Truan Cacho
- Alba Carrero
- Eldiias Dzhamankulov
Steps
- Build resampled data [Force build dependencies]
- Build Energy dataset [Build required dependencies]
- Clear jobs macros
- Clear scenario logs macros
7.5.3 - Optimizer
UPD 23/04/2024: Steps are changed
Trigger
No trigger is required, since it is launched from the orchestrator.
Reporters
- When scenario fails, API response fails as well. Therefore, no need of additional reporting.
Steps
- Build optimization_queue_copy [Build just these items]
- Build Output [Build just these items]
- Run scenario "Sync BQ"
- Clear jobs macros
- Clear scenario logs macros
7.5.4 - Sync BQ
UPD 23/04/2024: Trigger and Steps are changed
UPD 23/02/2024: Disabled Trigger
- Dataset modification: Output
- Run every: 45 seconds
- Grace delay: 20 seconds
Since the scenario is launched from the "Optimizer" scenario, no explicit trigger is required.
Reporters
No need of reporting. No failures expected.
Steps
- Build GCS tables [Build just these items]
- Build BQ tables [Build just these items]
- Refresh Tableau [custom python function below]
- Send status update to the WebApp [custom python function below]
- Clear jobs macros
- Clear scenario logs macros
Refresh Tableau
import requests as r
from bs4 import BeautifulSoup
#authenticate yourself
s = r.Session()
url = "https://eu-west-1a.online.tableau.com/api/3.22/auth/signin"
mapping = {
"Devnia_prod":"53b9800f-4e93-4970-9829-2101b3f61c51",
"Dombasle_prod":"443443ca-394a-432c-bdc9-1f094655ba3c,",
"Torrelavega_prod":"da214835-658f-4bd2-bb3f-7d569488ecd1",
"Benburg_dev":"12ed7f34-03ff-44f8-a659-13bf38585353",
"Devnia_dev":"a823cd38-28b6-488d-92b7-20236f262043",
"Dombasle_dev":"7c3a9d3d-0944-4615-a53c-f2f8ee33090f",
"Rheinberg_dev":"b764d3b9-65eb-4525-899f-bb27bc45c8b0",
"Rosignano_dev":"be6aacbc-5a1d-434f-8ec7-0e6261f2a28e",
"Torrelavega_dev":"0be2b2b0-fcbd-4cd7-a4f9-129d8c30b643",
"Benburg_UAT":"023eedd2-ea51-4563-a0f4-b8993893312d",
"Devnia_UAT":"c143af21-e53a-4b51-aaed-dcda78ef204d",
"Dombasle_UAT":"727310be-8ba3-4284-b244-ce55a626015d",
"Rheinberg_UAT":"094ff065-3569-4ed6-8643-3d82c1f0b31a",
"Rosignano_UAT":"83a5876f-f1f7-4dc8-883a-18bf811c7cfe",
"Torrelavega_UAT":"1b5151b6-59c1-4a4a-829b-06fb9f7e4abe"
}
data = """<tsRequest>
<credentials personalAccessTokenName="GSC_Token" personalAccessTokenSecret="nHxKWmiUTneSl5MWmXynfA==:reP8DcZH0rFrjgQg4NZVnkfpIvhzz0Ga">
<site contentUrl=""/>
</credentials>
</tsRequest>"""
request1 = s.post(url, data=data)
if request1:
    soup = BeautifulSoup(request1.content, 'html.parser')
    token = soup.select_one('tsresponse').select_one('credentials').get('token')
else:
    raise ValueError('Tableau Auth API is failing')
#launch the refresh
url = f"https://eu-west-1a.online.tableau.com/api/3.22/sites/17eb3f27-6850-4932-a4b1-6e6019488f02/workbooks/{mapping[{site}]}/refresh"
data = """
"""
request2 = s.post(url, data=data, headers = {"X-Tableau-Auth":token})
if request2:
    print(request2.content)
else:
    raise ValueError('Tableau refresh API failed')
Send status update to the WebApp
from robustify_rest_api.api import get_token, put_status
import dataiku
import time
time.sleep(10)
# authenticate
token = get_token()
# get data
outputs = dataiku.Dataset("queue_status").get_dataframe()
#convert into mapping
status = outputs.groupby('OptimizationId').Termination.apply(
    lambda x: "Completed - Optimal" if 'optimal' in x.unique()
    else "Completed - Infeasible" if 'infeasible' in x.unique()
    else "Failed").to_dict()
#send requests
for id_,st in status.items():
put_status(id_,st,token, {site})
print(id_, 'done')
7.6 - Orchestrator
The Orchestrator project serves two purposes:
- API Deployer: launch optimization scenario
- Monitoring of the project
API Deployer
UPDATE 23/04/2024: API is finalized
Defines a function run_scenario that accepts two arguments: site and hostname.
import dataiku
import socket

class ConfigError(Exception):
    pass

def run_scenario(site, hostname):
    if site not in ('Devnya','Dombasle','Bernburg','Torrelavega','Rheinberg','Rosignano'):
        raise ConfigError(f'Site {site} is not mapped to a DSS config')
    #hostname = socket.gethostname()
    # Initialization
    # ----------------------------------------------------------------------------------------------------------------
    if hostname == "Design": # api in prod_design, ex: "run test queries" directly from project
        # PROD DESIGN
        dataiku.set_remote_dss("http://ce-dataops-dss-design-ew1-prod:10000", "tWyyCXduOtrajhG8sVS1569dHzlc0ZcH")
    elif hostname == "Prod": # api deployed from prod
        # PROD AUTOMATION
        dataiku.set_remote_dss("http://ce-dataops-dss-automation-ew1-prod:10000", "UH5GTQX8XOzXFUL4tVjydC5mNHtykbtS")
    elif hostname == 'UAT':
        # UAT
        dataiku.set_remote_dss("http://ce-dataops-dss-automation-ci-ew1-prod:10000", "tYD7IhRziwgskbi1EgVYcN5CKh2oz2At")
    else:
        raise ConfigError(f"Hostname '{hostname}' is not mapped to a DSS config.")
    project_id = f'ROBUSTIFY_ENERGY_OPTIMIZATION_{site.upper()}_ECO'
    client = dataiku.api_client()
    project = client.get_project(project_id)
    scenario = project.get_scenario("OPTIMIZER")
    scenario.run()
    return "Completed"
Monitoring of the project
UPD 13/01/2025: This functionality is removed to permit the move to prod of each site independently.
For monitoring purposes, scenario results and metrics from the status tables are imported from all 6 projects.
Scenario analysis allows monitoring the failure status of all 6 projects from a single dashboard. It also allows analyzing temporal performance to identify bottlenecks in the process.

Metrics review shows the results of the optimization. Statistical analysis of the optimization results provides information on the solver's performance and the requests submitted by the users.


Responsible & contact points:
- Alessandro Mainardi - Project Owner
- Simon Bourguignon - Delivery Manager
- Alba Carrero/ Gaetan Frenoy - Product Owner
- Rui Ferraz - Project Manager