Get Stage State

查询指定流数据处理任务中某个指定算子的中间状态数据。

前提条件

已通过流数据处理服务创建流数据处理任务。

请求格式

POST https://{apigw-address}/streaming/v2.0/stage-state?action=get

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称

必需/可选

数据类型

描述

pipelineId

必需

String

流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看。

stageInstanceName

必需

String

算子实例名称,可通过 流运维 页面,点击流数据处理任务名称,选择算子后,在 Info 页面查看。

assetIds

必需

String

资产ID,支持查询多个资产,多个资产ID之间用英文逗号隔开。

pointIds

必需

String

测点ID,支持多测点查询,多个测点间用英文逗号隔开;支持查询的(资产数*测点数)上限为10万。

响应参数

名称

数据类型

描述

data

List<JSONObject>

返回算子中间状态数据列表。详见 items。 被查询的算子类型不同(根据请求参数中 stageInstanceName 前缀区分),将返回不同算子的中间状态数据列表。注意:不同类型算子的数据格式也有所区别。

items

类型 1:通用类型

属于该类型的算子:LastRecordAppenderLastChangedRecordAppenderLatestRecordMergerRecordCapturer

名称

数据类型

描述

modelId

String

模型 ID。

modelIdPath

String

模型 ID 路径。

attr

Object

包含算子输出结果的结构体。

assetId

String

资产ID。

pointId

String

测点 ID。

time

Long

测点数据时间戳,UNIX 时间,精确到秒。

value

String

测点数值。

quality

Integer

测点数据质量打标。

dq

Long

测点数据自带的质量位。如果测点无质量位,则无该字段。

类型 2:简易时间戳类型

属于该类型的算子:LatePointTagger

名称

数据类型

描述

assetId

String

资产 ID。

pointId

String

测点 ID。

time

Long

测点数据时间戳,UNIX 时间,精确到秒。

类型 3:窗口聚合类型

属于该类型的算子:FixedTimeWindowAggregatorSlidingTimeWindowAggregator

名称

数据类型

描述

assetId

String

资产 ID。

pointId

String

测点 ID。

%s::windowStrategy::%s

Object

窗口函数计算所需的状态信息,%s 代表根据资产、测点等信息确定的字符串。

%s::updated

Object

窗口函数的上下文信息,%s 代表根据资产、测点等信息确定的字符串。

%s::watermark

Long

窗口函数中当前资产的水位线,%s 代表根据资产、测点等信息确定的字符串。

类型 4:简单窗口聚合类型

属于该类型的算子:SimplifiedTimeWindowAggregator

名称

数据类型

描述

assetId

String

资产 ID。

pointId

String

测点 ID。

time

Long

测点数据时间戳,UNIX 时间,精确到秒。

windowSum

Object

窗口函数计算所需的状态信息。

错误码

代码

错误信息

描述

61102

Missing param xx.

参数格式不正确。

61105

Too many points.

请求的总测点数超过限制。

61106

Wrong pipelineId or OU has no privilege.

pipelineId不正确,或者不属于指定的组织。

61107

Not supported Stage type.

不支持的算子类型。

61199

其他错误。

示例

请求示例

url: https://{apigw-address}/streaming/v2.0/stage-state?action=get&orgId=yourOrgId

method: POST

requestBody:
{
    "pipelineId":"yourPipelineId",
    "stageInstanceName":"yourStageInstanceName",
    "assetIds":"yourAssetIds",
    "pointIds":"yourPointIds"
}

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "items": [
  {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId",
        "pointId": "pointId",
        "time": 1582648020580,
        "value": 3.1,
        "quality": 0,
        "attr": {}
    },
    {
        "modelId": "modelId",
        "modelIdPath": "/modelIdPath",
        "assetId": "assetId2",
        "pointId": "pointId2",
        "time": 1582648020580,
        "value": 3.2,
        "quality": 1,
        "attr": {}
    }
    ]
  }
}

SDK 示例


你可以在 Github 上获取流数据处理的 SDK 示例: