Save Pipeline

更新并保存指定流数据处理任务的配置信息。

前提条件

已通过流数据处理服务创建流数据处理任务,并获取到流数据处理任务的ID。

请求格式

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=save

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

pipelineId

Path

必需

String

流数据处理任务ID,可通过 EnOS管理门口 > 流数据处理 > 流运维 页面查看,或通过调用 List Pipelines 接口获取。如指定值为 null,则为新建流数据处理任务。

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

请求参数(Body)

名称

必需/可选

数据类型

描述

version

可选

String

流数据处理任务使用的模板版本。

name

可选

String

流数据处理任务名称。

templateType

可选

Integer

流数据处理任务使用的模板类型。1:原生模板;0:时间窗口聚合模板;2:多路归并模板;3:电量计算模板(按表读数);4:电量计算模板(按平均功率);5:电量计算模板(按瞬时功率)。

templateName

可选

String

流数据处理任务使用的模板名称。

messageChannel

可选

Integer

流数据处理任务的消息通道模式。0:实时通道;1:离线通道。

pipelineJson

可选

JSONObject

流数据处理任务配置JSON。

响应参数

名称

数据类型

描述

data

String

运行成功返回保存的流数据处理任务ID。

错误码

代码

错误信息

描述

61100

The Stream configuration JSON is invalid.

流数据处理任务配置JSON不正确。

61108

Stream processing job does not exit.

流数据处理任务不存在。

61112

Failed to save the stream configuration.

保存流数据处理任务失败。

示例

请求示例

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=save

method: POST

requestBody:
{
        "templateType": 3,
        "pipelineName": "stream_power_calculation",
        "templateName": "Electric Energy Cal by Instant Power",
        "version": "0.1.0",
        "messageChannel": 0,
        "desc": "",
        "pipelineJson": {
            "piDetail": true,
            "points": [{
                "minValue": "0",
                "outputPointId": "SPIC_Solar_Inverter::INV.AccumulatingPower_day",
                "exceptionPolicy": "1",
                "maxValue": "",
                "minValueInclude": true,
                "piDetailWindowSize": "FIVE_MINUTE",
                "maxValueInclude": false,
                "inputPointId": "SPIC_Solar_Inverter::INV.GenActivePW",
                "detailOutputPointId": "SPIC_Solar_Inverter::INV.AccumulatingPower_5min"
            }]
        }
}

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": "d790d27f-e977-403c-b4c8-e111443c563f"
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void SavePipeline() {
        Request request = new Request();
        request.setBodyParams("version", "0.1.0");
        request.setBodyParams("name", "Electric Power Calculation");
        request.setBodyParams("desc", "Sample pipeline");
        request.setBodyParams("templateType", 3);
        request.setBodyParams("templateName", "Electric Energy Cal by Instant Power");
        request.setBodyParams("messageChannel", "0");
        request.setBodyParams("piDetail", false);
        request.setBodyParams("points", JSONObject.parseArray("[\n" +
                "\t\t{\n" +
                "\t\t\t\"inputPointId\":\"SPIC_Solar_Inverter::INV.GenActivePW\",\n" +
                "\t\t\t\"outputPointId\":\"SPIC_Solar_Inverter::INV.AccumulatingPower_day\",\n" +
                "\t\t\t\"detailOutputPointId\":\"SPIC_Solar_Inverter::INV.AccumulatingPower_5min\",\n" +
                "\t\t\t\"minValue\":\"0\",\n" +
                "\t\t\t\"maxValue\":\"100\",\n" +
                "\t\t\t\"maxValueInclude\":true,\n" +
                "\t\t\t\"minValueInclude\":true,\n" +
                "\t\t\t\"piDetailWindowSize\":\"FIVE_MINUTE\",\n" +
                "\t\t\t\"exceptionPolicy\":\"1\"\n" +
                "\t\t}\n" +
                "\t]"));

        JSONObject response = poseidon
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "save")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}