Get Stage State¶
Get the intermediate state data of a specified operator (stage) in a stream processing pipeline.
Prerequisites¶
A stream processing pipeline has been created with the calculator library of the Stream Processing service.
Request Format¶
POST https://{apigw-address}/streaming/v2.0/stage-state?action=get
Request Parameters (URI)¶
Name | Location (Path/Query) | Mandatory/Optional | Data Type | Description |
---|---|---|---|---|
orgId | Query | Mandatory | String | The organization ID. How to get the orgId>> |
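Both `action` and `orgId` travel in the query string of the request URL. The sketch below assembles that URL with the standard library only. The class name `StageStateUrl` and the host `apigw.example.com` are placeholders for illustration, and authentication is left out.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class StageStateUrl {
    // Builds the request URL. apigwAddress stands in for {apigw-address}.
    static URI build(String apigwAddress, String orgId) {
        // action is fixed to "get"; orgId is URL-encoded in case it contains reserved characters.
        String query = "action=get&orgId=" + URLEncoder.encode(orgId, StandardCharsets.UTF_8);
        return URI.create("https://" + apigwAddress + "/streaming/v2.0/stage-state?" + query);
    }

    public static void main(String[] args) {
        // Hypothetical host and organization ID.
        System.out.println(build("apigw.example.com", "yourOrgId"));
    }
}
```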
Request Parameters (Body)¶
Name | Mandatory/Optional | Data Type | Description |
---|---|---|---|
pipelineId | Mandatory | String | The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page. |
stageInstanceName | Mandatory | String | The stage instance name, which can be found on the Stream Development page: click the stream processing pipeline name, select the target operator, and copy the stage instance name from the Info page. |
assetIds | Mandatory | String | The asset ID. To query multiple assets, separate the asset IDs with commas. |
pointIds | Mandatory | String | The measurement point ID. To query multiple measurement points, separate the IDs with commas. A maximum of 100,000 measurement points can be queried. |
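Because `assetIds` and `pointIds` are single comma-separated strings, a client usually has to build them from a list. The following is a minimal sketch of one way to do that. The class `StageStateBody` and its methods are hypothetical helpers, and it assumes the 100,000 limit applies to the number of IDs in `pointIds` (the service may count it differently).

```java
import java.util.List;

final class StageStateBody {
    // Upper limit on measurement points, as stated in the parameter table.
    static final int MAX_POINTS = 100_000;

    // Joins IDs into the comma-separated form used by assetIds and pointIds.
    static String joinIds(List<String> ids) {
        if (ids.isEmpty()) {
            throw new IllegalArgumentException("at least one ID is required");
        }
        for (String id : ids) {
            // A comma inside an ID would be read as a separator.
            if (id.isEmpty() || id.contains(",")) {
                throw new IllegalArgumentException("invalid ID: '" + id + "'");
            }
        }
        return String.join(",", ids);
    }

    // Same as joinIds, but rejects lists above the documented limit before sending.
    static String pointIds(List<String> ids) {
        if (ids.size() > MAX_POINTS) {
            throw new IllegalArgumentException("too many points: " + ids.size());
        }
        return joinIds(ids);
    }

    public static void main(String[] args) {
        // Hypothetical IDs.
        System.out.println(joinIds(List.of("asset1", "asset2"))); // prints asset1,asset2
    }
}
```

Checking the limit on the client side avoids a round trip that would otherwise fail with the "Too many points" error.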
Response Parameters¶
Name | Data Type | Description |
---|---|---|
data | List<JSONObject> | The list of operator intermediate state data. The format of the returned items depends on the type of the queried operator, which is identified by the stageInstanceName prefix in the request parameters. For more information, see items. |
items¶
Type 1: Generic Type¶
Operators of this type: LastRecordAppender, LastChangedRecordAppender, LatestRecordMerger, RecordCapturer
Name | Data Type | Description |
---|---|---|
modelId | String | The model ID. |
modelIdPath | String | The model ID path. |
attr | Object | The struct that contains the output results of the operator. |
assetId | String | The asset ID. |
pointId | String | The measurement point ID. |
time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |
value | String | The measurement point data. |
quality | Integer | The quality tagging for the measurement point data. |
dq | Long | The quality indicator of the measurement point data. |
Type 2: Simple Timestamp Type¶
Operators of this type: LatePointTagger
Name | Data Type | Description |
---|---|---|
assetId | String | The asset ID. |
pointId | String | The measurement point ID. |
time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |
Type 3: Window Aggregation Type¶
Operators of this type: FixedTimeWindowAggregator, SlidingTimeWindowAggregator
Name | Data Type | Description |
---|---|---|
assetId | String | The asset ID. |
pointId | String | The measurement point ID. |
%s::windowStrategy::%s | Object | The state information required by the window function calculation. %s represents the string determined according to information such as assets and measurement points. |
%s::updated | Object | The context information for the window function. %s represents the string determined according to information such as assets and measurement points. |
%s::watermark | Long | The watermark of the current asset of the window function. %s represents the string determined according to information such as assets and measurement points. |
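Since the `%s` parts of these field names vary per item, a client cannot look the fields up by a fixed name. The sketch below sorts a field name into one of the three documented patterns by its `::` markers. `WindowStateKeys` is a hypothetical helper, and the key strings in `main` are made up for illustration.

```java
final class WindowStateKeys {
    enum KeyKind { WINDOW_STRATEGY, UPDATED, WATERMARK, OTHER }

    // Matches a field name against the three documented key patterns.
    static KeyKind kindOf(String key) {
        // %s::windowStrategy::%s has the marker in the middle, so test it first.
        if (key.contains("::windowStrategy::")) {
            return KeyKind.WINDOW_STRATEGY;
        }
        if (key.endsWith("::updated")) {
            return KeyKind.UPDATED;
        }
        if (key.endsWith("::watermark")) {
            return KeyKind.WATERMARK;
        }
        // Fixed fields such as assetId and pointId end up here.
        return KeyKind.OTHER;
    }

    public static void main(String[] args) {
        // Hypothetical field names; the real %s values come from the service.
        System.out.println(kindOf("asset1::watermark"));           // prints WATERMARK
        System.out.println(kindOf("asset1::windowStrategy::sum")); // prints WINDOW_STRATEGY
        System.out.println(kindOf("assetId"));                     // prints OTHER
    }
}
```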
Type 4: Simple Window Aggregation Type¶
Operators of this type: SimplifiedTimeWindowAggregator
Name | Data Type | Description |
---|---|---|
assetId | String | The asset ID. |
pointId | String | The measurement point ID. |
time | Long | The measurement point data timestamp (UNIX time, accurate to the second). |
windowSum | Object | The state information required by the window function calculation. |
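A client can decide which of the four item formats to expect before it parses the response. The sketch below does this from the stage instance name, assuming the name starts with the operator name exactly as listed in the tables above. That prefix convention, the class `StageStateTypes`, and the instance names in `main` are assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StageStateTypes {
    enum ItemType { GENERIC, SIMPLE_TIMESTAMP, WINDOW_AGGREGATION, SIMPLE_WINDOW_AGGREGATION }

    // Operator name -> item format, taken from the four type sections.
    private static final Map<String, ItemType> BY_OPERATOR = new LinkedHashMap<>();
    static {
        BY_OPERATOR.put("LastRecordAppender", ItemType.GENERIC);
        BY_OPERATOR.put("LastChangedRecordAppender", ItemType.GENERIC);
        BY_OPERATOR.put("LatestRecordMerger", ItemType.GENERIC);
        BY_OPERATOR.put("RecordCapturer", ItemType.GENERIC);
        BY_OPERATOR.put("LatePointTagger", ItemType.SIMPLE_TIMESTAMP);
        BY_OPERATOR.put("FixedTimeWindowAggregator", ItemType.WINDOW_AGGREGATION);
        BY_OPERATOR.put("SlidingTimeWindowAggregator", ItemType.WINDOW_AGGREGATION);
        BY_OPERATOR.put("SimplifiedTimeWindowAggregator", ItemType.SIMPLE_WINDOW_AGGREGATION);
    }

    // Returns the item format for a stage instance name, based on its prefix.
    static ItemType classify(String stageInstanceName) {
        for (Map.Entry<String, ItemType> entry : BY_OPERATOR.entrySet()) {
            if (stageInstanceName.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        // Mirrors the "Not supported stage type" case on the client side.
        throw new IllegalArgumentException("Unsupported stage type: " + stageInstanceName);
    }

    public static void main(String[] args) {
        // Hypothetical stage instance names.
        System.out.println(classify("LatePointTagger_01"));            // prints SIMPLE_TIMESTAMP
        System.out.println(classify("SlidingTimeWindowAggregator_2")); // prints WINDOW_AGGREGATION
    }
}
```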
Error Code¶
Code | Error Information | Description |
---|---|---|
61102 | Missing param xx | The parameter format or value is not valid. |
61105 | Too many points | The number of requested measurement points exceeds the limit. |
61106 | Wrong pipelineId or OU has no privilege | The provided pipeline ID is not valid, or the pipeline ID and organization ID do not match. |
61107 | Not supported stage type | The specified stage type is not supported. |
61199 | Other errors. | |
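The codes in this table can be mapped to named constants so that calling code does not compare raw integers. The following is a sketch only. The enum `StageStateError` and its constant names are illustrative, and treating any unlisted code (such as the `0` in the return sample) as "no known error" is an assumption.

```java
import java.util.Optional;

enum StageStateError {
    MISSING_PARAM(61102, "The parameter format or value is not valid."),
    TOO_MANY_POINTS(61105, "The number of requested measurement points exceeds the limit."),
    WRONG_PIPELINE_OR_NO_PRIVILEGE(61106,
            "The provided pipeline ID is not valid, or the pipeline ID and organization ID do not match."),
    UNSUPPORTED_STAGE_TYPE(61107, "The specified stage type is not supported."),
    OTHER(61199, "Other errors.");

    final int code;
    final String description;

    StageStateError(int code, String description) {
        this.code = code;
        this.description = description;
    }

    // Looks up a documented error; empty for codes that are not in the table.
    static Optional<StageStateError> fromCode(int code) {
        for (StageStateError error : values()) {
            if (error.code == code) {
                return Optional.of(error);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(fromCode(61105).map(e -> e.description).orElse("no documented error"));
        System.out.println(fromCode(0).isPresent()); // prints false
    }
}
```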
Samples¶
Request Sample¶
url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId
method: POST
requestBody:
{
    "pipelineId": "yourPipelineId",
    "stageInstanceName": "yourStageInstanceName",
    "assetIds": "yourAssetIds",
    "pointIds": "yourPointIds"
}
Return Sample¶
{
    "code": 0,
    "msg": "OK",
    "data": {
        "items": [
            {
                "modelId": "modelId",
                "modelIdPath": "/modelIdPath",
                "assetId": "assetId",
                "pointId": "pointId",
                "time": 1582648020580,
                "value": 3.1,
                "quality": 0,
                "attr": {}
            },
            {
                "modelId": "modelId",
                "modelIdPath": "/modelIdPath",
                "assetId": "assetId2",
                "pointId": "pointId2",
                "time": 1582648020580,
                "value": 3.2,
                "quality": 1,
                "attr": {}
            }
        ]
    }
}
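The items tables describe `time` as accurate to the second, while the sample above carries a 13-digit value, which reads as milliseconds. The sketch below converts either form to an `Instant`. The magnitude-based rule and the class `StageStateTime` are assumptions for illustration, not documented behavior of the service.

```java
import java.time.Instant;

final class StageStateTime {
    // Values at or above this are treated as milliseconds (heuristic).
    // As seconds, 10^11 is far in the future; as milliseconds, it is in 1973,
    // so millisecond timestamps older than that would be misread.
    private static final long MILLIS_THRESHOLD = 100_000_000_000L;

    // Converts a "time" field to an Instant, guessing the unit from its magnitude.
    static Instant toInstant(long time) {
        if (Math.abs(time) >= MILLIS_THRESHOLD) {
            return Instant.ofEpochMilli(time);
        }
        return Instant.ofEpochSecond(time);
    }

    public static void main(String[] args) {
        // The value from the return sample.
        System.out.println(toInstant(1582648020580L)); // prints 2020-02-25T16:27:00.580Z
    }
}
```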
Java SDK Sample¶
import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    // Minimal request wrapper that exposes the body parameters and the HTTP method.
    private static class Request extends PoseidonRequest {
        private String method;

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        // Configure the client with the application credentials and use POST.
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void GetStageState() {
        // Body parameters: the assets and points to query, the pipeline, and the stage instance.
        Request request = new Request();
        request.setBodyParams("assetIds", "yourAssetId");
        request.setBodyParams("pointIds", "yourPointId");
        request.setBodyParams("pipelineId", "yourPipelineId");
        request.setBodyParams("stageInstanceName", "yourStageInstanceName");

        // orgId and action are passed as query parameters.
        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/stage-state")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "get")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}