Save Pipeline

Save the configuration of a specific stream processing pipeline.

Prerequisites

A stream processing pipeline has been created with the Stream Processing service, and its pipeline ID is available.

Request Format

POST https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=save

Request Parameters (URI)

| Name | Location (Path/Query) | Mandatory/Optional | Data Type | Description |
|---|---|---|---|---|
| pipelineId | Path | Mandatory | String | The stream processing pipeline ID, which can be found on the EnOS Management Console > Stream Processing > Stream Operation page. If null is specified, this API creates a new stream processing pipeline. |
| orgId | Query | Mandatory | String | The organization ID. |
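As a minimal sketch of how the path and query parameters above combine into the request URL, the helper below assembles the address from its parts. The class `SavePipelineUrl`, its `build` method, and the host `apigw.example.com` are hypothetical names used for illustration; they are not part of the EnOS SDK.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical helper for illustration only; not part of the EnOS SDK.
final class SavePipelineUrl {
    private SavePipelineUrl() {}

    /** Builds the Save Pipeline URL from the gateway address, pipeline ID, and organization ID. */
    static String build(String apigwAddress, String pipelineId, String orgId) {
        Objects.requireNonNull(apigwAddress, "apigwAddress");
        Objects.requireNonNull(pipelineId, "pipelineId");
        Objects.requireNonNull(orgId, "orgId");
        return "https://" + apigwAddress
                + "/streaming/v2.0/streaming/pipeline/" + encode(pipelineId)
                + "?action=save&orgId=" + encode(orgId);
    }

    // Form-style encoding (a space becomes '+'); adequate for UUID-like IDs.
    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(build("apigw.example.com",
                "d790d27f-e977-403c-b4c8-e111443c563f", "yourOrgId"));
    }
}
```

The sketch only builds the URL string. Authentication with the application's AccessKey and SecretKey is still handled by the SDK, as in the Java SDK Sample.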

Request Parameters (Body)

| Name | Mandatory/Optional | Data Type | Description |
|---|---|---|---|
| version | Optional | String | Template version that is used by the stream processing pipeline. |
| name | Optional | String | Stream processing pipeline name. |
| templateType | Optional | Integer | Type of the template that is used by the stream processing pipeline. Possible values are 0: Time Window Aggregation Template; 1: Origin Template; 2: Multi-Merging Template; 3: Electric Energy Cal (by Metering Reading) Template; 4: Electric Energy Cal (by Average Power) Template; 5: Electric Energy Cal (by Instant Power) Template. |
| templateName | Optional | String | Template that is used by the stream processing pipeline. |
| messageChannel | Optional | Integer | Message channel that is used by the stream processing pipeline (0: real-time channel; 1: offline channel). |
| pipelineJson | Optional | JSONObject | Configuration of the stream processing pipeline in JSON format. |
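Because templateType and messageChannel are passed as bare integers, a caller may prefer named constants. The following is a rough sketch that mirrors the values listed above. The enum `TemplateType` and the class `SavePipelineBody` are hypothetical names for illustration, not SDK types, and the resulting map covers only the simple fields (pipelineJson is left out).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper types for illustration; they are not part of the EnOS SDK.
enum TemplateType {
    TIME_WINDOW_AGGREGATION(0),
    ORIGIN(1),
    MULTI_MERGING(2),
    ELECTRIC_ENERGY_BY_METERING_READING(3),
    ELECTRIC_ENERGY_BY_AVERAGE_POWER(4),
    ELECTRIC_ENERGY_BY_INSTANT_POWER(5);

    private final int code;

    TemplateType(int code) {
        this.code = code;
    }

    /** Integer value to send in the templateType body parameter. */
    int code() {
        return code;
    }

    /** Looks up the template type for an integer code from the parameter table. */
    static TemplateType fromCode(int code) {
        for (TemplateType type : values()) {
            if (type.code == code) {
                return type;
            }
        }
        throw new IllegalArgumentException("Unknown templateType: " + code);
    }
}

final class SavePipelineBody {
    private SavePipelineBody() {}

    /** Collects the simple body fields; messageChannel must be 0 (real-time) or 1 (offline). */
    static Map<String, Object> basicFields(String name, String version,
                                           TemplateType type, int messageChannel) {
        if (messageChannel != 0 && messageChannel != 1) {
            throw new IllegalArgumentException("messageChannel must be 0 (real-time) or 1 (offline)");
        }
        Map<String, Object> body = new LinkedHashMap<>();
        body.put("name", name);
        body.put("version", version);
        body.put("templateType", type.code());
        body.put("messageChannel", messageChannel);
        return body;
    }

    public static void main(String[] args) {
        System.out.println(basicFields("stream_power_calculation", "0.1.0",
                TemplateType.ELECTRIC_ENERGY_BY_INSTANT_POWER, 0));
    }
}
```

Each entry of the returned map could then be passed to `setBodyParams` on the request object used in the Java SDK Sample.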

Response Parameters

| Name | Data Type | Description |
|---|---|---|
| data | String | Returns the pipeline ID upon success. |

Error Code

| Code | Error Information | Description |
|---|---|---|
| 61100 | The Stream configuration JSON is invalid. | The pipeline configuration JSON is not valid. Please check the syntax. |
| 61108 | Stream processing pipeline does not exist. | Stream processing pipeline does not exist. Please check the pipeline ID. |
| 61112 | Failed to save the stream configuration. | Failed to save the stream processing pipeline configuration. |
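A client might translate these codes into readable messages before logging or surfacing them. The sketch below assumes that the error codes arrive in the `code` field of the response and that 0 means success, as the Return Sample suggests. The class `SavePipelineErrors` is a hypothetical name for illustration, not part of the SDK.

```java
// Hypothetical helper for illustration only; not part of the EnOS SDK.
final class SavePipelineErrors {
    private SavePipelineErrors() {}

    /** Assumption: a response code of 0 indicates success, as in the Return Sample. */
    static boolean isSuccess(int code) {
        return code == 0;
    }

    /** Maps a response code to the description given in the error code table. */
    static String describe(int code) {
        switch (code) {
            case 0:
                return "OK";
            case 61100:
                return "The pipeline configuration JSON is not valid. Please check the syntax.";
            case 61108:
                return "Stream processing pipeline does not exist. Please check the pipeline ID.";
            case 61112:
                return "Failed to save the stream processing pipeline configuration.";
            default:
                return "Unrecognized code: " + code;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(61108));
    }
}
```

With the fastjson response from the Java SDK Sample, the code could be read with `response.getIntValue("code")` and passed to `describe`.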

Sample

Request Sample

url: https://{apigw-address}/streaming/v2.0/streaming/pipeline/{pipelineId}?action=save&orgId=yourOrgId

method: POST

requestBody:
{
        "templateType": 3,
        "name": "stream_power_calculation",
        "templateName": "Electric Energy Cal by Instant Power",
        "version": "0.1.0",
        "messageChannel": 0,
        "desc": "",
        "pipelineJson": {
            "piDetail": true,
            "points": [{
                "minValue": "0",
                "outputPointId": "SPIC_Solar_Inverter::INV.AccumulatingPower_day",
                "exceptionPolicy": "1",
                "maxValue": "",
                "minValueInclude": true,
                "piDetailWindowSize": "FIVE_MINUTE",
                "maxValueInclude": false,
                "inputPointId": "SPIC_Solar_Inverter::INV.GenActivePW",
                "detailOutputPointId": "SPIC_Solar_Inverter::INV.AccumulatingPower_5min"
            }]
        }
}

Return Sample

{
  "code": 0,
  "msg": "OK",
  "data": "d790d27f-e977-403c-b4c8-e111443c563f"
}

Java SDK Sample

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Before;
import org.junit.Test;

public class Sample {
    private static final String API_Gateway_URL = "https://{domain_url}";
    private Poseidon poseidon;

    private static class Request extends PoseidonRequest {

        public void setBodyParams(String key, Object value) {
            bodyParams().put(key, value);
        }

        public void setMethod(String method) {
            this.method = method;
        }

        private String method;

        @Override
        public String baseUri() {
            return "";
        }

        @Override
        public String method() {
            return method;
        }
    }

    @Before
    public void init() {
        poseidon = Poseidon.config(
                PConfig.init()
                        .appKey("AccessKey of your APP")
                        .appSecret("SecretKey of your APP")
        ).method("POST");
    }

    @Test
    public void savePipeline() {
        // piDetail and points are pipeline settings, so they go inside pipelineJson,
        // as in the request sample above.
        JSONObject pipelineJson = new JSONObject();
        pipelineJson.put("piDetail", false);
        pipelineJson.put("points", JSONObject.parseArray("[\n" +
                "\t\t{\n" +
                "\t\t\t\"inputPointId\":\"SPIC_Solar_Inverter::INV.GenActivePW\",\n" +
                "\t\t\t\"outputPointId\":\"SPIC_Solar_Inverter::INV.AccumulatingPower_day\",\n" +
                "\t\t\t\"detailOutputPointId\":\"SPIC_Solar_Inverter::INV.AccumulatingPower_5min\",\n" +
                "\t\t\t\"minValue\":\"0\",\n" +
                "\t\t\t\"maxValue\":\"100\",\n" +
                "\t\t\t\"maxValueInclude\":true,\n" +
                "\t\t\t\"minValueInclude\":true,\n" +
                "\t\t\t\"piDetailWindowSize\":\"FIVE_MINUTE\",\n" +
                "\t\t\t\"exceptionPolicy\":\"1\"\n" +
                "\t\t}\n" +
                "\t]"));

        Request request = new Request();
        request.setBodyParams("version", "0.1.0");
        request.setBodyParams("name", "Electric Power Calculation");
        request.setBodyParams("desc", "Sample pipeline");
        request.setBodyParams("templateType", 3);
        request.setBodyParams("templateName", "Electric Energy Cal by Instant Power");
        // messageChannel is an Integer: 0 = real-time channel, 1 = offline channel.
        request.setBodyParams("messageChannel", 0);
        request.setBodyParams("pipelineJson", pipelineJson);

        JSONObject response = poseidon
                // Replace {pipelineId} with the ID of the pipeline to save.
                .url(API_Gateway_URL + "/streaming/v2.0/streaming/pipeline/{pipelineId}")
                .queryParam("orgId", "yourOrgId")
                .queryParam("action", "save")
                .getResponse(request, JSONObject.class);
        System.out.println(response);
    }
}