EDH Kafka Consumer


数据源算子,从定义的 Kafka Topic 中消费数据,支持单机模式和集群模式。配置 EDH Kafka Consumer 算子时,需要区分 “常规流” 和 “高级流” 任务中的配置。


配置常规流数据处理任务时,消费的 Topic 为默认值且不可选择,即实时通道 Topic 为 MEASURE_POINT_INTERNAL_{OUID};离线通道 Topic 为 MEASURE_POINT_INTERNAL_OFFLINE_{OUID},其中 {OUID} 为组织 ID。


配置高级流数据处理任务时,消费的 Topic 可以自由选择,默认情况下系统会为每个组织创建以下6个 Topic 供选择:

  • MEASURE_POINT_INTERNAL_{OUID}

  • MEASURE_POINT_INTERNAL_OFFLINE_{OUID}

  • MEASURE_POINT_CAL_{OUID}

  • MEASURE_POINT_CAL_OFFLINE_{OUID}

  • MEASURE_POINT_ORIGIN_{OUD}

  • MEASURE_POINT_ORIGIN_OFFLINE_{OUID}

其中 {OUID} 为组织 ID。另外,用户也可以通过 资源管理 页面申请 流数据处理-消息队列 资源,以便在这里选择对应的 Topic。

配置详情

该算子的配置包括 GeneralKafka 的详细信息,各字段的配置如下:

General

名称

是否必须

描述

Name

Yes

算子名称

Description

No

算子描述

Kafka(常规流)

名称

是否必须

描述

Kafka Configuration

No

Kafka 高级参数设置,默认会填充 auto.offset.reset 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_consumer_config_1.png

Kafka(高级流)

名称

是否必须

描述

Topic

Yes

消费的 Kafka Topic。

Kafka Configuration

No

Kafka 高级参数设置,默认会填充 auto.offset.resetdefault.api.timeout.ms 参数,可按业务需要调整。详细信息,可参考 Kafka 官网


配置如下图所示:

../../../_images/kafka_consumer_config_2.png

数据格式说明

系统默认创建的 Topic 有严格的数据格式,分别为:

MEASURE_POINT_ORIGIN_{OUID} 数据格式

{
  "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload": [
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276269,
            "assetId":"zabPDuHq1"
        },
         {
            "measurepoints": {
                "temp":0.7121109803730992,
                "tempWithQuality": {
                    "value":23.4,
                    "quality":9
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq1"
        }
    ]
}

或:

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
   "payload": {
       "measurepoints": {
           "temp":0.7121109803730992,
           "tempWithQuality": {
               "value":23.4,
               "quality":9
           }
       },
       "time":1542609276269,
       "assetId":"zabPDuHq"
   }
}

MEASURE_POINT_CAL_{OUID} 数据格式

//无quality的数据格式
 {
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
    "modelIdPath":"/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithoutQuality":23.4
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    },
    "dq": {
        "measurepoints": {
            "tempWithoutQuality":1
        },
        "time":1542609276270,
        "assetId":"zabPDuHq"
    }
}

或:

//有quality的数据格式
{
    "orgId":"1b47ed98d1800000",
    "modelId":"inverter",
     "modelIdPath":"/rootModel/inverter",
    "payload":
     {
            "measurepoints": {
                "tempWithQuality": {
                    "value":23.4,
                    "quality”:0
                }
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        },
   "dq": {
            "measurepoints": {
                "tempWithQuality": 1
            },
            "time":1542609276270,
            "assetId":"zabPDuHq"
        }
}

参数说明

字段

对应设备模型字段

说明

orgId

TSLModel.ou

组织 ID,主要被下游订阅和告警使用。

modelId

TSLModel.tslModelId

模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。

modelIdPath

n/a

用户自定义模型的完整路径。

assetId

TSLInstance.tslInstanceId

资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。

time

n/a

时间戳

measurepoints

TSLIdentifier.identifier

测点和值

  • 不带质量位的点:"temp":0.7121109803730992

  • 带质量位的点:"tempWithQuality": { "value":23.4, "quality":9 }

MEASURE_POINT_INTERNAL_{OUID} 数据格式

{
   "orgId":"1b47ed98d1800000",
   "modelId":"inverter",
   "modelIdPath":"/rootModel/inverter",
   "assetId":"zabPDuHq",
   "pointId":"inverter",
   "time":1542609276270,
   "value":23.4,
   "quality":0,
   "dq":0,
   "attr": {}
}

参数说明

字段

CommDataObject 对应字段

说明

orgId

orgId

组织 ID,主要被下游订阅和告警使用。

modelId

modelId

模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。

modelIdPath

modelIdPath

用户自定义模型的完整路径。

assetId

assetId

资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。

pointId

measurepoints.${key}

测点唯一标识符。

time

time

时间戳

value

measurepoints.${pointId} 或 measurepoints.${pointId}.value

测点值

  • 不带质量位的点:即 value 值

  • 带质量位的点:需解 map 中的 value 字段

quality

null 或 measurepoints.${pointId}.quality

测点质量位

  • 不带质量位的点:0

  • 带质量位的点:map 中的 quality 字段

dq

dq.measurepoints.${pointId}

非必填字段

attr

n/a

cal 中需要的额外信息,暂时不对上下游暴露。