开发高阶流数据处理任务¶
在高阶流数据处理任务中,系统任务流能将流数据处理任务需要的数据分别输出到不同的 Kafka Topic 中。在下游流数据处理任务的配置项中,按需配置输入数据源 Kafka Topic,从而有效地节约计算资源。
新建高阶流数据处理任务¶
前提条件
新建高阶流数据处理任务之前,需确保组织已通过 EnOS 管理控制台 > 资源管理 页面申请 流数据处理-消息队列 资源。
EnOS 流数据处理服务提供多个版本的系统算子包,并支持上传自定义算子包。在设计流数据处理任务前,需要先安装对应版本的系统算子包或自定义算着包。详细信息,参见 安装算法模板和算子包。
通过以下步骤,开发高阶流数据处理任务:
登录 EnOS 管理控制台,选择 流数据处理 > 流开发,点击任务列表上方的 + 图标。
在 添加流 窗口中,选择 高阶 流类型。
选择 新建 方式创建流数据处理任务。也可通过导入配置文件快速创建流数据处理任务。
输入流数据处理任务的名称和描述。
从 算子版本 下拉菜单中,选择已安装的系统算子包版本或自定义算子包版本。
点击 确认,进入数据处理任务设计页面。
设计流数据处理任务¶
通过以下步骤,按业务需求使用算子设计流数据处理任务:
在流数据处理任务设计页面中,点击页面右上角的 Stage Library,从下拉菜单中找到需要使用的算子。点击数据处理算子(如 Point Selector),将其添加到 Pipeline 编辑页面。
拖拽 Stage 和连接线,将添加的 Stage 编排进 Pipeline 中。选中添加的算子,在配置项中完成对该算子的参数配置。
重复步骤1、2,将其他算子编排进 Pipeline 中,并完成各个 Stage 的参数配置。
点击任务栏中的 保存,保存流数据处理任务的配置信息。
完成算子配置后,点击工具栏中的 Validate 图标,检查 Pipeline 和算子参数配置是否正确,并按照检查结果修改配置。
更多使用算子配置 Pipeline 的详细介绍,参考 StreamSet 文档。
导入流数据处理任务配置¶
在数据处理任务设计页面上,除了设计全新的流数据处理任务外,也可通过导入现有流数据处理任务配置文件,快速创建和配置流数据处理任务。
在流数据处理任务设计页面中,点击任务栏中的 导入配置。
浏览并选择流数据处理任务配置文件,然后点击 确定。
根据业务需要,编辑和保存导入的流数据处理任务。
发布和运行流数据处理任务¶
算子配置检查通过后,即可将流数据处理任务发布上线并运行。
点击任务栏中的 发布,将流数据处理任务发布上线。
进入 流数据处理 > 流运维 页面,查看已发布的流数据处理任务,其默认状态为 PUBLISHED。
完成流数据处理任务的运行配置和告警配置,确保相关的系统流任务已启动,然后启动流数据处理任务。
有关流数据处理任务运维相关的详细信息,参考 维护流数据处理任务。
自定义 Topic Reader 和 Topic Writer 流任务常见配置方法¶
高阶流数据处理任务的最大特点是开放了 Normalizer 算子。配置自定义 Topic Reader 流任务时,需要使用 Point Selector 算子选择测点,并将测点数据输出到不同 Topic。
Topic Writer 流任务可以把 Internal Topic 中的数据输出到 Cal Topic 中,以便下游应用消费,将数据写入 TSDB 中。
注意:
配置自定义 Topic Reader 和 Topic Writer 流任务时,尽量不要使用除 EDH Kafka Consumer、EDH Kafka Producer、和 Point Selector 以外的算子。
发布流数据处理任务时,自动生成的系统流任务 Data Reader 和 Data Writer 可以被选择性地停止。
如需直接输出数据到 Cal Topic,需要满足如下格式:
无质量位的数据
{ "orgId":"1b47ed98d1800000", "modelId":"inverter", "modelIdPath":"/rootModel/inverter", "payload": { "measurepoints": { "tempWithoutQuality":23.4 }, "time":1542609276270, "assetId":"zabPDuHq" }, "dq": { "measurepoints": { "tempWithoutQuality":1 }, "time":1542609276270, "assetId":"zabPDuHq" } }
有质量位的数据
{ "orgId":"1b47ed98d1800000", "modelId":"inverter", "modelIdPath":"/rootModel/inverter", "payload": { "measurepoints": { "tempWithQuality": { "value":23.4, "quality”:0 } }, "time":1542609276270, "assetId":"zabPDuHq" }, "dq": { "measurepoints": { "tempWithQuality": 1 }, "time":1542609276270, "assetId":"zabPDuHq" } }
参数说明
字段 |
对应设备模型字段 |
说明 |
---|---|---|
orgId |
TSLModel.ou |
组织 ID,供下游订阅和告警服务使用。 |
modelId |
TSLModel.tslModelId |
模型 ID,用户自定义的模型标识符,供下游订阅和告警服务使用。 |
modelIdPath |
n/a |
用户自定义模型的完整路径。 |
assetId |
TSLInstance.tslInstanceId |
资产 ID,用于兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。 |
time |
n/a |
时间戳。 |
measurepoints |
TSLIdentifier.identifier |
测点 ID 和测点数据。
|
自定义 Topic Reader 和 Topic Writer 流任务的配置方法示例如下:
Topic Reader
Topic Writer
算子参考文档¶
对算子的功能、配置参数、和输出结果的详细介绍,参考 算子参考说明。