Export Flow

导出任务流到本地。

前提条件

用户必须属于目标任务流所属的OU。

请求格式

GET https://{apigw-address}/dataflow-batch-service/v2.0/flows?action=export

请求参数(URI)

名称

位置(Path/Query)

必需/可选

数据类型

描述

flowId

Query

必需

Integer

任务流ID。

userId

Query

必需

String

用户ID。如何获取userId信息>>

orgId

Query

必需

String

用户所属的组织ID。如何获取orgId信息>>

响应参数

名称

数据类型

描述

data

List<JSONObject>

包含任务流的详细信息。详见 Flow结构体

Flow结构体

示例

{
    "name": "workflow1",
    "cycle": "D",
    "cron": "0 0 0 * * ? *",
    "parameters": "[]",
    "alertMode": 3,
    "alertTo": "",
    "appId": "",
    "submitter": "yourSubmitter",
    "owners": "yourOwners",
    "visitors": "yourVisitors",
    "type": 1,
    "syncType": 1,
    "desc": "",
    "startTime": "2019-07-25",
    "tasks": [
      {
        "name": "tass",
        "resource": "default",
        "type": "COMMAND",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
        "syncType": 1,
        "cmd": "echo "hello"",
        "submitter": "yourSubmitter",
        "filePackage": "",
        "cron": "",
        "priorityLevel": 0,
        "timeout": 300,
        "retryLimit": 3,
        "retryInterval": 0,
        "successCode": "0",
        "waitCode": "",
        "asLink": false
      }
    ],
    "flowLinks": [],
    "taskLinks": [],
    "relations": [],
    "linkRelations": []
}

参数

名称

数据类型

描述

name

String

任务流名称。

cycle

String

调度周期(M:月;W:周;D:天;H:小时;mi:分钟)。

cron

String

任务流调度周期。调度中使用的是七位的Crontab,概括而言,Crontab可以指定某个事件在其指定的时间点被触发,比如:c1 (0 1 * * * ? *) 定义了事件在每个小时的1分0秒触发,c2 (59 59 23 * * ? *) 定义了事件在每天的23时59分59秒触发。有关Crontab的更多配置,详见 http://cron.qqe2.com/

parameters

List<Map<key,value>>

调度参数。作为统一配置的全局参数,可以在节点内使用这些参数,以使任务运行时能动态适配环境变化(参数需要以 key-value 的格式表达,例如:[{"key":"env","value":"product"},{"key":"task_id","value":"123456"}] )。

alertMode

Integer

告警模式(0:无, 1:仅邮件告警, 2:仅短信告警, 3:邮件与短信告警)。

alertTo

String

告警对象。

appId

String

N/A,通常为空字符串。

submitter

String

任务流提交账号。

owners

String

任务流所有者(负责人)的用户名(多个owner之间以 ; 分开,例如owners=“userNameA;userNameB”)。

visitors

String

可访问者的用户名(多个visitor之间以 ; 分开,例如visitors=“userNameA;userNameB”)。

type

Integer

任务流调度类型(0:手动调度任务;1:周期调度任务;2:临时任务)。

syncType

Integer

同步类型(0:文件同步;1:数据同步)。

desc

String

任务流描述信息。

startTime

String

任务流生效日期(即开始调度日期)。

tasks

List<Task>

任务节点集合,集合中每个元素表示任务流中的一个任务,详见 Task结构体

flowLinks

List<FlowLink>

任务流依赖集合。集合中每个元素表示当前任务流依赖于某个源任务流(通过 FlowLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。

taskLinks

List<TaskLink>

任务节点依赖集合。集合中每个元素表示当前任务流中某个任务依赖于某个源任务(通过 TaskLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。

relations

List<Relation>

关连线集合。Relation结构体 的集合,Relation表示两个任务间具有上下游依赖关系。

linkRelations

List<LinkRelation>

连接关系集合,详见 LinkRelation结构体

Task结构体

示例

{
    "name": "task",
    "resource": "default",
    "type": "DATA_INTEGRATION",
    "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
    "syncType": 1,
    "cmd": "echo "hello"",
    "submitter": "yourSubmitter",
    "filePackage": "",
    "cron": "",
    "priorityLevel": 0,
    "timeout": 300,
    "retryLimit": 3,
    "retryInterval": 0,
    "successCode": "0",
    "waitCode": "",
    "asLink": false
 }

参数

名称

数据类型

描述

name

String

任务名称。

resource

String

任务资源。

type

String

任务类型(返回值可为:DATA_INTEGRATION,WORMHOLE,CALCULATE,SHELL,CANAAN,HIVE,MR)。

syncType

Integer

同步类型(0:文件同步;1:数据同步)。

cmd

String

命令行命令。

submitter

String

任务提交者。

filePackage

String

文件位置。

cron

String

具体调度时间。

priorityLevel

Integer

优先级别。

timeout

Integer

超时时间。

retryLimit

Integer

重试次数。

retryInterval

Integer

重试时间间隔。

successCode

String

成功返回值。

waitCode

String

N/A,通常为空字符串。

asLink

Boolean

是否与其他任务具有依赖关系。

runMode

String

任务运行模式,详见 RunMode结构体

RunMode结构体

示例

{
    "taskMode": 1,
    "cpu": 0.5,
    "memory": 1,
    "maxParallel": 0,
    "keyType": 0,
    "datasourceId": 0,
    "path": "",
    "content": ""
}

参数

名称

数据类型

描述

taskMode

Integer

任务运行模式(1:单任务;2:多任务)。

cpu

Float

每个任务(单任务就是该任务本身,多任务是每个子任务)运行时需要的 CPU(单位:core,最小0.1,最大2)。

memory

Float

每个任务运行时需要的 Memory(单位:G,最小0.3,最大4)。

maxParallel

Integer

多任务模式下,允许同时并发执行的最大子任务数。

keyType

Integer

多任务模式下,分布键的来源(1:外部文件;2:自定义,通过content字段设置)。

datasourceId

Integer

分布键来源为外部文件时,连接外部文件所在数据源的数据源ID(通过 数据源注册 服务注册并获取ID)。

path

String

分布键来源为外部文件时,分布键文件在外部数据源中的路径。

content

String

分布键来源为自定义时,分布键的内容。

Relation结构体

示例

{
    "sourceTaskName": "tass",
    "targetTaskName": "rf",
    "rerun": true
}

参数

名称

数据类型

描述

sourceTaskName

String

上游任务名称。

targetTaskName

String

下游任务名称。

rerun

Boolean

true和false仅在任务级联重跑时生效。true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

LinkRelation结构体

示例

{
    "linkId": "0",
    "targetTaskName": "tass",
    "rerun": false
}

参数

名称

数据类型

描述

linkId

String

连接ID。

targetTaskName

String

下游任务名称。

rerun

Boolean

true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。

错误码

代码

错误信息

描述

62102

Flow validation exception

请求参数格式不正确

62109

Server internal exception

服务器内部异常

有关其他错误码的描述,参见 通用错误码

示例

请求示例

url: https://{apigw-address}/dataflow-batch-service/v2.0/flows?action=export&flowId={}&userId={}&orgId={}

method: GET

返回示例

{
    "status": 0,
    "msg": " Success",
    "data": {
        "name": "clone",
        "cycle": "D",
        "cron": "0 0 0 * * ? *",
        "parameters": "[{\"key\":\"REPLACE\",\"value\":\"lili1\"}]",
        "submitter": "yourSubmitter",
        "owners": "yourOwners",
        "visitors": "yourVisitors",
        "type": 1,
        "desc": "",
        "tasks": [
            {
                "name": "tass",
                "resource": "default",
                "type": "DATA_INTEGRATION",
                "cmd": "echo `whoami`",
                "submitter": "yourSubmitter",
                "filePackage": "",
                "cron": "",
                "priorityLevel": 0,
                "timeout": 300,
                "retryLimit": 3,
                "retryInterval": 0,
                "successCode": "0",
                "waitCode": "",
                "asLink": true,
                "runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
                "syncType": 1
            }
        ],
        "relations": [],
        "startTime": "2019-11-22",
        "flowLinks": [],
        "syncType": 1,
        "linkRelations": [],
        "alertTo": "",
        "alertMode": 3,
        "taskLinks": [],
        "appId": ""
    }
}

Java SDK调用示例

import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Test;

import java.util.HashMap;
import java.util.Map;

public class SampleCode{
    public static class Request extends PoseidonRequest {
        public void setQueryParam(String key, Object value){
            queryParams().put(key, value);
        }
        public void setHeaderParam(String key, String value){
            headerParams().put(key, value);
        }

        public void setBodyParam(Map<String, Object> bodyPara){
            bodyParams().putAll(bodyPara);
        }
        public void setMethod(String method) {
            this.method = method;
        }
        private String method;
        public String baseUri() {
            return "";
        }
        public String method() {
            return method;
        }
    }

    @Test
    public void exportFlowTest(){
        //1.在EnOS Console的左边导航栏中点击应用注册。
        //2.点击需调用API的应用,查看基本信息中的AccessKey即为accessKey、SecretKey即为secretKey
        String accessKey = "AccessKey of your APP";
        String secretKey = "SecretKey of your APP";

        //新建一个request 然后把需要的参数传进去存在Query的map中,key是参数名字,value是参数值
        Request request = new Request();
        request.setMethod("GET");

        try {
            JSONObject response = Poseidon.config(PConfig.init().appKey(accessKey).appSecret(secretKey).debug())
                    .url("https://{apigw-address}/dataflow-batch-service/v2.0/flows")
                    .queryParam("orgId", "yourOrgId")
                    .queryParam("userId", "yourUserId")
                    .queryParam("flowId", "36")
                    .queryParam("action", "export")
                    .getResponse(request, JSONObject.class);

            System.out.println(response);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}