Description

This project loads SAP QM, material document, and vendor master data tables from PF1 into the Industrial Data Ocean, selecting records by create and modification date. A view on top then filters the data down to Soda Ash only.

Note: Soda Ash data exists only in PF1, so this project loads from PF1 only (WP1 is not included).

Tools: Talend

1. Detail job

All jobs that extract data from SAP PF1 follow the same pattern described below. They differ only in the table they extract and that table's schema.

  1. Connect to the SAP system, reading the context from the flow job
  2. Set up a loop to fetch the data
    1. tSetGlobalVar: set the maximum number of records to read per iteration ("limit") and the variable "nb" used to decide when to exit the loop (starting at 0)
    2. tLoop: set the condition to exit the loop when the variable nb < 0
    3. tJava: set the record offset so that each iteration reads new records
  3. Read data from the source, using "nb" as the start row and "limit" as the maximum number of rows. The schema is read from the source metadata
  4. Replace invalid characters such as | ; \n . Check for these in the Logging section; if there are KO records, add the affected field to the tReplace
  5. Generate the output file and save it to DATA\DEV\DATA_OCEAN_DOMAIN_INDUSTRIAL\Tmp\SAP\[table name]
  6. Update the offset: "nb" = "nb" + "limit"
  7. Set "nb" = -1 when ((Integer)globalMap.get("tReplace_1_NB_LINE")) <= 0, in order to exit the loop
  8. Upload all files in the folder from step 5 to GCS at cs-ew1-prj-data-dm-industrial-[env]-staging/SPF_IND_0000_0000_F001_I_H_[TableName]
  9. Delete all files in the folder from step 5
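The offset/limit loop in steps 2-7 can be sketched as follows. This is an illustrative Python sketch, not the actual Talend component code: fetch_rows stands in for the SAP table read, and the fake 120-row source and LIMIT value are assumptions for the example.

```python
# Sketch of the extraction loop: read "limit" rows at a time starting from
# offset "nb", clean them, and exit when a read returns no new records.

LIMIT = 50  # max records per read (the "limit" global variable); illustrative value

def fetch_rows(offset, limit):
    """Hypothetical SAP read: returns up to `limit` rows starting at `offset`."""
    source = [f"row|{i}\n" for i in range(120)]  # fake source table for the sketch
    return source[offset:offset + limit]

def extract_all():
    nb = 0          # start row (the "nb" global variable); -1 signals loop exit
    cleaned = []
    while nb >= 0:  # tLoop condition: exit when nb < 0
        rows = fetch_rows(nb, LIMIT)
        # tReplace step: strip characters that would break the delimited output file
        cleaned += [r.replace("|", " ").replace(";", " ").replace("\n", " ")
                    for r in rows]
        if len(rows) == 0:
            nb = -1            # no new records: exit the loop
        else:
            nb = nb + LIMIT    # advance the offset for the next read
    return cleaned

print(len(extract_all()))  # → 120
```

In the real job the exit test uses the tReplace line count (tReplace_1_NB_LINE <= 0) rather than an empty list, but the control flow is the same.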

2. Flow job

It controls the filename and the target staging and ODS tables.

  1. Set up meta_run_id and the filename of the output file
  2. Get the last load date from table STG.incremental_load, controlled by the variable l_VAR_eBatch_PF1_QAPP_INC_LOAD, and configure the incremental-load logic in tJava so that the date from incremental_load is applied to the create or change date field in SAP
  3. Call the detail job, passing parameters such as user/password and the query from step 2, to perform the incremental load and save the file to GCS
  4. Call the standard job to upload the files from GCS to ODS
  5. If the load is OK and the parameter l_VAR_eBatch_PF1_[table_name]_additional_filter = incremental, update the time in the table incremental_load. If the value is not incremental, it is a reload (see point 1.3)
  6. If everything is OK, update the log.
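The incremental-filter logic in step 2 can be sketched as below. The function name build_filter and the column-name arguments are illustrative (the real logic lives in a tJava component); the create/change column names come from the incremental-load table in the Loading section.

```python
# Sketch: build the SAP selection filter for the detail job, either from the
# last load date (incremental) or from a manual reload filter.

def build_filter(additional_filter, last_load_date, create_col, change_col):
    """Return the WHERE-style selection passed to the detail job."""
    if additional_filter == "incremental":
        # Normal hourly run: select rows created or changed since the last load.
        # Note: the comparison is on the date only, not the time.
        return f"{create_col} >= '{last_load_date}' OR {change_col} >= '{last_load_date}'"
    # Any other value is treated as a manual reload filter (see "Reloading data").
    return additional_filter

print(build_filter("incremental", "20240501", "ERSDA", "LAEDA"))
# → ERSDA >= '20240501' OR LAEDA >= '20240501'
print(build_filter("LAEDA > '20240101'", "20240501", "ERSDA", "LAEDA"))
# → LAEDA > '20240101'
```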

Context Variable

  • l_VAR_PF1_[table]_BACKET = bucket name
  • l_VAR_PF1_[table]_BQ_Table_ODS = ODS table name
  • l_VAR_PF1_[table]_BQ_Table_STG = STG table name
  • l_VAR_PF1_[table]_INC_LOAD = meta_file_name in the incremental_loading table; normally "PF1_[table]"
  • l_VAR_PF1_[table]_additional_filter = "incremental" when loading incrementally based on the incremental_loading table; it can be changed to any reload filter, such as LAEDA > '20240101'
  • l_VAR_PF1_[table]_email_recipient = recipient of the email, eco-dataops@solvay.com
  • l_VAR_PF1_[table]_email_flag = "yes" to send a notification email when the incremental load has an issue (max meta_stg_insert_date from STG < date in the incremental_loading table); otherwise no email is sent
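The email_flag condition can be sketched as follows; the function name and argument shapes are illustrative (the real check runs inside the Talend flow job):

```python
# Sketch of the email_flag check: alert when the staging table lags behind
# the date recorded in the incremental_loading table.
from datetime import datetime

def should_send_alert(email_flag, max_stg_insert_date, incremental_load_date):
    """Return True when a warning email should be sent for this table."""
    if email_flag != "yes":
        return False  # alerting disabled for this table
    # Issue: the latest staged record is older than the recorded load date
    return max_stg_insert_date < incremental_load_date

print(should_send_alert("yes",
                        datetime(2024, 5, 1, 9, 0),
                        datetime(2024, 5, 1, 10, 0)))   # → True
print(should_send_alert("no",
                        datetime(2024, 5, 1, 9, 0),
                        datetime(2024, 5, 1, 10, 0)))   # → False
```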

3. Access rights

To access SAP, the Talend user RFC_TAL_PF1 is required.

4. Source

pf1nonha.eua.solvay.com

5. Format

The format of the source data

Destination

Location

The data is kept in:

Bucket = cs-ew1-prj-data-dm-industrial-[env]-staging, with a separate folder for each table

Data Ocean GCP = prj-data-dm-industrial-[env]

Product GCP = prj-data-sad-ebatch-[env]

To save the data into the GCP Industrial Data Ocean, a service account is required.

Format

Same as the source

Sizing

Expected data volume for full load (as of May 2024)

Table   Table Name                                   Size (rows)
LFA1    Vendor Master (General Section)              229,553
QPAC    Inspection catalog codes for selected sets   2,314
QPAM    Inspection catalog selected sets             741
QPCD    Inspection catalog codes                     26,828
QPCT    Code texts                                   228,149


The remaining tables are loaded incrementally every hour.

Assessment

To validate the generated output, compare it with the corresponding table in PF1.

Loading

1. Incremental Load

The main job F100_SPF_IND_QM_Main runs all the incremental loads. It loads tables MCH1, MCHA, QALS, AFVC, QAMR, QAMV, QAVE, MSEG, PLPO in sequence, in order to limit the number of background jobs in SAP.

MCH1
  Job: F001_SPF_F001_I_H_MCH1_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_mch1
  ODS: ODS_SPF_0000_F001_I_H_mch1
  Incremental load by: ERSDA (create date), LAEDA (modify date)

MCHA
  Job: F001_SPF_F001_I_H_MCHA_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_mcha
  ODS: ODS_SPF_0000_F001_I_H_mcha
  Incremental load by: ERSDA (create date), LAEDA (modify date)

QALS
  Job: F001_SPF_F001_I_H_QALS_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qals
  ODS: ODS_SPF_0000_F001_I_H_qals
  Incremental load by: ERSTELDAT (create date), AENDERDAT (modify date)

AFVC
  Job: F001_SPF_F001_I_H_AFVC_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_afvc
  ODS: ODS_SPF_0000_F001_I_H_afvc
  Incremental load by: list of AUFPL from the last load of QALS

QAMR
  Job: F001_SPF_F001_I_H_QAMR_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qamr
  ODS: ODS_SPF_0000_F001_I_H_qamr
  Incremental load by: ERSTELLDAT (create date), AENDERDAT (modify date)

QAMV
  Job: F001_SPF_F001_I_H_QAMV_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qamv
  ODS: ODS_SPF_0000_F001_I_H_qamv
  Incremental load by: ERSTELLDAT (create date), AENDERDAT (modify date)

QAVE
  Job: F001_SPF_F001_I_H_QAVE_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qave
  ODS: ODS_SPF_0000_F001_I_H_qave
  Incremental load by: VDATUM (create date), VAEDATUM (modify date)

MSEG
  Job: F001_SPF_F001_I_H_MSEG_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_mseg
  ODS: ODS_SPF_0000_F001_I_H_mseg
  Incremental load by: CPUDT_MKPF (create date), /BEV2/ED_AEDAT (modify date)

QASR
  Job: F001_SPF_F001_I_H_QASR_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qasr
  ODS: ODS_SPF_0000_F001_I_H_qasr
  Incremental load by: ERSTELLDAT (create date), AENDERDAT (modify date)

QAPP
  Job: F001_SPF_F001_I_H_QAPP_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_qapp
  ODS: ODS_SPF_0000_F001_I_H_qapp
  Incremental load by: ERSTELDAT (create date), AENDERDAT (modify date)

AUFK
  Job: F001_SPF_F001_I_H_AUFK_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_aufk
  ODS: ODS_SPF_0000_F001_I_H_aufk
  Incremental load by: ERDAT (create date), AEDAT (change date)

CKMLMV001
  Job: F001_SPF_F001_I_H_CKMLMV001_TO_BQ
  STG: STG_SPF_0000_0000_F001_I_H_CKMLMV001
  ODS: ODS_SPF_0000_F001_I_H_CKMLMV001
  Incremental load by: ERDAT (create date)


The AFVC job must run after QALS because it selects the list of AUFPL (routing numbers of operations in the order) from the last QALS load (max meta_business_date). In case of a reload, it relies on the QALS table as well.
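A minimal sketch of that dependency, assuming QALS rows carry an AUFPL value and a meta_business_date (the aufpl_filter helper itself is hypothetical, not the actual Talend code):

```python
# Sketch: derive the AFVC selection filter from the latest QALS load.

def aufpl_filter(qals_rows):
    """qals_rows: list of (aufpl, meta_business_date) tuples from QALS."""
    last_load = max(d for _, d in qals_rows)  # max meta_business_date
    aufpls = sorted({a for a, d in qals_rows if d == last_load})
    return "AUFPL in (" + ", ".join("'" + a + "'" for a in aufpls) + ")"

rows = [("1007995974", "20240501"),
        ("1007995975", "20240502"),
        ("1007995976", "20240502")]
print(aufpl_filter(rows))
# → AUFPL in ('1007995975', '1007995976')
```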

2. Full Load

Job F101_SPF_IND_QM_Main_Full manages the full-load jobs.

LFA1
  Job: F001_SPF_F001_F_D_LFA1_TO_BQ
  STG: STG_SPF_0000_0000_F001_F_H_lfa1
  ODS: ODS_SPF_0000_F001_F_H_lfa1

QPAC
  Job: F001_SPF_F001_F_D_QPAC_TO_BQ
  STG: STG_SPF_0000_0000_F001_F_H_qpac
  ODS: ODS_SPF_0000_F001_F_H_qpac

QPAM
  Job: F001_SPF_F001_F_D_QPAM_TO_BQ
  STG: STG_SPF_0000_0000_F001_F_H_qpam
  ODS: ODS_SPF_0000_F001_F_H_qpam

QPCD
  Job: F001_SPF_F001_F_D_QPCD_TO_BQ
  STG: STG_SPF_0000_0000_F001_F_H_qpcd
  ODS: ODS_SPF_0000_F001_F_H_qpcd

QPCT
  Job: F001_SPF_F001_F_D_QPCD_TO_BQ
  STG: STG_SPF_0000_0000_F001_F_H_qpct
  ODS: ODS_SPF_0000_F001_F_H_qpct

3. Reloading data

Every job has the context parameter l_VAR_eBatch_PF1_[TableName]_additional_filter to change the selection used when extracting from SAP. For an incremental load, this context MUST be "incremental". If it is blank, the job extracts data from 2023 onward.

Note: the incremental load compares only the date, not the time.

Example of context reload

l_VAR_eBatch_PF1_AFVC_additional_filter = AUFPL >= '1007995974' and AUFPL <= '1009027526'

l_VAR_eBatch_PF1_MCH1_additional_filter = ERSDA  > '20230101'

4. Scheduling

PL_INDUS_EBATCH_SPF_QM_INC_LOAD (incremental load) runs every hour at xx:10, every day including weekends.

PL_INDUS_EBATCH_SPF_QM_FULL_LOAD (full load) runs every workday at 08:00 CET.

5. Timing

The average expected run times are:

  • full process: 7-15 min
  • incremental process: 5-10 min

Criticality

High

Logging

Industrial

1. Table log: this SQL retrieves the status log for jobs from source system SPF (PF1)

select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system,
       logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines
from STG.log_tables logs
join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (select meta_run_id from STG.run_jobs order by meta_start_date desc limit 100)
  and job_name like '%SPF%'
  and meta_start_date > date_sub(current_timestamp(), interval 1 day)
order by job.meta_start_date desc

2. Incremental table: every time a load job completes, the max time from the staging table is written to this table, which makes it easy to spot which tables have not been updated.

Table: STG.incremental_loading
