Export Flow

导出任务流到本地。

前提条件

用户必须属于目标任务流所属的OU。

请求格式

GET https://{apigw-address}/batch-processing-service/v2.1/flows

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

flowId

Query

必需

Integer

任务流ID。

userId

Query

必需

String

用户ID。如何获取userId信息>>

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

action

Query

必需

String

固定值:export

响应参数

名称

数据类型

描述

data

List<JSONObject>

包含任务流的详细信息。详见 Flow结构体

Flow结构体

示例

{
    "name": "workflow1",
    "cycle": "D",
    "cron": "0 0 0 * * ? *",
    "parameters": "[]",
    "alertMode": 3,
    "submitter": "submitter_id",
    "owners": "owner_id",
    "visitors": "xxxx;",
    "type": 1,
    "syncType": 1,
    "desc": "",
    "startTime": "2019-07-25",
    "tasks": [
      {
        "name": "tass",
        "resource": "default",
        "type": "DATA_INTEGRATION",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
        "syncType": 1,
        "cmd": "echo "hello"",
        "submitter": "",
        "filePackage": "",
        "cron": "",
        "priorityLevel": 0,
        "timeout": 300,
        "retryLimit": 3,
        "retryInterval": 0,
        "successCode": "0",
        "asLink": false
      }
    ],
    "flowLinks": [],
    "taskLinks": [],
    "relations": [],
    "linkRelations": []
}

参数

名称

数据类型

描述

name

String

任务流名称。

cycle

String

调度周期(M:月;W:周;D:天;H:小时;mi:分钟)。

cron

String

任务流调度周期。调度中使用的是七位的Crontab,概括而言,Crontab可以指定某个事件在其指定的时间点被触发,比如:c1 (0 1 * * * ? *) 定义了事件在每个小时的1分0秒触发,c2 (59 59 23 * * ? *) 定义了事件在每天的23时59分59秒触发。有关Crontab的更多配置,详见 http://cron.qqe2.com/

parameters

List<Map<key,value>>

调度参数。作为统一配置的全局参数,可以在节点内使用这些参数,以使任务运行时能动态适配环境变化(参数需要以 key-value 的格式表达,例如:[{"key":"env","value":"product"},{"key":"task_id","value":"123456"}] )。

alertMode

Integer

告警模式(0:无, 1:仅邮件告警, 2:仅短信告警, 3:邮件与短信告警)。

submitter

String

任务流提交账号(所属组织对应的大数据账号)。

owners

String

任务流所有者的用户名(多个owner之间以 ; 分开,例如owners=“userNameA;userNameB”)。

visitors

String

可访问者的用户名(多个visitor之间以 ; 分开,例如visitors=“userNameA;userNameB”)。

type

Integer

任务流调度类型(0:手动调度任务;1:周期调度任务;2:临时任务)。

syncType

Integer

同步类型(0:文件同步;1:数据同步)。

desc

String

任务流描述信息。

startTime

String

任务流生效日期(即开始调度日期)。

tasks

List<Task>

任务节点集合,集合中每个元素表示任务流中的一个任务,详见 Task结构体

flowLinks

List<FlowLink>

任务流依赖集合。集合中每个元素表示当前任务流依赖于某个源任务流(通过 FlowLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。

taskLinks

List<TaskLink>

任务节点依赖集合。集合中每个元素表示当前任务流中某个任务依赖于某个源任务(通过 TaskLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。

relations

List<Relation>

关连线集合。Relation结构体 的集合,Relation表示两个任务间具有上下游依赖关系。

linkRelations

List<LinkRelation>

连接关系集合,详见 LinkRelation结构体

Task结构体

示例

{
    "name": "task",
    "resource": "default",
    "type": "SHELL",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
    "syncType": 1,
    "cmd": "echo "hello"",
    "submitter": "yourSubmitter",
    "filePackage": "",
    "cron": "",
    "priorityLevel": 0,
    "timeout": 300,
    "retryLimit": 3,
    "retryInterval": 0,
    "successCode": "0",
    "asLink": false
 }

参数

名称

数据类型

描述

name

String

任务名称。

resource

String

任务资源。

type

String

任务类型(3:SHELL;7:数据集成;8:PYTHON;9:外部应用)

syncType

Integer

同步类型(0:文件同步;1:数据同步)。

cmd

String

命令行命令。

submitter

String

任务提交者。

filePackage

String

文件位置。

cron

String

具体调度时间。

priorityLevel

Integer

优先级别。

timeout

Integer

超时时间。

retryLimit

Integer

重试次数。

retryInterval

Integer

重试时间间隔。

successCode

String

成功返回值。

asLink

Boolean

是否与其他任务具有依赖关系。

runMode

String

任务运行模式,详见 RunMode结构体

RunMode结构体

示例

{
    "taskMode": 1,
    "cpu": 0.5,
    "memory": 1,
    "maxParallel": 0,
    "keyType": 0,
    "datasourceId": 0,
    "path": "",
    "content": ""
}

参数

名称

数据类型

描述

taskMode

Integer

任务运行模式(1:单任务;2:多任务)。

cpu

Float

每个任务(单任务就是该任务本身,多任务是每个子任务)运行时需要的 CPU(单位:core,最小0.1,最大2)。

memory

Float

每个任务运行时需要的 Memory(单位:G,最小0.3,最大4)。

maxParallel

Integer

多任务模式下,允许同时并发执行的最大子任务数。

keyType

Integer

多任务模式下,分布键的来源(1:外部文件;2:自定义,通过content字段设置)。

datasourceId

Integer

分布键来源为外部文件时,连接外部文件所在数据源的数据源ID(通过 数据源注册 服务注册并获取ID)。

path

String

分布键来源为外部文件时,分布键文件在外部数据源中的路径。

content

String

分布键来源为自定义时,分布键的内容。

Relation结构体

示例

{
    "sourceTaskName": "tass",
    "targetTaskName": "rf",
    "rerun": true
}

参数

名称

数据类型

描述

sourceTaskName

String

上游任务名称。

targetTaskName

String

下游任务名称。

rerun

Boolean

true和false仅在任务级联重跑时生效。true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

LinkRelation结构体

示例

{
    "linkId": "0",
    "targetTaskName": "tass",
    "rerun": false
}

参数

名称

数据类型

描述

linkId

String

连接ID。

targetTaskName

String

下游任务名称。

rerun

Boolean

true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

错误码

代码

错误信息

描述

62102

Flow validation exception

请求参数格式不正确

62109

Server internal exception

服务器内部异常

有关其他错误码的描述,参见 通用错误码

示例

请求示例

url: https://{apigw-address}/batch-processing-service/v2.1/flows?action=export&flowId={}&userId={}&orgId={}

method: GET

返回示例

{
  "code": 0,
  "msg": "OK",
  "data": {
    "name": "nFlow",
    "cycle": "D",
    "cron": "0 0 0 * * ? *",
    "parameters": "[{\"key\":\"REPLACE\",\"value\":\"lili1\"}]",
    "alertMode": 3,
    "submitter": "yourSubmitter",
    "owners": "yourOwners",
    "visitors": "yourVisitors",
    "type": 1,
    "syncType": 1,
    "desc": "ga",
    "startTime": "2019-07-24",
    "tasks": [
      {
        "name": "tass",
        "resource": "default",
        "type": "DATA_INTEGRATION",
        "syncType": 1,
        "cmd": "echo `whoami`",
        "submitter": "",
        "filePackage": "",
        "cron": "",
        "priorityLevel": 0,
        "timeout": 300,
        "retryLimit": 3,
        "retryInterval": 0,
        "successCode": "0",
        "asLink": false,
        "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}"
      }
    ],
    "flowLinks": [],
    "taskLinks": [],
    "relations": [],
    "linkRelations": []
  }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class SampleCode{
    public static class Request extends PoseidonRequest {
        public void setQueryParam(String key, Object value){
            queryParams().put(key, value);
        }
        public void setHeaderParam(String key, String value){
            headerParams().put(key, value);
        }

        public void setBodyParam(Map<String, Object> bodyPara){
            bodyParams().putAll(bodyPara);
        }
        public void setMethod(String method) {
            this.method = method;
        }
        private String method;
        public String baseUri() {
            return "";
        }
        public String method() {
            return method;
        }
    }

    @Test
    public void exportFlowTest(){
        //1.在EnOS Console的左边导航栏中点击应用注册。
        //2.点击需调用API的应用,查看基本信息中的AccessKey即为accessKey、SecretKey即为secretKey
        String accessKey = "AccessKey of your APP";
        String secretKey = "SecretKey of your APP";

        //新建一个request 然后把需要的参数传进去存在Query的map中,key是参数名字,value是参数值
        Request request = new Request();
        request.setMethod("GET");

        try {
            JSONObject response = Poseidon.config(PConfig.init().appKey(accessKey).appSecret(secretKey).debug())
                    .url("https://{apigw-address}/batch-processing-service/v2.1/flows")
                    .queryParam("orgId", "yourOrgId")
                    .queryParam("userId", "yourUserId")
                    .queryParam("flowId", "36")
                    .queryParam("action", "export")
                    .getResponse(request, JSONObject.class);

            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}