Description
Tools: Talend
Detail job
- F010_Helix_Incidents
- F040_Helix_Measurement
- F050_Helix_Request
- F030_Helix_ServiceRequestStub
- F021_Helix_WorkInfo
- F011_Helix_Worklog
- F020_Helix_WorkOrder_json
- Connect to the source system API by reading the context passed from the flow job
- Set up a loop to get the data
- tSetGlobalVar: sets the maximum number of records to read on each call ("limit") and initializes the variable "nb", used to decide when to exit the loop (starts at 0)
- tLoop: sets up the condition to exit the loop when the variable nb < 0
- tJava: sets up the record offset so that each loop iteration fetches new records
- Get data from the source using the start row number from "nb" and the maximum row count from "limit". The schema is read from the source metadata
- Generate the output file and save it to DATA\DEV\DATA_OCEAN_DOMAIN_DT\Tmp
- Update the offset: "nb" = "nb" + "limit"
- Set "nb" = -1 when ((Integer)globalMap.get("tReplace_1_NB_LINE")) <= 0 in order to exit the loop
- Upload all the files to the bucket (cs-ew1-prj-data-dm-dt-[dev]-staging)
- Delete all the files in the local folder (point 5)
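The pagination loop above can be sketched in plain code. This is an illustrative stand-in for the Talend components (tSetGlobalVar/tLoop/tJava), not the actual job: `fetch_page` represents one API call, and the in-memory `pages` list represents writing an output file per iteration.

```python
def paginate(fetch_page, limit):
    """Mimic the tLoop pagination: nb is the row offset, starting at 0.
    fetch_page(offset, limit) returns the records of one API call."""
    nb = 0           # tSetGlobalVar: start offset
    pages = []
    while nb >= 0:   # tLoop: exit when nb < 0
        records = fetch_page(nb, limit)
        if len(records) <= 0:
            nb = -1              # no more rows: signal the loop to stop
        else:
            pages.append(records)  # in the job: generate the output file
            nb = nb + limit        # tJava: advance the offset

    return pages

# Usage with a fake source of 250 records and a limit of 100:
source = list(range(250))
result = paginate(lambda off, lim: source[off:off + lim], 100)
# result holds 3 pages of sizes 100, 100 and 50
```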
Flow job
Below is the list of plan names used to invoke the Talend jobs above.
- PL_DT_F020_Helix_WorkOrder_json
- PL_DT_F011_Helix_Worklog
- PL_DT_F021_Helix_WorkInfo
- PL_DT_F030_Helix_ServiceRequestStub
- PL_DT_F050_Helix_Request
- PL_DT_F040_Helix_Measurement
- PL_DT_F010_Helix_Incidents
- Set up meta_run_id and the filename of the output file
- Get the last load date from the table STG.incremental_loading, controlled by the variable l_VAR_eBatch_PF1_QAPP_INC_LOAD, and configure the incremental-load logic in tJava to apply the date from incremental_loading to the create/change date field of the source system
- Call the detail job, passing parameters such as user/password and the query from point 2, to do the incremental load and save the file to GCS
- Call the standard job to upload the files from GCS to ODS
- If the load is OK and the parameter l_VAR_helix_[table_name]_reload = incremental, update the time in the table incremental_loading. If the value is not incremental, the run is a reload
- If everything is OK, update the log.
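The incremental/reload selection built in the tJava step can be sketched as follows. The function name and the field 'Last Modified Date' are illustrative, based on the reload example later in this document, not the job's actual code:

```python
def build_qualification(last_process_date, reload_mode, extra_condition=""):
    """Sketch of the tJava logic: the first condition is always fixed
    (incremental field >= last processed date); when the reload variable is
    not "incremental", the extra condition from the context is appended."""
    qual = "'Last Modified Date'>=\"{}\"".format(last_process_date)
    if reload_mode != "incremental":
        qual += extra_condition   # must start with " and ..."
    return qual

# Incremental run: only the fixed date condition.
inc = build_qualification("3/1/2022 00:00:00", "incremental")
# Reload run: the extra condition is appended.
rel = build_qualification("3/1/2022 00:00:00", "reload",
                          " and 'Status'=\"Closed\"")
```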
Access rights
Source
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/HPD:Help%20Desk
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/SLM:Measurement
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/SRM:Request
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/SB:ServiceRequestStub
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/WOI:WorkInfo
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/HPD:WorkLog
- https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry/WOI:WorkOrder
Format
- JSON
Destination
Location
- Bucket = cs-ew1-prj-data-dm-dt-[dev]-staging/xxx
- DataOcean GCP = prj-data-dm-dt-[env]
- STG Table names =
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_HD_incidents
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_Measurement
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_ServiceRequest
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_ServiceRequestStub
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_Workinfo
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_worklog
- prj-data-dm-dt-[env].STG.STG_HLX_0000_0000_F001_I_H_workorder_json
- ODS Table names =
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_Incidents
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_Measurement
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_ServiceRequest
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_ServiceRequestStub
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_Workinfo
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_worklog
- prj-data-dm-dt-dev.ODS.ODS_HLX_0000_F001_I_H_workorder_json
- DPL View names =
- prj-data-dm-dt-[env].DPL.V_FACT_hlx_work_order
- prj-data-dm-dt-[env].DPL.V_FACT_hlx_Workinfo
- prj-data-dm-dt-[env].DPL.V_FACT_hlx_service_request_stub
- prj-data-dm-dt-[env].DPL.V_FACT_hlx_measurement
- prj-data-dm-dt-[env].DPL.V_DIM_hlx_incident
- prj-data-dm-dt-[env].DPL.V_FACT_hlx_ServiceRequest
- prj-data-dm-dt-[env].DPL.V_DIM_hlx_status
Format
- columnar format
Sizing
Assessment
How to validate that the generated output is valid:
Loading
1.1 Incremental Load
It is controlled by the following variables:
l_VAR_helix_[object]_reload: If it is "incremental", the job gets the last load date from the table STG.incremental_loading, using the meta_file_name without the _RELOAD suffix
l_VAR_helix_[object]_sort: Sorts the incremental field in ascending order in order to keep track of the last update
l_VAR_helix_[object]_limit: The number of records to get from the API on each call
l_VAR_helix_[object]_nloop: The number of loops for each load (limit x nloop is the maximum number of records in the file loaded to BQ each time)
l_VAR_helix_[object]_offset: The starting record index in the API. It starts at 0 and is changed only in specific reload cases
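Putting these variables together, one paginated API call can be sketched as below. The parameter names (q, offset, limit, sort) follow the standard BMC Remedy AR System REST API and are an assumption about what the job actually sends:

```python
from urllib.parse import quote, urlencode

def helix_page_url(base, form, qualification, offset, limit, sort_field):
    """Build one paginated entry request; each context variable maps to
    one query parameter."""
    params = {
        "q": qualification,           # incremental / reload condition
        "offset": offset,             # l_VAR_helix_[object]_offset
        "limit": limit,               # l_VAR_helix_[object]_limit
        "sort": sort_field + ".asc",  # l_VAR_helix_[object]_sort (ascending)
    }
    # quote_via=quote encodes spaces as %20 instead of '+'
    return "{}/{}?{}".format(base, form, urlencode(params, quote_via=quote))

url = helix_page_url(
    "https://solvay-restapi.onbmc.com/api/arsys/v1.0/entry",
    "HPD:Help%20Desk",
    "'Last Modified Date'>=\"3/1/2022 00:00:00\"",
    0, 500, "Last Modified Date")
```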
1.2 Full load
N/A
1.3. Reloading data
It is controlled by the following variables:
l_VAR_helix_[object]_reload: If it is NOT "incremental", the job does a full load with an extra condition taken from this parameter, such as [ and 'Status'="Closed"]. The value needs to start with " and ..." because the first condition is fixed: incremental field >= meta_last_process_date of the row with meta_file_name = xxx_RELOAD. In this example the selection becomes 'Last Modified Date'>="3/1/2022 00:00:00" and 'Status'="Closed".
l_VAR_helix_[object]_limit: The number of records to get from the API on each call
l_VAR_helix_[object]_nloop: The number of loops for each load (limit x nloop is the maximum number of records in the file loaded to BQ each time)
l_VAR_helix_[object]_sort: Sorts the incremental field in ascending order to keep track of the last update. If (limit x nloop) records are not enough to fetch everything, the last modified date is updated on the HELIX_INCIDENT_RELOAD row, so the next load resumes the reload from that point.
l_VAR_helix_[object]_offset: The starting record index in the API. It starts at 0 and is changed only in specific reload cases
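Because the records are sorted ascending on the incremental field, the resume point for an incomplete reload is simply that field's value on the last record fetched. A minimal sketch, with an illustrative function and field name:

```python
def next_reload_start(batch, current_start):
    """Return the date to write back to the *_RELOAD row: the incremental
    field of the last record of an ascending-sorted batch, or the current
    start date unchanged when the batch is empty (nothing left to reload)."""
    if not batch:
        return current_start
    return batch[-1]["Last Modified Date"]

batch = [
    {"Last Modified Date": "3/1/2022 10:00:00"},
    {"Last Modified Date": "3/2/2022 09:30:00"},
]
resume = next_reload_start(batch, "3/1/2022 00:00:00")
```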
How to:
1. Stop the scheduled job
2. Change the start date from which the reload should run:
UPDATE STG.incremental_loading
SET meta_last_process_date = '2021-12-22 00:00:00' -- the date to reload from
where meta_file_name = 'HELIX_INCIDENTS_RELOAD' -- identifies the object to reload; the name must end with _RELOAD. Rows without _RELOAD are used for incremental load only.
3. Change the context l_VAR_helix_incident_reload to the condition to reload,
such as %20and%20Status=%22Closed%22
4. Recheck the variables: limit, nloop, offset
5. Run the job until the last process date of HELIX_HD_INCIDENTS_RELOAD is later than that of HELIX_HD_INCIDENTS, meaning the reload is up-to-date
6. Change the context l_VAR_helix_incident_reload back to "incremental"
7. Reschedule the job
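The encoded condition in step 3 can be produced with a standard URL-encoding call that leaves '=' unescaped, which reproduces the value shown above:

```python
from urllib.parse import quote

# Reload condition as typed; spaces and quotes must be percent-encoded
# before storing it in the context variable, but '=' is kept as-is.
condition = ' and Status="Closed"'
encoded = quote(condition, safe='=')
# encoded == '%20and%20Status=%22Closed%22'
```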
1.4 Plan to schedule
Every hour
1.5 Timing
The average time expected for loading: 5 minutes
Criticality
High/Medium/Low
Logging
1. Check the last loading
select * from STG.incremental_loading
where meta_file_name in ('HELIX_CASE_ITSM','HELIX_INCIDENT','HELIX_SERVICE_REQ','HELIX_WORKORDER','KADISKA_RUM')
order by meta_file_name
2. Check the loading records / error
select job.job_name, job.meta_start_date, logs.meta_run_id, logs.meta_source_system,
       logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_lines
from STG.log_tables logs
join STG.run_jobs job on logs.meta_run_id = job.meta_run_id
where logs.meta_run_id in (SELECT meta_run_id FROM STG.run_jobs order by meta_start_date desc limit 100)
and (job_name like '%Helix%' or job_name like '%Kadiska%' or job_name like '%HLX%')
and (meta_step = 'ODS to DM' or meta_step = 'Bucket to Staging')
and meta_start_date > DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
order by meta_start_date desc