EDH Kafka Consumer¶
数据源算子,从定义的 Kafka Topic 中消费数据,支持单机模式和集群模式。配置 EDH Kafka Consumer 算子时,需要区分 “常规流” 和 “高级流” 任务中的配置。
配置常规流数据处理任务时,消费的 Topic 为默认值且不可选择,即实时通道 Topic 为 MEASURE_POINT_INTERNAL_{OUID}
;离线通道 Topic 为 MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
,其中 {OUID}
为组织 ID。
配置高级流数据处理任务时,消费的 Topic 可以自由选择,默认情况下系统会为每个组织创建以下6个 Topic 供选择:
MEASURE_POINT_INTERNAL_{OUID}
MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
MEASURE_POINT_CAL_{OUID}
MEASURE_POINT_CAL_OFFLINE_{OUID}
MEASURE_POINT_ORIGIN_{OUD}
MEASURE_POINT_ORIGIN_OFFLINE_{OUID}
其中 {OUID}
为组织 ID。另外,用户也可以通过 资源管理 页面申请 流数据处理-消息队列 资源,以便在这里选择对应的 Topic。
配置详情¶
该算子的配置包括 General 和 Kafka 的详细信息,各字段的配置如下:
General¶
名称 |
是否必须 |
描述 |
---|---|---|
Name |
Yes |
算子名称 |
Description |
No |
算子描述 |
Kafka(常规流)¶
名称 |
是否必须 |
描述 |
---|---|---|
Kafka Configuration |
No |
Kafka 高级参数设置,默认会填充 |
配置如下图所示:
Kafka(高级流)¶
名称 |
是否必须 |
描述 |
---|---|---|
Topic |
Yes |
消费的 Kafka Topic。 |
Kafka Configuration |
No |
Kafka 高级参数设置,默认会填充 |
配置如下图所示:
数据格式说明¶
系统默认创建的 Topic 有严格的数据格式,分别为:
MEASURE_POINT_ORIGIN_{OUID}
数据格式¶
{
"orgId":"1b47ed98d1800000",
"modelId":"inverter",
"modelIdPath":"/rootModel/inverter",
"payload": [
{
"measurepoints": {
"temp":0.7121109803730992,
"tempWithQuality": {
"value":23.4,
"quality":9
}
},
"time":1542609276269,
"assetId":"zabPDuHq1"
},
{
"measurepoints": {
"temp":0.7121109803730992,
"tempWithQuality": {
"value":23.4,
"quality":9
}
},
"time":1542609276270,
"assetId":"zabPDuHq1"
}
]
}
或:
{
"orgId":"1b47ed98d1800000",
"modelId":"inverter",
"modelIdPath":"/rootModel/inverter",
"payload": {
"measurepoints": {
"temp":0.7121109803730992,
"tempWithQuality": {
"value":23.4,
"quality":9
}
},
"time":1542609276269,
"assetId":"zabPDuHq"
}
}
MEASURE_POINT_CAL_{OUID}
数据格式¶
//无quality的数据格式
{
"orgId":"1b47ed98d1800000",
"modelId":"inverter",
"modelIdPath":"/rootModel/inverter",
"payload": {
"measurepoints": {
"tempWithoutQuality":23.4
},
"time":1542609276270,
"assetId":"zabPDuHq"
},
"dq": {
"measurepoints": {
"tempWithoutQuality":1
},
"time":1542609276270,
"assetId":"zabPDuHq"
}
}
或:
//有quality的数据格式
{
"orgId":"1b47ed98d1800000",
"modelId":"inverter",
"modelIdPath":"/rootModel/inverter",
"payload":
{
"measurepoints": {
"tempWithQuality": {
"value":23.4,
"quality”:0
}
},
"time":1542609276270,
"assetId":"zabPDuHq"
},
"dq": {
"measurepoints": {
"tempWithQuality": 1
},
"time":1542609276270,
"assetId":"zabPDuHq"
}
}
参数说明¶
字段 |
对应设备模型字段 |
说明 |
---|---|---|
orgId |
TSLModel.ou |
组织 ID,主要被下游订阅和告警使用。 |
modelId |
TSLModel.tslModelId |
模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。 |
modelIdPath |
n/a |
用户自定义模型的完整路径。 |
assetId |
TSLInstance.tslInstanceId |
资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。 |
time |
n/a |
时间戳 |
measurepoints |
TSLIdentifier.identifier |
测点和值
|
MEASURE_POINT_INTERNAL_{OUID}
数据格式¶
{
"orgId":"1b47ed98d1800000",
"modelId":"inverter",
"modelIdPath":"/rootModel/inverter",
"assetId":"zabPDuHq",
"pointId":"inverter",
"time":1542609276270,
"value":23.4,
"quality":0,
"dq":0,
"attr": {}
}
参数说明¶
字段 |
CommDataObject 对应字段 |
说明 |
---|---|---|
orgId |
orgId |
组织 ID,主要被下游订阅和告警使用。 |
modelId |
modelId |
模型 ID,用户自定义的模型标识符,主要被下游订阅和告警使用。 |
modelIdPath |
modelIdPath |
用户自定义模型的完整路径。 |
assetId |
assetId |
资产 ID,为了兼容现有数据格式(实时流数据输入格式,和用户通过 EnOS API 接口访问)。 |
pointId |
measurepoints.${key} |
测点唯一标识符。 |
time |
time |
时间戳 |
value |
measurepoints.${pointId} 或 measurepoints.${pointId}.value |
测点值
|
quality |
null 或 measurepoints.${pointId}.quality |
测点质量位
|
dq |
dq.measurepoints.${pointId} |
非必填字段 |
attr |
n/a |
cal 中需要的额外信息,暂时不对上下游暴露。 |