单元 4. 设计数据到达监控与事件触发机制


模型的训练或模型推理都需要使用到数据,每天会有新的数据产生,且数据产生的时间并不固定。因此需要在数据到达后第一时间触发任务流的运行。


本节描述设计数据到达监控与事件触发的任务流。

数据准备

编排任务流前,已通过以下命令在 Hive 中创建了一个 status_tbl 表,用于记录每天数据的更新情况:

create table status_tbl(masterid string, updatetime timestamp, flag int);


status_tbl 表结构及字段说明如下:

字段名称

描述

masterid

场站ID

updatetime

插入数据的时间

flag

状态,1表示数据已写入,0表示数据已使用

设计任务流

当源数据更新完成后,可在 status_tbl 表中插入一条记录,表示某个场站的数据更新已完成。任务流监控到状态表中的数据更新后,就会对该场站的数据进行加工处理。处理完成后,将该场站的状态更新为0,表示该场站当天的数据已使用。


状态表的实时监控可通过在任务流前加上 Recursion 算子来实现,具体过程如下:

  1. 任务流运行,使用监控状态表的数据。

  2. 判断获取的状态是否满足退出条件,如果不满足,则继续轮询,查询该状态表。

  3. 当满足退出条件时(可以在 Hive 表中插入一条满足退出条件的记录),表明数据已更新,可以进行后续的处理。


任务流前加上 Recursion 算子后,完成编排的任务流如下图所示:

../_images/pipeline_overview_1.png


在 Recursion 算子的子画布中,使用以下算子编排任务流:

  1. Hive 算子:从 Hive 中查询状态表数据,并获取 Hive 算子所需的 keytab 和 kerberos 配置文件

  2. Git Directory 算子:从 Git 目录获取 transform3.py 文件,用于 Python 算子的输入

  3. Python 算子:对输入文件做格式化处理


将算子拖到编辑画布,完成编排后的任务流如下图所示:

../_images/recursion_pipeline_overview.png


任务流中编排的每个算子的配置说明如下:

Recursion 算子

名称:Recursion

描述:监控数据到达,触发事件

配置参数

Recursion 算子的表达式格式为:

引用 | Transform3.list_output1 | != | 声明 | ABCDE00012020-06-221

算子配置示例如下图所示:

../_images/recursion.png

Hive 算子

名称:Hive(1)

描述:从 Hive 中查询状态表数据、keytab、和 krb5 配置文件

输入参数

参数名称

数据类型

操作类型

data_source_name

String

声明

注册的 Hive 数据源名称

sqls

List

声明

[“set tez.am.resource.memory.mb=1024”,”select masterid, date(updatetime) as updatetime, flag from status_tbl”]

queue

String

声明

root.eaptest01(通过资源管理申请的大数据队列名称)

输出参数

参数名称

resultset

file

算子配置示例如下图所示:

../_images/hive_config_1.png

Git Directory 算子

名称:Git Directory

描述:从 Git 目录拉取 Python 代码文件

输入参数

参数名称

数据类型

操作类型

data_source_name

String

声明

注册的 Git 数据源名称

branch

String

声明

master

project

String

声明

workspace1

paths

List

声明

[“workspace1/kmmlds/transform3.py”]

输出参数

参数名称

workspace

directory

paths

list

算子配置示例如下图所示:

../_images/git_directory_3.png

Python 算子

名称:Transform3

描述:对输入文件做格式化处理

输入参数

参数名称

数据类型

操作类型

workspace

Directory

引用

Git Directory.workspace

entrypoint

String

声明

workspace1/kmmlds/transform3.py

requirements_file_path

String

声明

list_data

file

引用

Hive(1).resultset

输出参数

参数名称

list_output1

String

算子配置示例如下图所示:

../_images/python_transform_3.png

下一单元

运行和发布任务流