EDH Kafka Consumer¶
A data source stage that consumes data from a specified Kafka Topic. It supports both standalone and cluster pipelines. The configuration of the EDH Kafka Consumer differs between general pipelines and advanced pipelines.
When configuring the EDH Kafka Consumer for general pipelines, the consumed Topic is preset and cannot be changed: MEASURE_POINT_INTERNAL_{OUID} for the real-time channel and MEASURE_POINT_INTERNAL_OFFLINE_{OUID} for the offline channel, where {OUID} is the organization ID.
When configuring the EDH Kafka Consumer for advanced pipelines, the consumed Topic can be specified based on business needs. By default, the system creates the following six Topics, which you can use as needed:
MEASURE_POINT_INTERNAL_{OUID}
MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
MEASURE_POINT_CAL_{OUID}
MEASURE_POINT_CAL_OFFLINE_{OUID}
MEASURE_POINT_ORIGIN_{OUID}
MEASURE_POINT_ORIGIN_OFFLINE_{OUID}
Here, {OUID} is the organization ID. You can also request a Stream Processing - Message Queue resource on the Resource Management page and then select it as the consumed Topic.
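As a small illustration of the naming scheme, the six default Topic names can be derived from an organization ID. The Python sketch below is not part of the product: `DEFAULT_TOPIC_PREFIXES` and `default_topics` are hypothetical names, and the organization ID is the one that appears in the data format samples.

```python
from typing import List

# Prefixes of the six default Topics; the organization ID ({OUID}) is appended.
DEFAULT_TOPIC_PREFIXES = (
    "MEASURE_POINT_INTERNAL_",
    "MEASURE_POINT_INTERNAL_OFFLINE_",
    "MEASURE_POINT_CAL_",
    "MEASURE_POINT_CAL_OFFLINE_",
    "MEASURE_POINT_ORIGIN_",
    "MEASURE_POINT_ORIGIN_OFFLINE_",
)


def default_topics(ou_id: str) -> List[str]:
    """Return the six default Topic names for one organization ID."""
    if not ou_id:
        raise ValueError("ou_id must be a non-empty organization ID")
    return [prefix + ou_id for prefix in DEFAULT_TOPIC_PREFIXES]


if __name__ == "__main__":
    for name in default_topics("1b47ed98d1800000"):
        print(name)
```

Keeping the prefixes in one tuple makes it easy to check a configured Topic name against the defaults before saving an advanced pipeline.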
Configuration¶
The configuration tabs for this stage are General and Kafka.
General¶
Name | Required? | Description |
---|---|---|
Name | Yes | The name of the stage. |
Description | No | The description of the stage. |
Kafka (General)¶
Name | Required? | Description |
---|---|---|
Kafka Configuration | No | Configure advanced Kafka parameters. The parameter |
Sample configuration is as follows:
Kafka (Advanced)¶
Name | Required? | Description |
---|---|---|
Topic | Yes | Consumed Kafka Topic. |
Kafka Configuration | No | Configure advanced Kafka parameters. The parameters |
Sample configuration is as follows:
Data Format¶
The Topics created by the system have strict data formats. A sample for each Topic is given below.
MEASURE_POINT_ORIGIN_{OUID}
Data Format¶
{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "payload": [
    {
      "measurepoints": {
        "temp":0.7121109803730992,
        "tempWithQuality": {
          "value":23.4,
          "quality":9
        }
      },
      "time":1542609276269,
      "assetId":"zabPDuHq1"
    },
    {
      "measurepoints": {
        "temp":0.7121109803730992,
        "tempWithQuality": {
          "value":23.4,
          "quality":9
        }
      },
      "time":1542609276270,
      "assetId":"zabPDuHq1"
    }
  ]
}
Or:
{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "temp":0.7121109803730992,
      "tempWithQuality": {
        "value":23.4,
        "quality":9
      }
    },
    "time":1542609276269,
    "assetId":"zabPDuHq"
  }
}
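Because `payload` can be either an array of records or a single record, a consumer has to handle both shapes. The following Python sketch shows one way to normalize them; `iter_payload_records` is a hypothetical helper name, and the samples are shortened versions of the two messages above.

```python
import json
from typing import Any, Dict, Iterator


def iter_payload_records(raw: str) -> Iterator[Dict[str, Any]]:
    """Yield each payload record, whether `payload` is a list or a single object."""
    message = json.loads(raw)
    payload = message["payload"]
    # Wrap a single record in a list so both shapes share one code path.
    records = payload if isinstance(payload, list) else [payload]
    for record in records:
        yield record


# Shortened samples based on the two ORIGIN messages shown above.
LIST_SAMPLE = json.dumps({
    "orgId": "1b47ed98d1800000",
    "modelId": "inverter",
    "modelIdPath": "/rootModel/inverter",
    "payload": [
        {"measurepoints": {"temp": 0.71}, "time": 1542609276269, "assetId": "zabPDuHq1"},
        {"measurepoints": {"temp": 0.71}, "time": 1542609276270, "assetId": "zabPDuHq1"},
    ],
})
SINGLE_SAMPLE = json.dumps({
    "orgId": "1b47ed98d1800000",
    "modelId": "inverter",
    "modelIdPath": "/rootModel/inverter",
    "payload": {"measurepoints": {"temp": 0.71}, "time": 1542609276269, "assetId": "zabPDuHq"},
})

if __name__ == "__main__":
    for sample in (LIST_SAMPLE, SINGLE_SAMPLE):
        print([record["time"] for record in iter_payload_records(sample)])
```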
MEASURE_POINT_CAL_{OUID}
Data Format¶
// Without quality
{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "tempWithoutQuality":23.4
    },
    "time":1542609276270,
    "assetId":"zabPDuHq"
  },
  "dq": {
    "measurepoints": {
      "tempWithoutQuality":1
    },
    "time":1542609276270,
    "assetId":"zabPDuHq"
  }
}
Or:
// With quality
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "tempWithQuality": {
        "value": 23.4,
        "quality": 0
      }
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  },
  "dq": {
    "measurepoints": {
      "tempWithQuality": 1
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  }
}
Parameters¶
Field | Corresponding Device Model Field | Description |
---|---|---|
orgId | TSLModel.ou | Organization ID, to be used by downstream subscription and alert services. |
modelId | TSLModel.tslModelId | Model ID (identifier of the user-defined model), to be used by downstream subscription and alert services. |
modelIdPath | TSLModel.tslModelIdPath | Complete path of the user-defined model. |
assetId | TSLInstance.tslInstanceId | Asset ID, used to comply with existing data formats (real-time stream data format). |
time | n/a | Timestamp. |
measurepoints | TSLIdentifier.identifier | Measurement point and value. |
MEASURE_POINT_INTERNAL_{OUID}
Data Format¶
{
  "orgId":"1b47ed98d1800000",
  "modelId":"inverter",
  "modelIdPath":"/rootModel/inverter",
  "assetId":"zabPDuHq",
  "pointId":"inverter",
  "time":1542609276270,
  "value":23.4,
  "quality":0,
  "dq":0,
  "attr": {}
}
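A downstream consumer might load such a record into a typed structure before processing it. The Python sketch below is one possible shape, not an official client: `InternalPoint` and `parse_internal` are hypothetical names, and treating `time` as epoch milliseconds is an assumption inferred from the magnitude of the sample value.

```python
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class InternalPoint:
    org_id: str
    model_id: str
    model_id_path: str
    asset_id: str
    point_id: str
    time: int
    value: Any
    quality: Optional[int] = None
    dq: Optional[int] = None
    attr: Dict[str, Any] = field(default_factory=dict)

    def timestamp_utc(self) -> datetime:
        # Assumption: `time` is expressed in milliseconds since the Unix epoch.
        return datetime.fromtimestamp(self.time / 1000, tz=timezone.utc)


def parse_internal(raw: str) -> InternalPoint:
    """Parse one MEASURE_POINT_INTERNAL message into an InternalPoint."""
    data = json.loads(raw)
    return InternalPoint(
        org_id=data["orgId"],
        model_id=data["modelId"],
        model_id_path=data["modelIdPath"],
        asset_id=data["assetId"],
        point_id=data["pointId"],
        time=data["time"],
        value=data["value"],
        quality=data.get("quality"),  # may be absent or null
        dq=data.get("dq"),            # documented as not required
        attr=data.get("attr") or {},
    )


INTERNAL_SAMPLE = (
    '{"orgId":"1b47ed98d1800000","modelId":"inverter",'
    '"modelIdPath":"/rootModel/inverter","assetId":"zabPDuHq",'
    '"pointId":"inverter","time":1542609276270,"value":23.4,'
    '"quality":0,"dq":0,"attr":{}}'
)

if __name__ == "__main__":
    point = parse_internal(INTERNAL_SAMPLE)
    print(point.point_id, point.value, point.timestamp_utc().isoformat())
```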
Parameters¶
Field | Corresponding CommDataObject Field | Description |
---|---|---|
orgId | orgId | Organization ID, to be used by downstream subscription and alert services. |
modelId | modelId | Model ID (identifier of the user-defined model), to be used by downstream subscription and alert services. |
modelIdPath | modelIdPath | Complete path of the user-defined model. |
assetId | assetId | Asset ID, used to comply with existing data formats (real-time stream data format). |
pointId | measurepoints.key | Measurement point ID. |
time | time | Timestamp. |
value | measurepoints.pointId or measurepoints.pointId.value | Measurement point value. |
quality | null or measurepoints.pointId.quality | Measurement point data quality. |
dq | dq.measurepoints.pointId | Not required. |
attr | n/a | Extra information that is needed in cal (currently not exposed to upstream or downstream services). |
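The field mapping in this table can be read as a flattening rule: each key under `measurepoints` becomes one record with its own `pointId`, `value`, `quality`, and `dq`. The Python sketch below illustrates that reading only; it is not the platform's implementation. `flatten_to_internal` is a hypothetical name, and applying a single `dq` object to every payload record is an assumption.

```python
from typing import Any, Dict, List


def flatten_to_internal(message: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten a message with a `measurepoints` map into one record per point."""
    payload = message["payload"]
    records = payload if isinstance(payload, list) else [payload]
    dq = message.get("dq")
    # Assumption: one `dq` object applies to all payload records.
    dq_points = dq.get("measurepoints", {}) if isinstance(dq, dict) else {}

    flat = []
    for record in records:
        for point_id, raw in record["measurepoints"].items():
            if isinstance(raw, dict):
                # With quality: measurepoints.pointId.value / .quality
                value, quality = raw.get("value"), raw.get("quality")
            else:
                # Without quality: the value is stored directly, quality is null
                value, quality = raw, None
            flat.append({
                "orgId": message["orgId"],
                "modelId": message["modelId"],
                "modelIdPath": message["modelIdPath"],
                "assetId": record["assetId"],
                "pointId": point_id,
                "time": record["time"],
                "value": value,
                "quality": quality,
                "dq": dq_points.get(point_id),
                "attr": {},
            })
    return flat


CAL_SAMPLE = {
    "orgId": "1b47ed98d1800000",
    "modelId": "inverter",
    "modelIdPath": "/rootModel/inverter",
    "payload": {
        "measurepoints": {
            "tempWithQuality": {"value": 23.4, "quality": 0},
            "tempWithoutQuality": 23.4,
        },
        "time": 1542609276270,
        "assetId": "zabPDuHq",
    },
    "dq": {
        "measurepoints": {"tempWithQuality": 1, "tempWithoutQuality": 1},
        "time": 1542609276270,
        "assetId": "zabPDuHq",
    },
}

if __name__ == "__main__":
    for row in flatten_to_internal(CAL_SAMPLE):
        print(row["pointId"], row["value"], row["quality"], row["dq"])
```

The sample combines the two measurement points from the MEASURE_POINT_CAL_{OUID} examples into one message so that both branches are exercised.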