Description
This project loads SAP data from the QM, material document and vendor master data tables into the Industrial Data Ocean. All PF1 data is loaded, based on the create and modification dates. A view on top then filters for Soda Ash data only.
Note: Soda Ash data exists only in PF1, so this project loads from PF1 only (WP1 is not included).
Tools: Talend
1. Detail job
All jobs that extract data from SAP PF1 follow the same concept, described below. The jobs differ only in the table they extract and in that table's schema.
- Connect to the SAP system by reading the context from the flow job.
- Set up a loop to get the data:
  - tSetGlobalVar: sets the maximum number of records to read per iteration ("limit") and initializes the variable "nb" (starting at 0), which is used to decide when to exit the loop.
  - tLoop: exits the loop when nb < 0.
  - tJava: sets the record offset so that each iteration reads new records.
- Read data from the source, using "nb" as the start row number and "limit" as the maximum number of rows. The schema is read from the source (metadata).
- Replace invalid characters such as | ; \n. Check the result in the Logging section: if there are KO records, add the affected field to the tReplace component.
- Generate the output file and save it to DATA\DEV\DATA_OCEAN_DOMAIN_INDUSTRIAL\Tmp\SAP\[table name]
- Update the offset: "nb" = "nb" + "limit"
- Set "nb" = -1 when ((Integer)globalMap.get("tReplace_1_NB_LINE")) <= 0 in order to exit the loop.
- Upload all files in the output folder (the Tmp folder above) to GCS at cs-ew1-prj-data-dm-industrial-[env]-staging/SPF_IND_0000_0000_F001_I_H_[TableName]
- Delete all files in the output folder.
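The loop described above can be sketched outside Talend as follows. This is only an illustration in Python, not the actual job: `read_page`, `clean_row` and `extract_in_pages` are hypothetical names, and replacing invalid characters with a space is an assumption (the source does not say what the replacement value is).

```python
import re
from typing import Callable, Dict, List

Row = Dict[str, str]

# Characters the job replaces before writing the file: "|", ";" and newline.
INVALID_CHARS = re.compile(r"[|;\n]")


def clean_row(row: Row, fields: List[str]) -> Row:
    """Replace invalid characters with a space (assumed), in the listed fields only."""
    return {
        key: INVALID_CHARS.sub(" ", value) if key in fields else value
        for key, value in row.items()
    }


def extract_in_pages(read_page: Callable[[int, int], List[Row]],
                     limit: int,
                     fields_to_clean: List[str]) -> List[List[Row]]:
    """Read the source page by page until a page comes back empty."""
    pages: List[List[Row]] = []
    nb = 0  # offset of the first row to read; a negative value ends the loop
    while nb >= 0:
        rows = [clean_row(r, fields_to_clean) for r in read_page(nb, limit)]
        if len(rows) <= 0:
            nb = -1  # nothing left to read: signal the loop to stop
        else:
            pages.append(rows)  # the real job writes one output file here
            nb += limit         # move the offset to the next page
    return pages
```

Each returned page corresponds to one output file in the Tmp folder. Fields that are not listed in `fields_to_clean` pass through unchanged, which mirrors the need to add a field to tReplace when KO records appear.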
2. Flow job
The flow job controls the file name and the target staging and ODS tables.
- Set up meta_run_id and the file name of the output file.
- Get the last load date from table STG.incremental_loading, controlled by the variable l_VAR_eBatch_PF1_QAPP_INC_LOAD, and configure the incremental-load logic in tJava so that the date from the incremental table is compared with the create or change date field in SAP.
- Call the detail job, passing parameters such as user/password and the query from the previous step, to run the incremental load and save the file to GCS.
- Call the standard job to load the files from GCS to ODS.
- If the load is OK and the parameter l_VAR_eBatch_PF1_[table_name]_additional_filter = incremental, update the time in the incremental table. Any other value means a reload (see "3. Reloading data" under Loading).
- If everything is OK, update the log.
Context Variable
- l_VAR_PF1_[table]_BACKET = bucket name
- l_VAR_PF1_[table]_BQ_Table_ODS = ods table name
- l_VAR_PF1_[table]_BQ_Table_STG = stg table name
- l_VAR_PF1_[table]_INC_LOAD = meta_file_name of the table in the incremental_loading table, normally "PF1_[table]"
- l_VAR_PF1_[table]_additional_filter = "incremental" when the load uses the date from the incremental_loading table. It can be changed to any reload filter, such as LAEDA > '20240101'
- l_VAR_PF1_[table]_email_recipient = recipient of the email (eco-dataops@solvay.com)
- l_VAR_PF1_[table]_email_flag = "yes" to send an email when the incremental load has an issue (max meta_stg_insert_date from STG < date in the incremental_loading table); otherwise no email is sent
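The email condition above amounts to a single comparison. A minimal sketch, assuming both dates are available as comparable date values; `should_send_alert` is a hypothetical helper name, not part of the Talend job:

```python
from datetime import date


def should_send_alert(email_flag: str,
                      max_stg_insert_date: date,
                      incremental_date: date) -> bool:
    """Return True when an alert email should be sent for a table."""
    # The flag must be "yes"; any other value suppresses the email.
    if email_flag.strip() != "yes":
        return False
    # Issue condition from the text: staging is older than the incremental table.
    return max_stg_insert_date < incremental_date
```

For example, with the flag set to "yes", a staging date of 2024-05-01 and an incremental date of 2024-05-02 would trigger the email, while equal dates would not.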
3. Access rights
To access SAP, the Talend user RFC_TAL_PF1 is required.
4. Source
5. Format
Destination
Location
The data is stored in:
Bucket = cs-ew1-prj-data-dm-industrial-[env]-staging, with one folder per table
Data Ocean GCP = prj-data-dm-industrial-[env]
Product GCP = prj-data-sad-ebatch-[env]
A service account is required to save data into the GCP Industrial Data Ocean.
Format
Same as the source
Sizing
Expected data volume for full load (as of May 2024)
| Table | Description | Size |
|---|---|---|
| LFA1 | Vendor Master (General Section) | 229,553 |
| QPAC | Inspection catalog codes for selected sets | 2,314 |
| QPAM | Inspection catalog selected sets | 741 |
| QPCD | Inspection catalog codes | 26,828 |
| QPCT | Code texts | 228,149 |
All other tables are loaded incrementally every hour.
Assessment
To validate the generated output, compare it with the corresponding table in PF1.
Loading
1. Incremental Load
The main job F100_SPF_IND_QM_Main runs all the incremental loads. It loads the tables MCH1, MCHA, QALS, AFVC, QAMR, QAMV, QAVE, MSEG and PLPO in sequence, in order to limit the number of background jobs in SAP.
| Table | Job | STG | ODS | Incremental load by | |
|---|---|---|---|---|---|
| MCH1 | F001_SPF_F001_I_H_MCH1_TO_BQ | STG_SPF_0000_0000_F001_I_H_mch1 | ODS_SPF_0000_F001_I_H_mch1 | ERSDA - Create date | LAEDA - Modify date |
| MCHA | F001_SPF_F001_I_H_MCHA_TO_BQ | STG_SPF_0000_0000_F001_I_H_mcha | ODS_SPF_0000_F001_I_H_mcha | ERSDA - Create date | LAEDA - Modify date |
| QALS | F001_SPF_F001_I_H_QALS_TO_BQ | STG_SPF_0000_0000_F001_I_H_qals | ODS_SPF_0000_F001_I_H_qals | ERSTELDAT - Create date | AENDERDAT - Modify date |
| AFVC | F001_SPF_F001_I_H_AFVC_TO_BQ | STG_SPF_0000_0000_F001_I_H_afvc | ODS_SPF_0000_F001_I_H_afvc | Get list of AUFPL from last load of QALS | |
| QAMR | F001_SPF_F001_I_H_QAMR_TO_BQ | STG_SPF_0000_0000_F001_I_H_qamr | ODS_SPF_0000_F001_I_H_qamr | ERSTELLDAT - Create date | AENDERDAT - Modify date |
| QAMV | F001_SPF_F001_I_H_QAMV_TO_BQ | STG_SPF_0000_0000_F001_I_H_qamv | ODS_SPF_0000_F001_I_H_qamv | ERSTELLDAT - Create date | AENDERDAT - Modify date |
| QAVE | F001_SPF_F001_I_H_QAVE_TO_BQ | STG_SPF_0000_0000_F001_I_H_qave | ODS_SPF_0000_F001_I_H_qave | VDATUM - Create date | VAEDATUM - Modify date |
| MSEG | F001_SPF_F001_I_H_MSEG_TO_BQ | STG_SPF_0000_0000_F001_I_H_mseg | ODS_SPF_0000_F001_I_H_mseg | CPUDT_MKPF - Create date | /BEV2/ED_AEDAT - Modify date |
| QASR | F001_SPF_F001_I_H_QASR_TO_BQ | STG_SPF_0000_0000_F001_I_H_qasr | ODS_SPF_0000_F001_I_H_qasr | ERSTELLDAT - Create date | AENDERDAT - Modify date |
| QAPP | F001_SPF_F001_I_H_QAPP_TO_BQ | STG_SPF_0000_0000_F001_I_H_qapp | ODS_SPF_0000_F001_I_H_qapp | ERSTELDAT - Create date | AENDERDAT - Modify date |
| AUFK | F001_SPF_F001_I_H_AUFK_TO_BQ | STG_SPF_0000_0000_F001_I_H_aufk | ODS_SPF_0000_F001_I_H_aufk | ERDAT - Create date | AEDAT - Change date |
| CKMLMV001 | F001_SPF_F001_I_H_CKMLMV001_TO_BQ | STG_SPF_0000_0000_F001_I_H_CKMLMV001 | ODS_SPF_0000_F001_I_H_CKMLMV001 | ERDAT - Create date | |
The AFVC job must run after QALS, because it selects the list of AUFPL (routing number of operations in the order) from the last load of QALS (max meta_business_date). A reload of AFVC relies on the QALS table in the same way.
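The dependency on QALS can be illustrated with a small sketch. It assumes the QALS rows are available as dictionaries with the columns `meta_business_date` and `AUFPL`, and that `meta_business_date` is a string that sorts chronologically (for example YYYYMMDD). The function name is hypothetical and the real job may select the values differently.

```python
from typing import Dict, List


def aufpl_from_last_qals_load(qals_rows: List[Dict[str, str]]) -> List[str]:
    """Return the distinct AUFPL values of the most recent QALS load."""
    if not qals_rows:
        return []
    # The last load is identified by the maximum meta_business_date.
    last_load = max(row["meta_business_date"] for row in qals_rows)
    values = {
        row["AUFPL"]
        for row in qals_rows
        if row["meta_business_date"] == last_load and row.get("AUFPL")
    }
    return sorted(values)
```

If QALS has not been loaded yet, the list is empty and AFVC has nothing to select, which is why the run order matters.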
2. Full Load
Job F101_SPF_IND_QM_Main_Full will manage the full load job.
| Table | Job | STG | ODS |
|---|---|---|---|
| LFA1 | F001_SPF_F001_F_D_LFA1_TO_BQ | STG_SPF_0000_0000_F001_F_H_lfa1 | ODS_SPF_0000_F001_F_H_lfa1 |
| QPAC | F001_SPF_F001_F_D_QPAC_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpac | ODS_SPF_0000_F001_F_H_qpac |
| QPAM | F001_SPF_F001_F_D_QPAM_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpam | ODS_SPF_0000_F001_F_H_qpam |
| QPCD | F001_SPF_F001_F_D_QPCD_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpcd | ODS_SPF_0000_F001_F_H_qpcd |
| QPCT | F001_SPF_F001_F_D_QPCT_TO_BQ | STG_SPF_0000_0000_F001_F_H_qpct | ODS_SPF_0000_F001_F_H_qpct |
3. Reloading data
Every job has a context parameter l_VAR_eBatch_PF1_[TableName]_additional_filter that changes the selection applied when extracting from SAP. For an incremental load, this parameter MUST BE "incremental". If it is blank, the job loads data from 2023.
Note: the incremental load compares the date only, not the time.
Examples of reload contexts:
l_VAR_eBatch_PF1_AFVC_additional_filter = AUFPL >= '1007995974' and AUFPL <= '1009027526'
l_VAR_eBatch_PF1_MCH1_additional_filter = ERSDA > '20230101'
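The three cases of the additional filter (incremental, blank, custom reload) can be sketched as below. This is an illustration under assumptions, not the tJava code of the job: the function names are hypothetical, "from 2023" is assumed to mean from 20230101 on the create date field, and the incremental case is assumed to use `>=` on both date fields because only the date is compared.

```python
from typing import Optional

DEFAULT_START = "20230101"  # assumption: "data from 2023" means from 1 January 2023


def build_filter(additional_filter: Optional[str],
                 last_load_date: str,
                 create_field: str,
                 change_field: Optional[str] = None) -> str:
    """Build the selection applied to the SAP table for one run."""
    value = (additional_filter or "").strip()
    if value == "":
        # Blank parameter: load everything from the default start date.
        return f"{create_field} >= '{DEFAULT_START}'"
    if value == "incremental":
        # Incremental: rows created or changed since the last load date.
        clauses = [f"{create_field} >= '{last_load_date}'"]
        if change_field:
            clauses.append(f"{change_field} >= '{last_load_date}'")
        return " OR ".join(clauses)
    # Anything else is a reload filter and is passed through unchanged.
    return value


def should_update_incremental_table(additional_filter: Optional[str],
                                    load_ok: bool) -> bool:
    """The incremental table is updated only after a successful incremental run."""
    return load_ok and (additional_filter or "").strip() == "incremental"
```

With the MCH1 fields, `build_filter("incremental", "20240501", "ERSDA", "LAEDA")` yields a condition on both ERSDA and LAEDA, while a reload value such as `ERSDA > '20230101'` is used as written and leaves the incremental table untouched.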
4. Scheduling
PL_INDUS_EBATCH_SPF_QM_INC_LOAD (incremental load) runs every hour at xx:10, every day including weekends.
PL_INDUS_EBATCH_SPF_QM_FULL_LOAD (full load) runs once every workday at 08:00 AM CET.
5. Timing
The expected average run time is:
- full process: 7 - 15 min
- incremental process: 5 - 10 min
Criticality
High
Logging
Industrial
1. Table log: This SQL returns the status log of the jobs whose name refers to the source system SPF (PF1).
select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system, logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines
from STG.log_tables logs
join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (select meta_run_id from STG.run_jobs order by meta_start_date desc limit 100)
and job_name like '%SPF%'
and meta_start_date > DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
order by job.meta_start_date desc
2. Incremental table: Every time a load job completes, the max time from the staging table is written to this table. This makes it easy to see which tables have not been updated.
Table: STG.incremental_loading





