Export Flow¶
导出任务流到本地。
前提条件¶
用户必须属于目标任务流所属的OU。
请求格式¶
GET https://{apigw-address}/dataflow-batch-service/v2.0/flows?action=export
请求参数(URI)¶
名称 |
位置(Path/Query) |
必需/可选 |
数据类型 |
描述 |
---|---|---|---|---|
flowId |
Query |
必需 |
Integer |
任务流ID。 |
userId |
Query |
必需 |
String |
用户ID。如何获取userId信息>> |
orgId |
Query |
必需 |
String |
用户所属的组织ID。如何获取orgId信息>> |
响应参数¶
名称 |
数据类型 |
描述 |
---|---|---|
data |
List<JSONObject> |
包含任务流的详细信息。详见 Flow结构体 |
Flow结构体¶
示例¶
{
"name": "workflow1",
"cycle": "D",
"cron": "0 0 0 * * ? *",
"parameters": "[]",
"alertMode": 3,
"alertTo": "",
"appId": "",
"submitter": "yourSubmitter",
"owners": "yourOwners",
"visitors": "yourVisitors",
"type": 1,
"syncType": 1,
"desc": "",
"startTime": "2019-07-25",
"tasks": [
{
"name": "tass",
"resource": "default",
"type": "COMMAND",
"runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
"syncType": 1,
"cmd": "echo "hello"",
"submitter": "yourSubmitter",
"filePackage": "",
"cron": "",
"priorityLevel": 0,
"timeout": 300,
"retryLimit": 3,
"retryInterval": 0,
"successCode": "0",
"waitCode": "",
"asLink": false
}
],
"flowLinks": [],
"taskLinks": [],
"relations": [],
"linkRelations": []
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
name |
String |
任务流名称。 |
cycle |
String |
调度周期(M:月;W:周;D:天;H:小时;mi:分钟)。 |
cron |
String |
任务流调度周期。调度中使用的是七位的Crontab,概括而言,Crontab可以指定某个事件在其指定的时间点被触发,比如: |
parameters |
List<Map<key,value>> |
调度参数。作为统一配置的全局参数,可以在节点内使用这些参数,以使任务运行时能动态适配环境变化(参数需要以 |
alertMode |
Integer |
告警模式(0:无, 1:仅邮件告警, 2:仅短信告警, 3:邮件与短信告警)。 |
alertTo |
String |
告警对象。 |
appId |
String |
N/A,通常为空字符串。 |
submitter |
String |
任务流提交账号。 |
owners |
String |
任务流所有者(负责人)的用户名(多个owner之间以 ; 分开,例如owners=“userNameA;userNameB”)。 |
visitors |
String |
可访问者的用户名(多个visitor之间以 ; 分开,例如visitors=“userNameA;userNameB”)。 |
type |
Integer |
任务流调度类型(0:手动调度任务;1:周期调度任务;2:临时任务)。 |
syncType |
Integer |
同步类型(0:文件同步;1:数据同步)。 |
desc |
String |
任务流描述信息。 |
startTime |
String |
任务流生效日期(即开始调度日期)。 |
tasks |
List<Task> |
任务节点集合,集合中每个元素表示任务流中的一个任务,详见 Task结构体 |
flowLinks |
List<FlowLink> |
任务流依赖集合。集合中每个元素表示当前任务流依赖于某个源任务流(通过 FlowLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。 |
taskLinks |
List<TaskLink> |
任务节点依赖集合。集合中每个元素表示当前任务流中某个任务依赖于某个源任务(通过 TaskLink结构体 中的 linkId 字段可在 linkRelations 集合中找到对应的连接关系,该连接关系表示源任务流与当前任务流中的哪个任务具有关联)。 |
relations |
List<Relation> |
关连线集合。Relation结构体 的集合,Relation表示两个任务间具有上下游依赖关系。 |
linkRelations |
List<LinkRelation> |
连接关系集合,详见 LinkRelation结构体 |
Task结构体¶
示例¶
{
"name": "task",
"resource": "default",
"type": "DATA_INTEGRATION",
"runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
"syncType": 1,
"cmd": "echo "hello"",
"submitter": "yourSubmitter",
"filePackage": "",
"cron": "",
"priorityLevel": 0,
"timeout": 300,
"retryLimit": 3,
"retryInterval": 0,
"successCode": "0",
"waitCode": "",
"asLink": false
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
name |
String |
任务名称。 |
resource |
String |
任务资源。 |
type |
String |
任务类型(返回值可为:DATA_INTEGRATION,WORMHOLE,CALCULATE,SHELL,CANAAN,HIVE,MR)。 |
syncType |
Integer |
同步类型(0:文件同步;1:数据同步)。 |
cmd |
String |
命令行命令。 |
submitter |
String |
任务提交者。 |
filePackage |
String |
文件位置。 |
cron |
String |
具体调度时间。 |
priorityLevel |
Integer |
优先级别。 |
timeout |
Integer |
超时时间。 |
retryLimit |
Integer |
重试次数。 |
retryInterval |
Integer |
重试时间间隔。 |
successCode |
String |
成功返回值。 |
waitCode |
String |
N/A,通常为空字符串。 |
asLink |
Boolean |
是否与其他任务具有依赖关系。 |
runMode |
String |
任务运行模式,详见 RunMode结构体 |
RunMode结构体¶
示例¶
{
"taskMode": 1,
"cpu": 0.5,
"memory": 1,
"maxParallel": 0,
"keyType": 0,
"datasourceId": 0,
"path": "",
"content": ""
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
taskMode |
Integer |
任务运行模式(1:单任务;2:多任务)。 |
cpu |
Float |
每个任务(单任务就是该任务本身,多任务是每个子任务)运行时需要的 CPU(单位:core,最小0.1,最大2)。 |
memory |
Float |
每个任务运行时需要的 Memory(单位:G,最小0.3,最大4)。 |
maxParallel |
Integer |
多任务模式下,允许同时并发执行的最大子任务数。 |
keyType |
Integer |
多任务模式下,分布键的来源(1:外部文件;2:自定义,通过content字段设置)。 |
datasourceId |
Integer |
分布键来源为外部文件时,连接外部文件所在数据源的数据源ID(通过 数据源注册 服务注册并获取ID)。 |
path |
String |
分布键来源为外部文件时,分布键文件在外部数据源中的路径。 |
content |
String |
分布键来源为自定义时,分布键的内容。 |
FlowLink结构体¶
示例¶
{
"linkId": "0",
"sourceFlowName": "jin"
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
linkId |
String |
连接ID。 |
sourceFlowName |
String |
上游任务流名称。 |
TaskLink结构体¶
示例¶
{
"linkId": "0",
"sourceFlowName": "dw",
"sourceTaskName": "tass"
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
linkId |
String |
连接ID。 |
sourceFlowName |
String |
上游任务流名称。 |
sourceTaskName |
String |
上游任务名称,结合 sourceFlowName 可唯一定位上游任务。 |
Relation结构体¶
示例¶
{
"sourceTaskName": "tass",
"targetTaskName": "rf",
"rerun": true
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
sourceTaskName |
String |
上游任务名称。 |
targetTaskName |
String |
下游任务名称。 |
rerun |
Boolean |
true和false仅在任务级联重跑时生效。true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。 |
LinkRelation结构体¶
示例¶
{
"linkId": "0",
"targetTaskName": "tass",
"rerun": false
}
参数¶
名称 |
数据类型 |
描述 |
---|---|---|
linkId |
String |
连接ID。 |
targetTaskName |
String |
下游任务名称。 |
rerun |
Boolean |
true表示重跑时,下游节点会被执行;false表示重跑时,下游节点不会被执行。 |
错误码¶
代码 |
错误信息 |
描述 |
---|---|---|
62102 |
Flow validation exception |
请求参数格式不正确 |
62109 |
Server internal exception |
服务器内部异常 |
有关其他错误码的描述,参见 通用错误码。
示例¶
请求示例¶
url: https://{apigw-address}/dataflow-batch-service/v2.0/flows?action=export&flowId={}&userId={}&orgId={}
method: GET
返回示例¶
{
"status": 0,
"msg": " Success",
"data": {
"name": "clone",
"cycle": "D",
"cron": "0 0 0 * * ? *",
"parameters": "[{\"key\":\"REPLACE\",\"value\":\"lili1\"}]",
"submitter": "yourSubmitter",
"owners": "yourOwners",
"visitors": "yourVisitors",
"type": 1,
"desc": "",
"tasks": [
{
"name": "tass",
"resource": "default",
"type": "DATA_INTEGRATION",
"cmd": "echo `whoami`",
"submitter": "yourSubmitter",
"filePackage": "",
"cron": "",
"priorityLevel": 0,
"timeout": 300,
"retryLimit": 3,
"retryInterval": 0,
"successCode": "0",
"waitCode": "",
"asLink": true,
"runMode": "{\"taskMode\":1,\"cpu\":0.5,\"memory\":1,\"maxParallel\":0,\"keyType\":0,\"datasourceId\":0,\"path\":\"\",\"content\":\"\"}",
"syncType": 1
}
],
"relations": [],
"startTime": "2019-11-22",
"flowLinks": [],
"syncType": 1,
"linkRelations": [],
"alertTo": "",
"alertMode": 3,
"taskLinks": [],
"appId": ""
}
}
Java SDK调用示例¶
import com.alibaba.fastjson.JSONObject;
import com.envision.apim.poseidon.config.PConfig;
import com.envision.apim.poseidon.core.Poseidon;
import com.envision.apim.poseidon.request.PoseidonRequest;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class SampleCode{
public static class Request extends PoseidonRequest {
public void setQueryParam(String key, Object value){
queryParams().put(key, value);
}
public void setHeaderParam(String key, String value){
headerParams().put(key, value);
}
public void setBodyParam(Map<String, Object> bodyPara){
bodyParams().putAll(bodyPara);
}
public void setMethod(String method) {
this.method = method;
}
private String method;
public String baseUri() {
return "";
}
public String method() {
return method;
}
}
@Test
public void exportFlowTest(){
//1.在EnOS Console的左边导航栏中点击应用注册。
//2.点击需调用API的应用,查看基本信息中的AccessKey即为accessKey、SecretKey即为secretKey
String accessKey = "AccessKey of your APP";
String secretKey = "SecretKey of your APP";
//新建一个request 然后把需要的参数传进去存在Query的map中,key是参数名字,value是参数值
Request request = new Request();
request.setMethod("GET");
try {
JSONObject response = Poseidon.config(PConfig.init().appKey(accessKey).appSecret(secretKey).debug())
.url("https://{apigw-address}/dataflow-batch-service/v2.0/flows")
.queryParam("orgId", "yourOrgId")
.queryParam("userId", "yourUserId")
.queryParam("flowId", "36")
.queryParam("action", "export")
.getResponse(request, JSONObject.class);
System.out.println(response);
} catch (Exception e) {
e.printStackTrace();
}
}
}