EDH Kafka Producer¶
A destination stage that sends data to a specified Kafka Topic. It supports both standalone and cluster pipelines. The configuration of the EDH Kafka Producer differs between general pipelines and advanced pipelines.
When configuring the EDH Kafka Producer for general pipelines, the Topic that receives data is preset and cannot be changed: MEASURE_POINT_INTERNAL_{OUID} for the real-time channel and MEASURE_POINT_INTERNAL_OFFLINE_{OUID} for the offline channel, where {OUID} is the organization ID.
When configuring the EDH Kafka Producer for advanced pipelines, the Topic to receive data can be specified based on business needs. By default, the system creates the following six Topics for use as needed:
MEASURE_POINT_INTERNAL_{OUID}
MEASURE_POINT_INTERNAL_OFFLINE_{OUID}
MEASURE_POINT_CAL_{OUID}
MEASURE_POINT_CAL_OFFLINE_{OUID}
MEASURE_POINT_ORIGIN_{OUID}
MEASURE_POINT_ORIGIN_OFFLINE_{OUID}
Here, {OUID} is the organization ID. You can also request a Stream Processing - Message Queue resource on the Resource Management page and then select it as the Topic to receive data.
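The naming scheme above can be illustrated with a minimal Python sketch that expands an organization ID into the six default Topic names and picks the fixed Topic used by general pipelines. The helper names `system_topics` and `general_pipeline_topic` are hypothetical and are not part of the product.

```python
from typing import List

# Prefixes of the default Topics listed above.
SYSTEM_TOPIC_PREFIXES = (
    "MEASURE_POINT_INTERNAL",
    "MEASURE_POINT_CAL",
    "MEASURE_POINT_ORIGIN",
)


def system_topics(ou_id: str) -> List[str]:
    """Return the six default Topic names for one organization ID."""
    topics = []
    for prefix in SYSTEM_TOPIC_PREFIXES:
        topics.append(f"{prefix}_{ou_id}")          # e.g. MEASURE_POINT_CAL_{OUID}
        topics.append(f"{prefix}_OFFLINE_{ou_id}")  # e.g. MEASURE_POINT_CAL_OFFLINE_{OUID}
    return topics


def general_pipeline_topic(ou_id: str, offline: bool = False) -> str:
    """Return the preset Topic of a general pipeline for the given channel."""
    if offline:
        return f"MEASURE_POINT_INTERNAL_OFFLINE_{ou_id}"  # offline channel
    return f"MEASURE_POINT_INTERNAL_{ou_id}"              # real-time channel
```

For example, `general_pipeline_topic("1b47ed98d1800000")` yields the real-time Topic name for the organization ID used in the samples on this page.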
Configuration¶
The configuration tabs for this stage are General and Kafka.
General¶
Name | Required? | Description |
---|---|---|
Name | Yes | The name of the stage. |
Description | No | The description of the stage. |
Kafka (General)¶
Name | Required? | Description |
---|---|---|
Kafka Configuration | No | Configure advanced Kafka parameters. The parameters |
Sample configuration is as follows:
Kafka (Advanced)¶
Name | Required? | Description |
---|---|---|
Topic | Yes | Kafka Topic to receive data. |
Kafka Configuration | No | Configure advanced Kafka parameters. The parameters |
Sample configuration is as follows:
Data Format¶
The Topics created by the system have strict data formats. A sample for each Topic follows:
MEASURE_POINT_ORIGIN_{OUID}
Data Format¶
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "payload": [
    {
      "measurepoints": {
        "temp": 0.7121109803730992,
        "tempWithQuality": {
          "value": 23.4,
          "quality": 9
        }
      },
      "time": 1542609276269,
      "assetId": "zabPDuHq1"
    },
    {
      "measurepoints": {
        "temp": 0.7121109803730992,
        "tempWithQuality": {
          "value": 23.4,
          "quality": 9
        }
      },
      "time": 1542609276270,
      "assetId": "zabPDuHq1"
    }
  ]
}
Or:
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "temp": 0.7121109803730992,
      "tempWithQuality": {
        "value": 23.4,
        "quality": 9
      }
    },
    "time": 1542609276269,
    "assetId": "zabPDuHq"
  }
}
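Because the two samples above show `payload` either as a list of entries or as a single entry, a consumer may want to normalize both shapes before processing. The following is a minimal Python sketch of that idea; the function name `payload_entries` is hypothetical.

```python
from typing import Any, Dict, List


def payload_entries(record: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the payload of a record as a list, whichever shape it arrived in."""
    payload = record["payload"]
    if isinstance(payload, list):
        return payload      # already a list of entries
    return [payload]        # single entry: wrap it so callers can always iterate
```

With this, downstream code can loop over `payload_entries(record)` and read `measurepoints`, `time`, and `assetId` from each entry without checking the shape again.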
MEASURE_POINT_CAL_{OUID}
Data Format¶
//Without quality
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "tempWithoutQuality": 23.4
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  },
  "dq": {
    "measurepoints": {
      "tempWithoutQuality": 1
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  }
}
Or:
//With quality
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "payload": {
    "measurepoints": {
      "tempWithQuality": {
        "value": 23.4,
        "quality": 0
      }
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  },
  "dq": {
    "measurepoints": {
      "tempWithQuality": 1
    },
    "time": 1542609276270,
    "assetId": "zabPDuHq"
  }
}
Parameters¶
Field | Corresponding Device Model Field | Description |
---|---|---|
orgId | TSLModel.ou | Organization ID, to be used by downstream subscription and alert services. |
modelId | TSLModel.tslModelId | Model ID (identifier of user defined model), to be used by downstream subscription and alert services. |
modelIdPath | TSLModel.tslModelIdPath | Complete path of user defined model. |
assetId | TSLInstance.tslInstanceId | Asset ID, to be used for complying with existing data formats (Real-time stream data format). |
time | n/a | Timestamp. |
measurepoints | TSLIdentifier.identifier | Measurement point and value. |
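Since these Topics have strict data formats, it can help to check a record for the fields listed in the table before sending it. The sketch below is a minimal Python example under two assumptions that the table itself does not state: that every listed field is mandatory, and that `payload` may be a single entry or a list of entries. The function name `missing_fields` is hypothetical.

```python
from typing import Any, Dict, List

# Assumption: all fields from the Parameters table are treated as mandatory.
REQUIRED_TOP_LEVEL = ("orgId", "modelId", "modelIdPath", "payload")
REQUIRED_PAYLOAD = ("measurepoints", "time", "assetId")


def missing_fields(record: Dict[str, Any]) -> List[str]:
    """Return the names of expected fields that are absent from a record."""
    missing = [key for key in REQUIRED_TOP_LEVEL if key not in record]

    payload = record.get("payload")
    if payload is None:
        entries = []
    elif isinstance(payload, list):
        entries = payload
    else:
        entries = [payload]

    # Check each payload entry and report missing keys with their position.
    for index, entry in enumerate(entries):
        for key in REQUIRED_PAYLOAD:
            if key not in entry:
                missing.append(f"payload[{index}].{key}")
    return missing
```

An empty result means that all expected fields are present; it does not verify value types or the measurement point identifiers themselves.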
MEASURE_POINT_INTERNAL_{OUID}
Data Format¶
{
  "orgId": "1b47ed98d1800000",
  "modelId": "inverter",
  "modelIdPath": "/rootModel/inverter",
  "assetId": "zabPDuHq",
  "pointId": "inverter",
  "time": 1542609276270,
  "value": 23.4,
  "quality": 0,
  "dq": 0,
  "attr": {}
}
Parameters¶
Field | Corresponding CommDataObject Field | Description |
---|---|---|
orgId | orgId | Organization ID, to be used by downstream subscription and alert services. |
modelId | modelId | Model ID (identifier of user defined model), to be used by downstream subscription and alert services. |
modelIdPath | modelIdPath | Complete path of user defined model. |
assetId | assetId | Asset ID, to be used for complying with existing data formats (Real-time stream data format). |
pointId | measurepoints.key | Measurement point ID. |
time | time | Timestamp. |
value | measurepoints.pointId or measurepoints.pointId.value | Measurement point value. |
quality | null or measurepoints.pointId.quality | Measurement point data quality. |
dq | dq.measurepoints.pointId | Not required. |
attr | n/a | Extra information that is needed in cal (currently not exposed to upstream or downstream services). |
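The field mapping in this table can be sketched as a small Python function that flattens one nested record into one flat record per measurement point. This is only an illustration under stated assumptions: that the CommDataObject fields correspond to the nested shape shown for MEASURE_POINT_CAL_{OUID}, that `payload` is a single entry, and that `attr` is left empty. The function name `to_internal_records` is hypothetical.

```python
from typing import Any, Dict, List


def to_internal_records(cal: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Flatten one nested record into one flat record per measurement point."""
    payload = cal["payload"]
    # dq is not required, so fall back to an empty mapping when it is absent.
    dq_points = cal.get("dq", {}).get("measurepoints", {})

    records = []
    for point_id, raw in payload["measurepoints"].items():
        if isinstance(raw, dict):
            # With quality: measurepoints.pointId.value / .quality
            value, quality = raw.get("value"), raw.get("quality")
        else:
            # Without quality: the bare value, and quality is null
            value, quality = raw, None
        records.append({
            "orgId": cal["orgId"],
            "modelId": cal["modelId"],
            "modelIdPath": cal["modelIdPath"],
            "assetId": payload["assetId"],
            "pointId": point_id,              # measurepoints.key
            "time": payload["time"],
            "value": value,
            "quality": quality,
            "dq": dq_points.get(point_id),    # dq.measurepoints.pointId, None if absent
            "attr": {},                       # assumption: no extra information
        })
    return records
```

Applied to the "with quality" sample above, this produces a single record whose `pointId` is `tempWithQuality`, with `value` 23.4, `quality` 0, and `dq` 1.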