Get Stage State

Get the intermediate state data of a specified operator (stage) in a stream processing pipeline.

Prerequisites

Stream data processing pipelines have been created with the calculator library of the Stream Processing service.

Request Format

POST https://{apigw-address}/streaming/v2.0/stage-state?action=get

Request Parameters (URI)

| Name | Location (Path/Query) | Mandatory/Optional | Data Type | Description |
|---|---|---|---|---|
| orgId | Query | Mandatory | String | The organization ID. See How to get the orgId. |

Request Parameters (Body)

| Name | Mandatory/Optional | Data Type | Description |
|---|---|---|---|
| pipelineId | Mandatory | String | The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page. |
| stageInstanceName | Mandatory | String | The stage instance name, which can be found on the Stream Development page. Click the stream processing pipeline name, select the target operator, and copy the stage instance name on the Info page. |
| assetIds | Mandatory | String | The asset ID. Supports the query of multiple asset IDs, separated by commas. |
| pointIds | Mandatory | String | The measurement point ID. Supports the query of multiple measurement point IDs, separated by commas. The upper limit of the number of measurement points that can be queried is 100,000. |
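As a minimal sketch of how the body parameters above fit together, the following Python helper joins several asset IDs and measurement point IDs into the comma-separated strings the API expects. The function name `build_stage_state_body` is hypothetical, and applying the 100,000 limit to the number of point IDs in a single request is an assumption about how that limit is counted.

```python
import json

# Upper limit stated in the pointIds description; applying it to the
# number of point IDs in one request is an assumption.
MAX_POINT_IDS = 100_000


def build_stage_state_body(pipeline_id, stage_instance_name, asset_ids, point_ids):
    """Return the JSON request body as a string (hypothetical helper)."""
    asset_ids = list(asset_ids)
    point_ids = list(point_ids)
    if not asset_ids or not point_ids:
        raise ValueError("assetIds and pointIds are both mandatory")
    if len(point_ids) > MAX_POINT_IDS:
        raise ValueError(f"at most {MAX_POINT_IDS} measurement points per request")
    body = {
        "pipelineId": pipeline_id,
        "stageInstanceName": stage_instance_name,
        # Multiple IDs are sent as one comma-separated string.
        "assetIds": ",".join(asset_ids),
        "pointIds": ",".join(point_ids),
    }
    return json.dumps(body)
```

Checking the limit on the client side avoids a round trip that would otherwise likely end in a "Too many points" error.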

Response Parameters

| Name | Data Type | Description |
|---|---|---|
| data | List<JSONObject> | The list of operator intermediate state data. For more information, see items. The list returned depends on the type of operator being queried, which is distinguished by the stageInstanceName prefix in the request parameters. Note: The data format also differs between operator types. |

items

Type 1: Generic Type

Operators of this type: LastRecordAppender, LastChangedRecordAppender, LatestRecordMerger, RecordCapturer

| Name | Data Type | Description |
|---|---|---|
| modelId | String | The model ID. |
| modelIdPath | String | The model ID path. |
| attr | Object | The struct that contains the output results of the operator. |
| assetId | String | The asset ID. |
| pointId | String | The measurement point ID. |
| time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |
| value | String | The measurement point data. |
| quality | Integer | The quality tagging for the measurement point data. |
| dq | Long | The quality indicator of the measurement point data. |

Type 2: Simple Timestamp Type

Operators of this type: LatePointTagger

| Name | Data Type | Description |
|---|---|---|
| assetId | String | The asset ID. |
| pointId | String | The measurement point ID. |
| time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |

Type 3: Window Aggregation Type

Operators of this type: FixedTimeWindowAggregator, SlidingTimeWindowAggregator

| Name | Data Type | Description |
|---|---|---|
| assetId | String | The asset ID. |
| pointId | String | The measurement point ID. |
| %s::windowStrategy::%s | Object | The state information required by the window function calculation. %s represents the string determined according to information such as assets and measurement points. |
| %s::updated | Object | The context information for the window function. %s represents the string determined according to information such as assets and measurement points. |
| %s::watermark | Long | The watermark of the current asset of the window function. %s represents the string determined according to information such as assets and measurement points. |
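Because the three window-related field names above are patterns rather than fixed keys, a client cannot look them up by name. One possible approach, sketched here in Python, is to group the fields of a returned item by the fixed parts of each pattern. The helper name `group_window_state` is hypothetical, and the sample keys in the usage note are invented for illustration, since the actual strings substituted for %s are not specified here.

```python
def group_window_state(item):
    """Group the fields of one window aggregation item by key pattern."""
    grouped = {"windowStrategy": {}, "updated": {}, "watermark": {}, "other": {}}
    for key, value in item.items():
        if "::windowStrategy::" in key:
            grouped["windowStrategy"][key] = value
        elif key.endswith("::updated"):
            grouped["updated"][key] = value
        elif key.endswith("::watermark"):
            grouped["watermark"][key] = value
        else:
            # assetId, pointId, and anything that matches no pattern
            grouped["other"][key] = value
    return grouped
```

For example, an item with the invented key `"a1::watermark"` would have that entry placed under `grouped["watermark"]`, while `assetId` and `pointId` end up under `grouped["other"]`.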

Type 4: Simple Window Aggregation Type

Operators of this type: SimplifiedTimeWindowAggregator

| Name | Data Type | Description |
|---|---|---|
| assetId | String | The asset ID. |
| pointId | String | The measurement point ID. |
| time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |
| windowSum | Object | The state information required by the window function calculation. |
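Since the format of each returned item depends on the operator type, a client may want to decide how to read a response before parsing it. The Python sketch below maps a stage instance name to one of the four types listed above by checking its prefix. It assumes that the stage instance name begins with the operator name; the helper name `state_type_for`, the type labels, and the instance name in the usage note are illustrative and not part of the API.

```python
# Operator names grouped by the four item formats described above.
# The label strings are illustrative, not values returned by the API.
STATE_TYPE_BY_OPERATOR = {
    "LastRecordAppender": "generic",
    "LastChangedRecordAppender": "generic",
    "LatestRecordMerger": "generic",
    "RecordCapturer": "generic",
    "LatePointTagger": "simple_timestamp",
    "FixedTimeWindowAggregator": "window_aggregation",
    "SlidingTimeWindowAggregator": "window_aggregation",
    "SimplifiedTimeWindowAggregator": "simple_window_aggregation",
}


def state_type_for(stage_instance_name):
    """Return the item format label for a stage instance name, or None."""
    for operator, state_type in STATE_TYPE_BY_OPERATOR.items():
        # Assumption: the instance name starts with the operator name.
        if stage_instance_name.startswith(operator):
            return state_type
    return None
```

With a hypothetical instance name such as `"LatePointTagger_01"`, the function returns the simple timestamp label; a name that matches no listed operator returns `None`, which corresponds to the unsupported stage type case.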

Error Code

| Code | Error Information | Description |
|---|---|---|
| 61102 | Missing param xx | The parameter format or value is not valid. |
| 61105 | Too many points | The number of requested measurement points exceeds the limit. |
| 61106 | Wrong pipelineId or OU has no privilege | The provided pipeline ID is not valid, or the pipeline ID and organization ID do not match. |
| 61107 | Not supported stage type | The specified stage type is not supported. |
| 61199 | | Other errors. |
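The error codes above can be turned into client-side exceptions. The Python sketch below assumes that an error response uses the same `code` and `msg` envelope as a successful one and that a `code` of 0 means success; neither point is stated explicitly in this section. `StageStateError` and `raise_for_error` are hypothetical names.

```python
# Descriptions copied from the error code table.
ERROR_DESCRIPTIONS = {
    61102: "The parameter format or value is not valid.",
    61105: "The number of requested measurement points exceeds the limit.",
    61106: "The provided pipeline ID is not valid, or the pipeline ID "
           "and organization ID do not match.",
    61107: "The specified stage type is not supported.",
    61199: "Other errors.",
}


class StageStateError(Exception):
    """Hypothetical exception carrying the returned code and message."""

    def __init__(self, code, msg, description):
        super().__init__(f"{code}: {msg} ({description})")
        self.code = code
        self.msg = msg
        self.description = description


def raise_for_error(response):
    """Return the parsed response unchanged if code is 0, otherwise raise."""
    code = response.get("code")
    if code == 0:
        return response
    description = ERROR_DESCRIPTIONS.get(code, "Unrecognized error code.")
    raise StageStateError(code, response.get("msg", ""), description)
```

Keeping the code on the exception lets a caller react differently to, say, 61105 (split the request into smaller batches) and 61106 (check the pipeline ID and organization ID).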

Samples

Request Sample

url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId

method: POST

requestBody:
{
    "pipelineId":"yourPipelineId",
    "stageInstanceName":"yourStageInstanceName",
    "assetIds":"yourAssetIds",
    "pointIds":"yourPointIds"
}
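The request sample above could be assembled in Python roughly as follows, using only the standard library. This is a sketch under several assumptions: the `Content-Type: application/json` header is not stated in this section, any authentication or request signing that the API gateway may require is omitted, and `build_request` is a hypothetical helper name. The function only constructs the request object and does not send it.

```python
import json
import urllib.parse
import urllib.request


def build_request(apigw_address, org_id, body):
    """Build (but do not send) the POST request for the stage-state API."""
    # action=get is part of the documented URL; orgId is the URI parameter.
    query = urllib.parse.urlencode({"action": "get", "orgId": org_id})
    url = f"https://{apigw_address}/streaming/v2.0/stage-state?{query}"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        # Assumption: the gateway expects a JSON content type.
        headers={"Content-Type": "application/json"},
    )
```

The returned object can be passed to `urllib.request.urlopen` once the headers that your environment requires for authentication have been added.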

Return Sample

{
  "code": 0,
  "msg": "OK",
  "data": {
    "items": [
      {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId",
        "pointId": "pointId",
        "time": 1582648020580,
        "value": 3.1,
        "quality": 0,
        "attr": {}
      },
      {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId2",
        "pointId": "pointId2",
        "time": 1582648020580,
        "value": 3.2,
        "quality": 1,
        "attr": {}
      }
    ]
  }
}
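A response shaped like the return sample can be indexed by asset and measurement point for easier lookup. The Python sketch below follows the structure of the sample, where `data` is an object that contains an `items` array. The `time` values in the sample have 13 digits, so the sketch treats them as milliseconds; this is an assumption based on the sample alone and should be checked against the parameter description. The helper names `index_items` and `to_utc` are hypothetical.

```python
from datetime import datetime, timezone


def index_items(response):
    """Map (assetId, pointId) to the corresponding item of a parsed response."""
    items = response.get("data", {}).get("items", [])
    return {(item["assetId"], item["pointId"]): item for item in items}


def to_utc(timestamp_ms):
    """Convert a timestamp to an aware UTC datetime.

    Assumption: the value is in milliseconds, as the 13-digit
    sample values suggest.
    """
    return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
```

With the sample above, `index_items(response)[("assetId2", "pointId2")]["value"]` yields `3.2`.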

SDK Samples


You can access the SDK samples for the Stream Processing service on GitHub: