Page tree


Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Description

The data source getting from data ocean marketingprj-data-dq-selfservice, which get from salesforce (SFC) on the Case DataOcean_industrialreporting_workspace.FixedCosts tables in order to get the number of complaints comparing with BW query QVBW_QRY_MVSDSO57_0002 QVBW - Deliveries Detials SO&PO (core query), which get the number of delivery itemsinformation of FixedCosts.

Tools: Talend

Talend project DATA_OCEAN_DOMAIN_MARKETINGIND_DASHBOARD

  • F001_GSheet_site_to_BQ_FixedCosts

  • F020_SFC_case_customer_complaint_to_ODS (number of complaints)

  • F001_QVBW_QRY_MVSDSO57_0002_to_ODS (number of delivery item)

From source to ODS 

Job: F001_QVBWGSheet_QRYsite_MVSDSO57to_0002BQ_to_ODS (number of delivery itemFixedCosts (FixedCosts)

Image RemovedImage Added

  1. Define variable variable and Generate meta_run_id
  2. Check the incremental load.  If variable l_VAR_XTRACT_PARA_TALEND_QVBW_QRY_MVSDSO57_0002 = currentmonth, it will load from BW last month to current month, else must be this format &YYYYMM_Start=202306&YYYYMM_End=202311
  3. Call reference job to load from BW Gsheet and save the output file to bucket cs-ew1-prj-data-dmindustrial-marketingdash-dev-staging/TALEND_DEV_QVBW_QRY_MVSDSO57_0002fc/
  4. Load from Bucket to STG and ODS
  5. Update log

Job:F020_SFC_case_customer_complaint_to_ODS (J020_SFC_case_customer_complaint_to_ODS)

Image Removed

  1. Move data from ODS to DM
  2. Update log
  3. Connect to Salesforce (SFC)
  4. Query SFC on the case table 
  5. Write output to file in local PC
  6. Put the file to bucket cs-ew1-prj-data-dm-marketing-dev-staging/Case/
  7. Delete the output file in local PC
  8. Write log on the main flow job

From DM to Operational Dashboard


Dataflow

prj-data-dmdq-marketingselfservice-devtest.ODS.ODSDataOcean_BWH_0000_F001_F_M_qvbw_qry_mvsdso57_0002 → prj-data-dm-marketing-dev.ODS.V_ODS_BWH_0000_F001_F_M_qvbw_qry_mvsdso57_0002 industrialreporting_workspace.FixedCosts → prj-data-dm-marketing-dev.ODS.VB_BWH_no_of_delivery_orders → prj-data-dm-marketing-dev.DS_prj_industrial_dash.V_BWH_no_of_delivery_orders => prj-data-industrial-dash-dev.DataOceanSTG.VSTG_BWHFIL_no_of_delivery_orders → prj-data-industrial-dash-dev.DPL.V_BWH_no_of_delivery_ordersprj-data-dm-marketing-dev.ODS.ODS_SFC_0000_0000_F001_F_M_case fc → prj-data-dmindustrial-marketingdash-dev.ODS.V_ODS_SFCFIL_0000_F001_F_M_case fc → prj-data-dm-marketing-dev.ODS.VB_sfc_case → prj-data-dm-marketing-dev.DS_prj_industrial_dash.V_sfc_case ==> prj-data-industrial-dash-dev.DataOceanDM.VFACT_sfc_case fixedCosts → prj-data-industrial-dash-dev.DPL.V_sfcFACT_casefixedCosts 

Access rights

It is required to access BW query via Xtract.  

and access to SFC

Source

BW

BW query = QVBW_QRY_MVSDSO57_0002

Xtract job = TALEND_[ENV]_QVBW_QRY_MVSDSO57_0002

SFC (https://login.salesforce.com/services/Soap/u/52.0)

Condition that select from SFC

FROM  Case
WHERE CreatedDate >= 2020-01-01T00:00:00Z
and PO2_CASE_Organization__c='ECO'

Control variable to connect

  • l_LOCAL_Domain_Marketing_SF_USER
  • l_LOCAL_Domain_Marketing_SF_PASSWORD
  • l_LOCAL_Domain_Marketing_SF_TOKEN
  • l_LOCAL_Domain_Marketing_SF_endpoint

Format

columnar format

  1. prj-data-dq-selfservice-test 
  2. prj-data-industrial-dash-[env]

Source

BigQuery

Project = prj-data-dq-selfservice-test

BQ Dataset = DataOcean_industrialreporting_workspace

BQ Table = FixedCosts

Destination

DataOcean

  • Bucket = cs-ew1-prj-data-dmindustrial-marketingdash-dev[env]-staging/
    • TALEND_DEV_QVBW_QRY_MVSDSO57_0002
    • Case
  • DataOean GCP = prj-data-dm-marketing-[env]
    • fc
    • FIL_IND
    STG Table names =
    • STG_BWH_0000_0000_F001_20241126224947_0000_F_M_qvbw_qry_mvsdso57_0002fc.csv
  • STG Table names =
    • STG_

    • SFC
    • FIL_0000_0000_F001_F_M_

    • case
    • fc

  • ODS Table names =
    • ODS_

  • BWH
    • FIL_0000_F001_F_M_

  • qvbw_qry_mvsdso57_0002ODS_SFC_0000_F001_F_M_case
    • fc

  • DM Table names =
    • FACT_fixedCosts
  • DPL View name = 
    • V_FACT_fixedCosts

Product

  • GCP = prj-data-industrial-dash-[env]DataOcean
  • V_BWH_no_of_delivery_orders
  • DataOcean.V_sfc_case

Format

columnar format

Sizing

  • STG_BWHFIL_0000_0000_F001_F_M_qvbw_qry_mvsdso57_0002 fc         around       around   52,200 records / month, Xtract job spend around 2 min / month maximum 6 months. 
  • STG_SFC_0000_0000_F001_F_M_case                                                   around  18,576 records

Assessment

Data same as BW query and SFC

Loading

1.1 Incremental Load

Delivery Source to ODS : PL_MKT_QVBW_QRY_MVSDSO57_0002

control by l_VAR_XTRACT_PARA_TALEND_QVBW_QRY_MVSDSO57_0002 = currentmonth, it will load last month to current month on 0CALMONTH on

Case (number of complaint) is not available for incremental

1.2 Full load

Delivery N/A

Case (number of complaint) Source to ODS : PL_MKT_SFC_CASE

1.3. Reloading data

Delivery 

  • l_VAR_XTRACT_PARA_TALEND_QVBW_QRY_MVSDSO57_0002  = &YYYYMM_Start=202301&YYYYMM_End=202305.  Maximum should not over 6 months otherwise Xtract will be out of memory

Case (number of complaint)

  • Just run the PL_MKT_SFC_CASE again.

...

  • 258,056 records

Loading

1.1 Full load

  • FixedCosts Source to ODS : TASK_WeeklyExecution_FixedCosts

1.2. Reloading data

FixedCosts

  • Just run the TASK_WeeklyExecution_FixedCosts.

1.3 Plan to schedule

It is scheduled by plans below on WS_DATA_OCEAN_DOMAIN_MARKETINGIND_DASHBOARD

  • TASK_DailyExecution_FixedCosts weekly every 30 min starting at 07am CET ~ 6pm
  • PL_MKT_QVBW_QRY_MVSDSO57_0002 Monthly on 1st at 05:00 CET
  • PL_MKT_SFC_CASE                                  Monthly on 1st at 05:00
  • CET

1.

...

4 Timing

Delivery 10 minutes from source to ODS (current month)

...

  • Case 5 minutes from source to ODS (full)

Criticality

  • Low?

Logging

Check the loading records 

selectjob.job_name, job.meta_start_date, job.meta_execution_id, logs.meta_run_id, logs.meta_source_system, logs.meta_step, logs.meta_status, logs.meta_num_lines, logs.meta_error_linesfromSTG.log_tableslogsjoinSTG.run_jobsjobonlogs.meta_run_id = job.meta_run_id
where  logs.meta_run_idin(SELECTmeta_run_idFROMSTG.run_jobsorderbymeta_start_datedesclimit1000)
andjob_namein('F001_QVBWGSheet_QRYsite_MVSDSO57_0002_to_ODS','F020_SFC_case_customer_complaint_to_ODSBQ_FixedCosts')
andmeta_start_date>  DATE_SUB(CURRENT_TIMESTAMP(), INTERVAL32DAY)
orderbyjob.meta_start_datedesc

...