Unit 4. Designing a Mechanism for Monitoring Data Arrival


Data is required for both model training and model inference, and new data is generated at irregular times each day. The pipeline therefore needs to be triggered as soon as the data arrives.


This section describes how to design a pipeline that monitors data arrival and triggers the corresponding processing.

Data Preparation

Before orchestrating the pipeline, create a status_tbl table in Hive with the following statement to record daily data updates:

create table status_tbl(masterid string, updatetime timestamp, flag int);


The structure and fields of the status_tbl table are as follows:

Field name | Description
masterid | Site ID
updatetime | Data insertion time
flag | Status: 1 means the site's data has been written; 0 means it has been consumed

Designing Pipelines

When the source data is updated, a record is inserted into the status_tbl table to indicate that the data update for a given site is complete. Once the pipeline detects the updated record in the status table, it processes that site's data. After processing is complete, the site's status is set to 0, indicating that the site's data for that day has been consumed.
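
For illustration, both sides of this protocol can be exercised directly against Hive. The sketch below uses the pyhive client with an assumed, non-Kerberos HiveServer2 endpoint (host, port, and values are placeholders). Note that the update statement requires status_tbl to be a transactional (ACID) table; on a plain Hive table, consumption would have to be recorded differently, for example by inserting a flag = 0 record:

from pyhive import hive  # assumed client; host and port are placeholders

conn = hive.Connection(host="hive-server.example.com", port=10000)
cursor = conn.cursor()

# Producer side: announce that site ABCDE0001 finished writing the day's data.
cursor.execute(
    "insert into table status_tbl values ('ABCDE0001', '2020-06-22 08:00:00', 1)"
)

# Pipeline side, after processing: mark the site's data as consumed.
# Assumes status_tbl was created as a transactional (ACID) table; plain Hive
# tables do not support update.
cursor.execute(
    "update status_tbl set flag = 0 "
    "where masterid = 'ABCDE0001' and date(updatetime) = '2020-06-22'"
)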


The real-time monitoring of the status table is implemented by adding a Recursion operator in front of the pipeline. The process is as follows (a sketch of the polling behavior follows the list):

  1. The pipeline starts running and queries the status table.
  2. The system checks whether the returned status meets the exit condition; if not, it keeps polling the status table.
  3. When the exit condition is met (you can trigger it by inserting a matching record into the Hive table), the data is considered updated and subsequent processing is allowed to run.
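
Conceptually, the Recursion operator behaves like the polling loop below. This is only a sketch: the polling interval and the helper name query_status are illustrative, and the expected value matches the exit condition configured later in this unit:

import time

# Exit condition from the Recursion operator configuration:
# masterid + date + flag -> "ABCDE00012020-06-221".
EXPECTED = "ABCDE0001" + "2020-06-22" + "1"

def wait_for_data(query_status, interval_s=60):
    """Poll the status table until a row matching the exit condition appears."""
    while True:
        latest = query_status()   # formatted status row, like Transform3.list_output1
        if latest == EXPECTED:    # exit condition met: the data has arrived
            return latest
        time.sleep(interval_s)    # condition not met: keep polling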


After the Recursion operator is added in front of the pipeline, the orchestrated pipeline is shown in the figure below:

../_images/pipeline_overview_1.png


In the sub-canvas of the Recursion operator, use the following operators to orchestrate the pipeline:

  1. Hive operator: queries the status table data from Hive and obtains the keytab and krb5 profiles required by the Hive operator
  2. Git Directory operator: pulls the file transform3.py from the Git repository and passes it as input to the Python operator
  3. Python operator: formats the input file (a sketch of this script follows the list)
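
The actual transform3.py is pulled from Git and is not shown in this unit. Assuming the Hive result set is delivered as a CSV file with the three queried columns in order, a minimal sketch of the formatting logic could look like this (file layout and column order are assumptions):

# transform3.py -- illustrative sketch only; the actual file is pulled from Git.
import csv

def format_status(list_data_path):
    """Concatenate masterid, updatetime, and flag of the latest status row into
    one string, e.g. 'ABCDE0001' + '2020-06-22' + '1' -> 'ABCDE00012020-06-221',
    the value the Recursion operator compares against its exit condition."""
    with open(list_data_path, newline="") as f:
        rows = [row for row in csv.reader(f) if row]
    masterid, updatetime, flag = rows[-1]   # take the most recent record
    return f"{masterid}{updatetime}{flag}"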


Drag the operators onto the editing canvas; the orchestrated pipeline is shown in the figure below:

../_images/recursion_pipeline_overview.png


The configuration instructions for each operator orchestrated in the pipeline are given as follows:

Recursion Operator

Name: Recursion

Description: monitors data arrival and triggers the corresponding event

Configuration parameters

The expression of the Recursion operator has five fields: left operand type, left operand value, comparison operator, right operand type, and right operand value. For this pipeline it is configured as follows, so the recursion keeps polling while Transform3.list_output1 differs from the declared literal (masterid, date, and flag concatenated) and exits once a matching record appears:

Reference | Transform3.list_output1 | != | Declaration | ABCDE00012020-06-221
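
Rendered as plain Python (an illustration of the semantics only, not the platform's API), the expression evaluates as:

def recursion_expression(list_output1: str) -> bool:
    """True means the condition still holds and the recursion keeps polling."""
    declared = "ABCDE00012020-06-221"   # Declaration: masterid + date + flag
    return list_output1 != declared     # Reference != Declaration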

A sample of the operator configuration is given as follows:

../_images/recursion.png

Hive Operator

Name: Hive(1)

Description: queries the status table data and the keytab and krb5 profiles from Hive

Input parameters

Parameter name | Data type | Operation type | Value
data_source_name | String | Declaration | Name of the registered Hive data source
sqls | List | Declaration | ["set tez.am.resource.memory.mb=1024", "select masterid, date(updatetime) as updatetime, flag from status_tbl"]
queue | String | Declaration | root.eaptest01 (name of the big data queue applied for through resource management)

Output parameters

Parameter name | Value
resultset | File

A sample of the operator configuration is given as follows:

../_images/hive_config_1.png

Git Directory Operator

Name: Git Directory

Description: pulls the Python code file from the Git repository

Input parameters

Parameter name | Data type | Operation type | Value
data_source_name | String | Declaration | Name of the registered Git data source
branch | String | Declaration | master
project | String | Declaration | workspace1
paths | List | Declaration | ["workspace1/kmmlds/transform3.py"]

Output parameters

Parameter name | Value
workspace | directory
paths | list

A sample of the operator configuration is given as follows:

../_images/git_directory_3.png
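
Outside the platform, the effect of this operator can be approximated with a plain clone. The sketch below uses GitPython; the repository URL and local path are placeholders, since the operator resolves the remote from the registered Git data source:

import os
from git import Repo  # GitPython

workspace = "/tmp/workspace"  # stands in for the operator's workspace output
Repo.clone_from(
    "https://git.example.com/workspace1.git",  # placeholder for the registered data source
    os.path.join(workspace, "workspace1"),     # project: workspace1
    branch="master",                           # branch: master
)

# Stands in for the operator's paths output: the declared file inside the workspace.
paths = [os.path.join(workspace, "workspace1/kmmlds/transform3.py")]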

Python Operator

Name: Transform3

Description: formats the input file

Input parameters

Parameter name | Data type | Operation type | Value
workspace | Directory | Reference | Git Directory.workspace
entrypoint | String | Declaration | workspace1/kmmlds/transform3.py
requirements_file_path | String | Declaration | (left empty)
list_data | File | Reference | Hive(1).resultset

Output parameters

Parameter name | Value
list_output1 | String

A sample of the operator configuration is given as follows:

../_images/python_transform_3.png