Unit 4. Designing a Mechanism for Monitoring Data Arrival


Data is required for both model training and model inference, and new data is generated every day at an irregular frequency. Therefore, the pipeline needs to be triggered as soon as the data arrives.


This section describes how to design a pipeline that monitors data arrival and triggers events accordingly.

Data Preparation

Before orchestrating the pipeline, create a status_tbl table in Hive with the following command to record the daily data updates:

create table status_tbl(masterid string, updatetime timestamp, flag int);


The structure and fields of the status_tbl table are shown below:

Field name | Description
masterid   | Site ID
updatetime | Data insertion time
flag       | Status: 1 indicates that the data has been written; 0 indicates that the data has been used

Designing Pipelines

When the source data is updated, a record can be inserted into the status_tbl table to indicate that the data update for a certain site has been completed. Once the pipeline detects that the status table has been updated, it processes that site's data. After processing is completed, the site's status is set to 0, indicating that the site's data for that day has been used.
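
For illustration, the status updates could be written with Hive statements like the following. The masterid and timestamp are placeholder values, and the update statement assumes a transactional (ACID) Hive table; on a non-transactional table the status change would typically be rewritten with an insert overwrite instead:

-- Upstream job: mark that new data for site ABCDE0001 has been written
insert into status_tbl values ('ABCDE0001', '2020-06-22 09:00:00', 1);

-- Pipeline: after processing, mark the site's data for that day as used
update status_tbl set flag = 0
where masterid = 'ABCDE0001' and date(updatetime) = '2020-06-22';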


Real-time monitoring of the status table can be implemented by adding a Recursion operator in front of the pipeline. The process is as follows:

  1. The pipeline starts running and queries the status table.

  2. The system checks whether the obtained status meets the exit condition; if it is not met, polling of the status table continues.

  3. When the exit condition is met (you can insert a record that meets the exit condition into the Hive table), the data is considered updated and subsequent processing is allowed to proceed, as sketched below.
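
Conceptually, the polling loop behaves like the following Python sketch. This is only an illustration: query_latest_status and POLL_INTERVAL_SECONDS are hypothetical stand-ins, not part of the platform's API, and the exit value mirrors the Recursion expression described later in this unit.

import time

EXIT_VALUE = "ABCDE00012020-06-221"  # last-processed status marker
POLL_INTERVAL_SECONDS = 60           # hypothetical polling interval

def query_latest_status() -> str:
    """Hypothetical helper: run the Hive query against status_tbl and
    return the latest record formatted as masterid + date + flag,
    mirroring the list_output1 value produced by transform3.py."""
    raise NotImplementedError

def wait_for_new_data() -> str:
    # Poll until the exit condition is met, i.e. the latest status
    # differs from the known last-processed value.
    while True:
        status = query_latest_status()
        if status != EXIT_VALUE:
            return status
        time.sleep(POLL_INTERVAL_SECONDS)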


After the Recursion operator is added in front of the pipeline, the orchestrated pipeline is shown in the figure below:

../_images/pipeline_overview_1.png


In the sub-canvas of the Recursion operator, use the following operators to orchestrate the pipeline:

  1. Hive operator: queries the status table data from Hive and obtains the keytab and kerberos profiles required by the Hive operator

  2. Git Directory operator: pulls the file transform3.py from the Git directory and passes it as input to the Python operator

  3. Python operator: formats the input file


Drag the operators onto the editing canvas; the orchestrated pipeline is shown in the figure below:

../_images/recursion_pipeline_overview.png


The configuration instructions for each operator in the pipeline are as follows:

Recursion Operator

Name: Recursion

Description: monitor data arrival and trigger the corresponding event

Configuration parameters

The expression of the Recursion operator consists of five fields: the left operand type, the left operand, the comparison operator, the right operand type, and the right operand value. For this pipeline it is:

Reference | Transform3.list_output1 | != | Declaration | ABCDE00012020-06-221

The declared value corresponds to the last-processed record formatted as masterid + date + flag (here ABCDE0001, 2020-06-22, and 1), so the recursion exits as soon as Transform3.list_output1 returns a different value.

A sample operator configuration is shown below:

../_images/recursion.png

Hive Operator

Name: Hive(1)

Description: query status table data, keytab, and krb5 profiles from Hive

Input parameters

Parameter name   | Data type | Operation type | Value
data_source_name | String    | Declaration    | Name of the registered Hive data source
sqls             | List      | Declaration    | ["set tez.am.resource.memory.mb=1024", "select masterid, date(updatetime) as updatetime, flag from status_tbl"]
queue            | String    | Declaration    | root.eaptest01 (name of the big data queue applied for through resource management)

Output parameters

Parameter name | Value
resultset      | File

A sample operator configuration is shown below:

../_images/hive_config_1.png

Git Directory Operator

Name: Git Directory

Description: pull the Python code file from the Git directory

Input parameters

Parameter name   | Data type | Operation type | Value
data_source_name | String    | Declaration    | Name of the registered Git data source
branch           | String    | Declaration    | master
project          | String    | Declaration    | workspace1
paths            | List      | Declaration    | ["workspace1/kmmlds/transform3.py"]

Output parameters

Parameter name | Value
workspace      | directory
paths          | list

A sample operator configuration is shown below:

../_images/git_directory_3.png

Python Operator

Name: Transform3

Description: format the input file

Input parameters

Parameter name         | Data type | Operation type | Value
workspace              | Directory | Reference      | Git Directory.workspace
entrypoint             | String    | Declaration    | workspace1/kmmlds/transform3.py
requirements_file_path | String    | Declaration    |
list_data              | File      | Reference      | Hive(1).resultset

Output parameters

Parameter name | Value
list_output1   | String

A sample operator configuration is shown below:

../_images/python_transform_3.png
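
For reference, the formatting logic inside transform3.py could look like the sketch below. This is an illustrative assumption rather than the actual file from the Git repository: it assumes the Hive resultset arrives as a CSV file with a header row and the columns masterid, updatetime, and flag, and that list_output1 is the latest record's fields concatenated into one string (e.g. ABCDE0001 + 2020-06-22 + 1 gives ABCDE00012020-06-221, the value used in the Recursion expression).

import csv
import sys

def format_status(resultset_path: str) -> str:
    # Read the Hive resultset (assumed CSV with header) and concatenate
    # the fields of the most recent record into a single status string.
    with open(resultset_path, newline="") as f:
        rows = list(csv.DictReader(f))
    if not rows:
        return ""
    latest = max(rows, key=lambda r: r["updatetime"])
    return f"{latest['masterid']}{latest['updatetime']}{latest['flag']}"

if __name__ == "__main__":
    # Hypothetical invocation: the platform passes the resultset file path
    # and captures the printed string as the output parameter list_output1.
    print(format_status(sys.argv[1]))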

Next Unit

Running and Publishing Pipeline