High Level Project Architecture
Here is the link to the data architecture schema.
Architecture - High Level Design (HLD)
Link.
Architecture - Low Level Design (LLD)
Link.
Architecture Data Flow
DataPrep Flow
Diagram showing the different steps of the application flow, with the data involved at each step.
Step descriptions
The data and the process involved at each step are described below.
SAP PF1
Description
This project loads the SAP QM, material document, and vendor master data tables into the Industrial Data Ocean for all data in PF1, based on the create and modification dates. A view on top then filters only the Soda Ash data.
Note: Soda Ash data exists only in PF1, so this project loads only from PF1 (WP1 is not included).
Tools: Talend
1. Loading
1.1 Incremental Load
The main job F100_SPF_IND_QM_Main runs all the incremental loads. It loads the tables MCH1, MCHA, QALS, AFVC, QAMR, QAMV, QAVE, MSEG, PLPO in sequence, in order to limit the number of background jobs in SAP.
| Table | Job | STG | ODS | Create date field | Modify date field |
|---|---|---|---|---|---|
| MCH1 | F001_SPF_F001_I_H_MCH1_TO_BQ | STG_SPF_0000_0000_F001_I_H_mch1 | ODS_SPF_0000_F001_I_H_mch1 | ERSDA | LAEDA |
| MCHA | F001_SPF_F001_I_H_MCHA_TO_BQ | STG_SPF_0000_0000_F001_I_H_mcha | ODS_SPF_0000_F001_I_H_mcha | ERSDA | LAEDA |
| MSEG | F001_SPF_F001_I_H_MSEG_TO_BQ | STG_SPF_0000_0000_F001_I_H_mseg | ODS_SPF_0000_F001_I_H_mseg | CPUDT_MKPF | /BEV2/ED_AEDAT |
| QALS | F001_SPF_F001_I_H_QALS_TO_BQ | STG_SPF_0000_0000_F001_I_H_qals | ODS_SPF_0000_F001_I_H_qals | ERSTELDAT | AENDERDAT |
| QAMR | F001_SPF_F001_I_H_QAMR_TO_BQ | STG_SPF_0000_0000_F001_I_H_qamr | ODS_SPF_0000_F001_I_H_qamr | ERSTELLDAT | AENDERDAT |
| QAMV | F001_SPF_F001_I_H_QAMV_TO_BQ | STG_SPF_0000_0000_F001_I_H_qamv | ODS_SPF_0000_F001_I_H_qamv | ERSTELLDAT | AENDERDAT |
| QAPP | F001_SPF_F001_I_H_QAPP_TO_BQ | STG_SPF_0000_0000_F001_I_H_qapp | ODS_SPF_0000_F001_I_H_qapp | ERSTELDAT | AENDERDAT |
| QASR | F001_SPF_F001_I_H_QASR_TO_BQ | STG_SPF_0000_0000_F001_I_H_qasr | ODS_SPF_0000_F001_I_H_qasr | ERSTELLDAT | AENDERDAT |
| QAVE | F001_SPF_F001_I_H_QAVE_TO_BQ | STG_SPF_0000_0000_F001_I_H_qave | ODS_SPF_0000_F001_I_H_qave | VDATUM | VAEDATUM |
| AFVC | F001_SPF_F001_I_H_AFVC_TO_BQ | STG_SPF_0000_0000_F001_I_H_afvc | ODS_SPF_0000_F001_I_H_afvc | List of AUFPL from the last load of QALS | |
The AFVC job must run after QALS because it selects the list of AUFPL (routing numbers of operations in the order) from the last QALS load (max meta_business_date). In case of a reload, it also relies on the QALS table.
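For illustration, a minimal sketch (Python, google-cloud-bigquery) of how that AUFPL list could be derived from the last QALS load. The project id and dataset name are placeholders, not the real ones; only the ODS table, AUFPL and meta_business_date come from this page.

```python
# Hedged sketch: derive the AUFPL list for the AFVC extraction from the last QALS load.
from google.cloud import bigquery

client = bigquery.Client(project="prj-data-dm-industrial-dev")  # assumed project id

sql = """
SELECT DISTINCT AUFPL
FROM `ODS.ODS_SPF_0000_F001_I_H_qals`              -- dataset name is an assumption
WHERE meta_business_date = (
  SELECT MAX(meta_business_date)
  FROM `ODS.ODS_SPF_0000_F001_I_H_qals`
)
"""

aufpl_list = [row.AUFPL for row in client.query(sql).result()]
print(f"{len(aufpl_list)} routing numbers to extract from AFVC")
```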
1.2 Full Load
The job F101_SPF_IND_QM_Main_Full manages the full load.
| Table | Job | STG | ODS |
|---|---|---|---|
| LFA1 | F001_SPF_F001_F_D_LFA1_TO_BQ | STG_SPF_0000_0000_F001_F_H_lfa1 | ODS_SPF_0000_F001_F_H_lfa1 |
| QPAC | F001_SPF_F001_F_D_QPAC_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpac | ODS_SPF_0000_F001_F_H_qpac |
| QPAM | F001_SPF_F001_F_D_QPAM_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpam | ODS_SPF_0000_F001_F_H_qpam |
| QPCD | F001_SPF_F001_F_D_QPCD_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpcd | ODS_SPF_0000_F001_F_H_qpcd |
| QPCT | F001_SPF_F001_F_D_QPCD_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpct | ODS_SPF_0000_F001_F_H_qpct |
1.3 Reloading Data
All jobs have a context parameter l_VAR_eBatch_PF1_[TableName]_additional_filter to change the selection when extracting from SAP. For an incremental load, this context MUST be "incremental". If it is blank, the job will load data from 2023 onwards.
Note: the incremental load checks only the date, not the time.
Examples of reload contexts:
l_VAR_eBatch_PF1_AFVC_additional_filter = AUFPL >= '1007995974' and AUFPL <= '1009027526'
l_VAR_eBatch_PF1_MCH1_additional_filter = ERSDA > '20230101'
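To illustrate how these context values could drive the SAP selection, here is a hedged sketch of the logic described in 1.3 (incremental / blank / explicit filter). The function and its last-load input are hypothetical; the field names, the example filters and the 2023 default come from this page.

```python
def build_sap_filter(additional_filter: str, create_field: str, last_load_date: str) -> str:
    """Illustrative only: how the additional_filter context could be resolved into the
    selection used when extracting from SAP. Not the real Talend code."""
    value = (additional_filter or "").strip()
    if value.lower() == "incremental":
        # incremental load: filter on the date of the last load (date only, no time)
        return f"{create_field} >= '{last_load_date}'"
    if value == "":
        # blank: load data from 2023 onwards (illustrative date filter)
        return f"{create_field} >= '20230101'"
    # anything else is treated as an explicit reload filter
    return value

print(build_sap_filter("incremental", "ERSDA", "20240520"))
print(build_sap_filter("", "ERSDA", "20240520"))
print(build_sap_filter("AUFPL >= '1007995974' and AUFPL <= '1009027526'", "ERSDA", "20240520"))
```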
2. Talend Jobs
2.1 Detail job
All the jobs extracting data from SAP PF1 follow the same concept described below; the only differences between jobs are the table being extracted and its schema. A minimal sketch of the paging logic is shown after the list.
- Connect to the SAP system by reading the context from the flow job
- Set up a loop to get the data:
- tSetGlobalVar: set the maximum number of records to read per pass ("limit") and initialize the variable "nb" that controls when to exit the loop (starting at 0)
- tLoop: set the condition to exit the loop when the variable nb < 0
- tJava: set the record offset so that each pass reads new records
- Get the data from the source, using "nb" as the start row number and "limit" as the maximum number of rows. The schema is read from the source metadata.
- Replace invalid characters such as | ; \n. Check this in the Logging section; if there are KO records, the offending field must be added to the tReplace.
- Generate the output file and save it to DATA\DEV\DATA_OCEAN_DOMAIN_INDUSTRIAL\Tmp\SAP\[table name]
- Update the offset: "nb" = "nb" + "limit"
- Set "nb" = -1 when ((Integer)globalMap.get("tReplace_1_NB_LINE")) <= 0 in order to exit the loop
- Upload all the files in the folder (point 5) to GCS: cs-ew1-prj-data-dm-industrial-[env]-staging/SPF_IND_0000_0000_F001_I_H_[TableName]
- Delete all the files in the folder (point 5)
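As referenced above, a minimal Python sketch of the paging loop that the tSetGlobalVar / tLoop / tJava / tReplace components implement. The SAP read is replaced by a fake in-memory table so the snippet is self-contained; it is not the production job.

```python
# Hedged sketch of the paging logic: read "limit" rows per pass starting at offset "nb"
# and stop once a pass returns nothing. fetch_rows() is a stand-in for the SAP read.
FAKE_SAP_TABLE = [{"MATNR": f"MAT{i:05d}", "CHARG": f"B|{i}"} for i in range(120)]

def fetch_rows(start_row: int, max_rows: int) -> list[dict]:
    return FAKE_SAP_TABLE[start_row:start_row + max_rows]

def clean(value: str) -> str:
    # tReplace: remove characters that would break the delimited output file
    for bad in ("|", ";", "\n"):
        value = value.replace(bad, " ")
    return value

def extract_table(limit: int = 50) -> list[dict]:
    nb, out = 0, []                 # nb = current offset, starts at 0
    while nb >= 0:                  # tLoop exits when nb < 0
        rows = fetch_rows(start_row=nb, max_rows=limit)
        out.extend({k: clean(str(v)) for k, v in r.items()} for r in rows)
        nb = -1 if len(rows) <= 0 else nb + limit   # advance offset or flag exit
    return out

print(len(extract_table()))         # 120 cleaned rows, ready to write to Tmp\SAP\[table]
```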
2.2 Flow job
It controls the filename and the target staging and ODS tables.
- Set up meta_run_id and the filename of the output file
- Get the last load from the table STG.incremental_load, controlled by the variable l_VAR_eBatch_PF1_QAPP_INC_LOAD, and configure the incremental-load logic in tJava, applying the date from incremental_load to the create or change date field in SAP
- Call the detail job, passing parameters such as user/password and the query from point 2, to do the incremental load and save the file to GCS
- Call the standard job to upload the files from GCS to ODS
- If the loading is OK and the parameter l_VAR_eBatch_PF1_[table_name]_additional_filter = incremental, update the time in the table incremental_load (a sketch of this update follows the list). If the value is not incremental, it is a reload (see point 1.3).
- If everything is OK, update the log.
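The last-load update in the bullet above could look roughly like the following BigQuery DML sketch. The table name STG.incremental_load and its column names are assumptions based on this page, not the exact schema.

```python
# Hedged sketch: after a successful incremental load, write the new "last load"
# timestamp back to the incremental-load table (assumed table/column names).
from google.cloud import bigquery

def update_last_load(client: bigquery.Client, table_name: str, last_load: str) -> None:
    sql = """
    UPDATE `STG.incremental_load`                     -- assumed table name
    SET last_load_date = @last_load                   -- assumed column name
    WHERE meta_file_name LIKE CONCAT('%', @table_name, '%')
    """
    job = client.query(
        sql,
        job_config=bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter("last_load", "STRING", last_load),
                bigquery.ScalarQueryParameter("table_name", "STRING", table_name),
            ]
        ),
    )
    job.result()  # wait for the DML statement to finish

# usage sketch: update_last_load(bigquery.Client(), "qals", "2024-05-20 10:00:00")
```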
Access rights
To access SAP, the Talend user RFC_TAL_PF1 is required.
Source
pf1nonha.eua.solvay.com
Format
Destination
Location
The data is kept in:
Bucket = cs-ew1-prj-data-dm-industrial-[dev]-staging, with one folder per table
Data Ocean GCP = prj-data-dm-industrial-[env]
Product GCP = prj-data-sad-ebatch-[env]
To save the data into the GCP Industrial Data Ocean, a service account is required.
Format
Same as the source
Sizing
Expected data volume for full load (as of May 2024)
| Table | Table Name | Size (records) |
|---|---|---|
| LFA1 | Vendor Master (General Section) | 229,553 |
| QPAC | Inspection catalog codes for selected sets | 2,314 |
| QPAM | Inspection catalog selected sets | 741 |
| QPCD | Inspection catalog codes | 26,828 |
| QPCT | Code texts | 228,149 |
The remaining tables are loaded incrementally every hour.
Assessment
How to validate that the generated output is valid: compare with the corresponding tables in PF1.
Scheduling
PL_INDUS_EBATCH_SPF_QM_INC_LOAD (incremental load) runs every hour on weekdays.
PL_INDUS_EBATCH_SPF_QM_FULL_LOAD (full load) runs every workday at 08:00 CET (daily).
Timing
The average time expected:
- full process: 7-15 min
- incremental process: 5-10 min
Criticality
High
Logging
Industrial
1. Table log: this SQL retrieves the status log for jobs belonging to the source system SPF (PF1).
select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system, logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines from STG.log_tables logs join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (SELECT meta_run_id FROM STG.run_jobs order by meta_start_date desc limit 100)
and job_name like '%SPF%'
and meta_start_date > DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
order by job.meta_start_date desc
2. Incremental table: every time a load job completes, the max timestamp from the staging table is written to this table, which makes it easy to see which tables have not been updated.
Table: STG.incremental_loading
SAP BW HR
Description:
The BW HR query is used to determine user authorization based on the user profile on site. The source is the query DO_BW_QRY_CPHRPANHR_0001.
Tool: Xtract
Xtract server = ACEW1DXTRAXUS01
Xtract job = TALEND_PROD_DO_BW_QRY_CPHRPANHR_0001. There is only one parameter, e.g. YYYYMM_Start=202403&YYYYMM_End=202403; both the start and the end month must be provided.
All available fields of the query are selected, and the output is generated to \\ACEW1DXTRAXUS01\extractions\TALEND_PROD_DO_BW_QRY_CPHRPANHR_0001.csv. The file generation is triggered by the Talend job.
Tool: Talend
1. Loading
1.1 Full Load
| Job | STG | ODS | Comment |
|---|---|---|---|
| F028_BW_QRY_CPHRPANHR_0001_to_ODS | STG_BWH_0000_0000_F001_F_D_qry_cphrpanhr_0001 | ODS_BWH_0000_F001_F_D_qry_cphrpanhr_0001 | |
1.2. Reloading data
The job has a context parameter l_VAR_XTRACT_PARA_CPHRPANHR to change the selection of the BW query. Normally the value should be "currentmonth" so that only the current month is loaded. If data from other months needs to be reloaded, the context MUST be set to a value such as "&YYYYMM_Start=202403&YYYYMM_End=202403"; in this example, only March 2024 is loaded. If it is blank, all the data in BW is loaded, which may cause the job to fail with a memory error.
2. Talend Jobs
2.1 Detail job
- Call the Xtract URL (tHttpRequest) to trigger Xtract to generate the file, passing the parameter l_VAR_XTRACT_JOB from the flow job, which selects only the current month
- Get the file from the Xtract server
- Replace invalid characters in the columns
- Generate the output file to \DATA_OCEAN_DOMAIN_HR\InOut
- Upload the file to GCS: cs-ew1-prj-data-dm-hr-[dev]-staging
- Delete the local output file
2.2 Flow job
- Set up the output file name
- Set up the execution id, system id, and start time
- Calculate the incremental load to get the current month via the parameter l_VAR_XTRACT_PARA_CPHRPANHR. If the value is "currentmonth", the current month of today is used; otherwise the parameter value is passed to Xtract as-is, e.g. "&YYYYMM_Start=202402&YYYYMM_End=202402" reloads only February 2024 (see the sketch after this list).
- Generate meta_run_id
- Call the detail job
- Load the data from GCS to ODS
- Update the log when the job is done.
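As referenced in the list, a hedged sketch of the "currentmonth" calculation for l_VAR_XTRACT_PARA_CPHRPANHR. The function is illustrative; only the parameter format comes from this page.

```python
# Hedged sketch: if the context is "currentmonth", build the Xtract parameter for
# today's month, otherwise pass the context value through unchanged (explicit reload).
from datetime import date

def xtract_parameter(context_value: str, today: date | None = None) -> str:
    today = today or date.today()
    if (context_value or "").strip().lower() == "currentmonth":
        yyyymm = today.strftime("%Y%m")
        return f"&YYYYMM_Start={yyyymm}&YYYYMM_End={yyyymm}"
    return context_value        # e.g. "&YYYYMM_Start=202402&YYYYMM_End=202402"

print(xtract_parameter("currentmonth", date(2024, 3, 15)))   # &YYYYMM_Start=202403&YYYYMM_End=202403
print(xtract_parameter("&YYYYMM_Start=202402&YYYYMM_End=202402"))
```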
Access rights
To access BW, Talend connects to Xtract, and Xtract requires the authorization of the user RFC_TAL_WBP.
In most cases for HR queries, the authorization team must be contacted to add the authorization; in this case, the role ZH_PA_ROBOT was modified to add the new query.
Source
BW query DO_BW_QRY_CPHRPANHR_0001
Format
Destination
Location
The data is kept in:
Bucket = cs-ew1-prj-data-dm-hr-[dev]-staging
Data Ocean GCP = prj-data-dm-hr-dev
Product GCP = prj-data-sad-ebatch-ppd
To save the data into the GCP Data Ocean, a service account is required.
Format
Same as the source
Sizing
The current month should have around 30,000 records.
Assessment
How to validate that the generated output is valid: compare with the BW query DO_BW_QRY_CPHRPANHR_0001, drilling down on all characteristics as in the Data Dict.
Scheduling
PL_HR_EBATCH_BWH_CPHRPANHR_0001 runs every Monday at 08:00 CET (weekly).
Timing
Loading takes around 2-5 minutes.
Criticality
High / Medium / Low ??
Logging
select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system, logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines from STG.log_tables logs join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (SELECT meta_run_id FROM STG.run_jobs order by meta_start_date desc limit 100)
and meta_source_system like '%BWH%CPHRPANHR%'
and meta_start_date > DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
order by job.meta_start_date desc
PI Startrek
Description: this is an API connection used to get manufacturing data. It requires authentication with a Windows user, which Talend cannot do natively; therefore Python is required to run it.
The Python environment is set up in the folder \data\[Env]\DATA_OCEAN_DOMAIN_INDUSTRIAL\Python
There are 3 main APIs used in the eBatch project:
1. Event Frame with calculation
Options:
&starttime=*-24h (last 24 hours)
&starttime=%272024-05-19%2010:40:00%27 (specific start time)
This lists all event frames in the selected time range.
1.2 For each event frame, the list of Attributes must be retrieved from its attributes link. We can restrict the returned fields to only the values we need with the "selectedFields" parameter.
The values of Batch_ID, Weight and Silo are read directly from their Value links (point 1.4).
Sometimes the batch id is erroneous, but we keep it exactly as in the source.
1.3 The remaining attributes, i.e. those not starting with Granulom and not equal to Batch_ID, Weight or Silo, require accessing the Attribute link to get the sub-attributes (Avg, Low_limit, Upper_limit, Max, Mean, Min, Std).
1.4 Get the value from the Value link
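A minimal sketch of points 1 to 1.4 against the PI Web API, assuming Windows authentication via requests_ntlm (the production script may use a different handler); the database WebId, credentials and selectedFields values are placeholders, only the endpoint shape follows the URLs quoted on this page.

```python
# Hedged sketch of the event-frame extraction against the PI Web API.
import requests
from requests_ntlm import HttpNtlmAuth

BASE = "https://pivision.eua.solvay.com/piwebapi"
DB_WEBID = "F1RD...PLANTS"                              # placeholder asset database WebId
auth = HttpNtlmAuth("EUA\\SVC_TALEND_ADMIN_DEV", "********")  # placeholder Windows credentials

# 1. list the event frames of the last 24 hours
frames = requests.get(
    f"{BASE}/assetdatabases/{DB_WEBID}/eventframes",
    params={"templateName": "Batch_Record_Pharma", "starttime": "*-24h"},
    auth=auth,
).json()["Items"]

for frame in frames:
    # 1.2 list the attributes of each event frame, restricted with selectedFields
    attrs = requests.get(
        frame["Links"]["Attributes"],
        params={"selectedFields": "Items.Name;Items.Links.Value;Items.Links.Attributes"},
        auth=auth,
    ).json()["Items"]
    for attr in attrs:
        if attr["Name"] in ("Batch_ID", "Weight", "Silo"):
            # 1.4 read the value directly from the Value link
            value = requests.get(attr["Links"]["Value"], auth=auth).json()
            print(frame["Name"], attr["Name"], value.get("Value"))
```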
2. Event Frame with value in time series - Granulom
The same steps as Event Frame with calculation (points 1 and 2), but only the attributes whose names start with "Granulom" are selected. Then, instead of getting the sub-attributes, the values during the event frame are retrieved from the "RecordedData" link.
3. Tags data
3.1 Find the "Web ID" by using this URL: https://pivision.eua.solvay.com/piwebapi/points/search?dataServerWebId=F1DSUZOarN2bd0yKBpfAHpVUXAQUNFVzFQU1RFS1BEQTAx&query=TOR.BIR.SILO.SILO.SILO.CB_M_BIR_Calcio_BIR_2&selectedFields=Items.WebId
The query value in the URL is the tag name from the file List of Tag (column N). For example, in this project for Torrelavega only 5 tags are retrieved, controlled by the variable l_VAR_eBatch_tag_TOR_tag:
- TOR.BIR.SILO.SILO.SILO.SY_L2_PPM_CA_BIR = F1DPUZOarN2bd0yKBpfAHpVUXAraMBAAQUNFVzFQU1RFS1BEQTAxXFRPUi5CSVIuU0lMTy5TSUxPLlNJTE8uU1lfTDJfUFBNX0NBX0JJUg
- TOR.BIR.SILO.SILO.SILO.SY_L2_COL_L = F1DPUZOarN2bd0yKBpfAHpVUXArqMBAAQUNFVzFQU1RFS1BEQTAxXFRPUi5CSVIuU0lMTy5TSUxPLlNJTE8uU1lfTDJfQ09MX0w
- TOR.BIR.SILO.SILO.SILO.CB_M_BIR_Colorimetría_a = F1DPUZOarN2bd0yKBpfAHpVUXAyKMBAAQUNFVzFQU1RFS1BEQTAxXFRPUi5CSVIuU0lMTy5TSUxPLlNJTE8uQ0JfTV9CSVJfQ09MT1JJTUVUUsONQV9B
- TOR.BIR.SILO.SILO.SILO.CB_M_BIR_Colorimetría_b = F1DPUZOarN2bd0yKBpfAHpVUXAyaMBAAQUNFVzFQU1RFS1BEQTAxXFRPUi5CSVIuU0lMTy5TSUxPLlNJTE8uQ0JfTV9CSVJfQ09MT1JJTUVUUsONQV9C
- TOR.BIR.SILO.SILO.SILO.CB_M_BIR_Colorimetría_L = F1DPUZOarN2bd0yKBpfAHpVUXAyqMBAAQUNFVzFQU1RFS1BEQTAxXFRPUi5CSVIuU0lMTy5TSUxPLlNJTE8uQ0JfTV9CSVJfQ09MT1JJTUVUUsONQV9M
Example output of the link:
3.2 Get the value by using this URL: https://pivision.eua.solvay.com/piwebapi/streams/[WebId]/recorded?starttime=2024-06-07%2000:26:52
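A hedged sketch of the two tag calls (3.1 and 3.2): resolve the tag name to a WebId, then read the recorded values from the start time. The data server WebId and the authentication handler are placeholders; the URL pattern is the one quoted above.

```python
# Hedged sketch of the tag extraction: points/search to find the WebId, then streams/recorded.
import requests
from requests_ntlm import HttpNtlmAuth

BASE = "https://pivision.eua.solvay.com/piwebapi"
DATASERVER_WEBID = "F1DS...DA01"                        # placeholder, see the search URL in 3.1
auth = HttpNtlmAuth("EUA\\SVC_TALEND_ADMIN_DEV", "********")  # placeholder Windows credentials

def recorded_values(tag_name: str, starttime: str) -> list[dict]:
    # 3.1 resolve the tag name to a WebId
    search = requests.get(
        f"{BASE}/points/search",
        params={"dataServerWebId": DATASERVER_WEBID, "query": tag_name,
                "selectedFields": "Items.WebId"},
        auth=auth,
    ).json()
    web_id = search["Items"][0]["WebId"]
    # 3.2 read the recorded values from that start time
    data = requests.get(
        f"{BASE}/streams/{web_id}/recorded",
        params={"starttime": starttime,
                "selectedFields": "Items.Timestamp;Items.Value"},
        auth=auth,
    ).json()
    return data["Items"]

# usage sketch: recorded_values("TOR.BIR.SILO.SILO.SILO.SY_L2_PPM_CA_BIR", "2024-06-07 00:26:52")
```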
Python
Example of the Python file:
- The main URL is called in order to get the next URL
- Output from run 1 on the web (authorization is required to get the result)
- The selected link is called to get the result
- Call the selected link URL plus the new parameters
- Example of the full URL
- The result of the second call; the fields can be selected with the parameter "selectedFields=Items.Timestamp;Items.Value"
Talend
1. Job Detail
- tFileDelete: delete the Python log file DATA\[env]\IND\IND_EBATCH\py_log.txt. All jobs use the same file name, so it is deleted beforehand.
- tJava: generate the .py file DATA\[env]\IND\IND_EBATCH\pyCodeyyyyMMddHHmmss.py by reading the input parameters from the flow job and the RSD parameter. Example of Python code
- tSystem: run the .py file.
- The console output is kept in DATA\[env]\IND\IND_EBATCH\py_log.txt.
- The extraction is generated in DATA\[env]\IND\IND_EBATCH\Output\[API interface]\file.csv
- tGSPut: upload the csv file to GCS prj-data-sad-ebatch-[env]/cs-ew1-prj-data-sad-ebatch-[env]-staging/[API interface]
- tFileDelete: delete the csv files
- tFileDelete: delete the py file
2. Incremental Load (Flow job)
2.1. Get the last load from table STG.incremental_loading
2.2. Run the detail job that generates the .py file and runs the Python script to get the output
2.3. Standard job to load to STG and ODS
2.4. Update the last load in STG.incremental_loading using the event frame start date and the tag timestamp
Remark: PI_TOR_TAG has many tags; therefore it uses the minimum, across tags, of the max loading dates (see the sketch below).
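A tiny sketch of that remark: the value written back to STG.incremental_loading is the minimum of the per-tag maximum timestamps, so no tag can silently fall behind. Tag names are shortened for the example.

```python
# Hedged sketch: choose the next "last load" for PI_TOR_TAG as the minimum of the
# maximum loaded timestamp of each tag.
from datetime import datetime

def next_last_load(max_ts_per_tag: dict[str, datetime]) -> datetime:
    return min(max_ts_per_tag.values())

loaded = {
    "SY_L2_PPM_CA_BIR": datetime(2024, 6, 7, 10, 0),
    "SY_L2_COL_L":      datetime(2024, 6, 7, 9, 45),   # slowest tag wins
    "CB_M_BIR_Colorimetria_L": datetime(2024, 6, 7, 10, 5),
}
print(next_last_load(loaded))   # 2024-06-07 09:45:00
```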
3. Reloading data
Reloads are flexible: set the desired starttime in the table STG.incremental_load.
Access rights
To access PI, david.genet@solvay.com must first grant the authorization to access PI to the Windows account SVC_TALEND_ADMIN_DEV, because the Python script authenticates with a Windows account. When promoting to another environment, the authorization must be requested again.
Source
https://pivision-dev.eua.solvay.com/piwebapi/
Format
JSON web API
Destination
Location
Bucket = cs-ew1-prj-data-sad-ebatch-[env]-staging, with one folder per table
Data Ocean GCP = N/A
Product GCP = prj-data-sad-ebatch-ppd
The plan job is PL_EBATCH_PI_TO_ODS
The main job is F100_PI_TOR_TO_BQ
| Detail Job | STG | ODS |
|---|---|---|
| F001_PI_TOR_EFRAME_TO_BQ | STG_EXT_TOR_0000_F001_I_H_pi_eframe | ODS_EXT_TOR_F001_I_H_pi_eframe |
| F002_PI_TOR_EFRAME_GRANULOM_TO_BQ | STG_EXT_TOR_0000_F001_I_H_pi_eframe_granulom | ODS_EXT_TOR_F001_I_H_pi_eframe_granulom |
| F010_PI_TOR_TAG_TO_BQ | STG_EXT_TOR_0000_F001_I_H_pi_tag | ODS_EXT_TOR_F001_I_H_pi_tag |
Format
Event Frame with calculation
Event Frame with value in time series - Granulom
Tags data
Sizing
Less than 100 records / hour
Assessment
How to validate that the generated output is valid: Compare with the output from the web API
- EventFrame(calculation and Granulom) =
https://pivision.eua.solvay.com/piwebapi/assetdatabases/F1RD0WI4NHGJw0uG4fwriJz93welqtozWz30OpJS7GtHMFHgQUNFVzFQU1RFS1BBRjAzXEdCVSBTT0RBIC0gQUxMIFBMQU5UUw/eventframes?templateName=Batch_Record_Pharma&[start time]
starttime='2024-04-19 19:40:00'
Tags Data =
https://pivision.eua.solvay.com/piwebapi/streams/[web id ]/recorded?[start time]
web id = the WebId of the tag (see 3. Tags data)
starttime='2024-06-07 00:26:52'
Scheduling
PL_EBATCH_PI_TO_ODS runs every hour, Monday - Friday.
Timing
One event frame takes around 2-4 min.
For tags, it is very quick.
Criticality
High
Logging
1. Monitor the number of records within a day
select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system, logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines from STG.log_tables logs join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (SELECT meta_run_id FROM STG.run_jobs order by meta_start_date desc limit 100)
and job_name like '%PI%'
and meta_step = 'Bucket to Staging'
and meta_start_date > DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
order by job.meta_start_date desc
We should not have 0 records. If the count is 0, it usually means the Python script failed (when Python fails, the Talend log still shows OK).
2. Monitor the last update date. The result should be the current date.
SELECT * FROM STG.incremental_loading
where meta_file_name like '%PI%'
order by meta_file_name