开发高阶流数据处理任务


在高阶流数据处理任务中,系统任务流能将流数据处理任务需要的数据分别输出到不同的 Kafka Topic 中。在下游流数据处理任务的配置项中,按需配置输入数据源 Kafka Topic,从而有效地节约计算资源。

新建高阶流数据处理任务

前提条件

  • 新建高阶流数据处理任务之前,需确保组织已通过 EnOS 管理控制台 > 资源管理 页面申请 流数据处理-消息队列 资源。

  • EnOS 流数据处理服务提供多个版本的系统算子包,并支持上传自定义算子包。在设计流数据处理任务前,需要先安装对应版本的系统算子包或自定义算着包。详细信息,参见 安装算法模板和算子包


通过以下步骤,开发高阶流数据处理任务:

  1. 登录 EnOS 管理控制台,选择 流数据处理 > 流开发,点击任务列表上方的 + 图标。

  2. 添加流 窗口中,选择 高阶 流类型。

  3. 选择 新建 方式创建流数据处理任务。也可通过导入配置文件快速创建流数据处理任务。

  4. 输入流数据处理任务的名称和描述。

  5. 算子版本 下拉菜单中,选择已安装的系统算子包版本或自定义算子包版本。

    _images/creating_advanced_pipeline.png
  6. 点击 确认,进入数据处理任务设计页面。

设计流数据处理任务

通过以下步骤,按业务需求使用算子设计流数据处理任务:

  1. 在流数据处理任务设计页面中,点击页面右上角的 Stage Library,从下拉菜单中找到需要使用的算子。点击数据处理算子(如 Point Selector),将其添加到 Pipeline 编辑页面。

    _images/streamsets_stage_library.png
  2. 拖拽 Stage 和连接线,将添加的 Stage 编排进 Pipeline 中。选中添加的算子,在配置项中完成对该算子的参数配置。

    _images/streamsets_add_stage.png
  3. 重复步骤1、2,将其他算子编排进 Pipeline 中,并完成各个 Stage 的参数配置。

  4. 点击任务栏中的 保存,保存流数据处理任务的配置信息。

  5. 完成算子配置后,点击工具栏中的 Validate 图标,检查 Pipeline 和算子参数配置是否正确,并按照检查结果修改配置。

    _images/streamsets_validation.png

更多使用算子配置 Pipeline 的详细介绍,参考 StreamSet 文档

导入流数据处理任务配置

在数据处理任务设计页面上,除了设计全新的流数据处理任务外,也可通过导入现有流数据处理任务配置文件,快速创建和配置流数据处理任务。

  1. 在流数据处理任务设计页面中,点击任务栏中的 导入配置

  2. 浏览并选择流数据处理任务配置文件,然后点击 确定

    _images/import_advanced_pipeline.png
  3. 根据业务需要,编辑和保存导入的流数据处理任务。

发布和运行流数据处理任务

算子配置检查通过后,即可将流数据处理任务发布上线并运行。

  1. 点击任务栏中的 发布,将流数据处理任务发布上线。

  2. 进入 流数据处理 > 流运维 页面,查看已发布的流数据处理任务,其默认状态为 PUBLISHED

  3. 完成流数据处理任务的运行配置和告警配置,确保相关的系统流任务已启动,然后启动流数据处理任务。

有关流数据处理任务运维相关的详细信息,参考 维护流数据处理任务

自定义 Topic Reader 和 Topic Writer 流任务常见配置方法

高阶流数据处理任务的最大特点是开放了 Normalizer 算子。配置自定义 Topic Reader 流任务时,需要使用 Point Selector 算子选择测点,并将测点数据输出到不同 Topic。

Topic Writer 流任务可以把 Internal Topic 中的数据输出到 Cal Topic 中,以便下游应用消费,将数据写入 TSDB 中。

注意:

  1. 配置自定义 Topic Reader 和 Topic Writer 流任务时,尽量不要使用除 EDH Kafka Consumer、EDH Kafka Producer、和 Point Selector 以外的算子。

  2. 发布流数据处理任务时,自动生成的系统流任务 Data Reader 和 Data Writer 可以被选择性地停止。

  3. 如需直接输出数据到 Cal Topic,需要满足如下格式:

    无质量位的数据

    {
     "orgId":"1b47ed98d1800000",
     "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
     "payload": {
         "measurepoints": {
             "tempWithoutQuality":23.4
         },
         "time":1542609276270,
         "assetId":"zabPDuHq"
     },
     "dq": {
         "measurepoints": {
             "tempWithoutQuality":1
         },
         "time":1542609276270,
         "assetId":"zabPDuHq"
     }
    }
    

    有质量位的数据

    {
     "orgId":"1b47ed98d1800000",
     "modelId":"inverter",
      "modelIdPath":"/rootModel/inverter",
     "payload":
      {
             "measurepoints": {
                 "tempWithQuality": {
                     "value":23.4,
                     "quality”:0
                 }
             },
             "time":1542609276270,
             "assetId":"zabPDuHq"
         },
    "dq": {
             "measurepoints": {
                 "tempWithQuality": 1
             },
             "time":1542609276270,
             "assetId":"zabPDuHq"
         }
    }
    

参数说明

字段

对应设备模型字段

说明

orgId

TSLModel.ou

组织 ID,供下游订阅和告警服务使用。

modelId

TSLModel.tslModelId

模型 ID,用户自定义的模型标识符,供下游订阅和告警服务使用。

modelIdPath

n/a

用户自定义模型的完整路径。

assetId

TSLInstance.tslInstanceId

资产 ID,用于兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。

time

n/a

时间戳。

measurepoints

TSLIdentifier.identifier

测点 ID 和测点数据。

  • 不带质量位的测点:”temp”: 0.7121109803730992

  • 带质量位的测点:”tempWithQuality”: { “value”:23.4, “quality”:9 }


自定义 Topic Reader 和 Topic Writer 流任务的配置方法示例如下:

Topic Reader

_images/custom_topic_reader.png

Topic Writer

_images/custom_topic_writer.png

算子参考文档

对算子的功能、配置参数、和输出结果的详细介绍,参考 算子参考说明