Configure SHELL task nodes


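Data IDE supports multiple underlying engines, including Hive, Spark, and MapReduce. When you create a workflow, you can add SHELL task nodes to carry out data development. This topic describes how to configure a SHELL task node.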

Run HiveSQL tasks

Use a SHELL task node to run HiveSQL tasks from the command line for batch computing.

Command format

canaanhive [arguments]

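Parameter description

| Parameter | Description | Example |
|---|---|---|
| -f <arg> | HQL file name | -f demo.sql |
| -d <arg> | Time parameter | -d 2018-01-01 |
| -E <paraN>=<valN> | Assigns a value to a user-defined parameter in the HQL file | -E para=abc |
| -str <sql> | SQL statement | -str "show tables;" |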


For the custom parameter -E: if you pass -E para=abc, every ${env.para} in the HQL file is automatically replaced with abc.
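The substitution described above can be imitated locally to preview what an HQL file will look like after -E is applied. This is only a sketch: the helper name render_env_param is hypothetical, it is not part of canaanhive, and it assumes the value contains no "|", "&", or backslash characters.

```shell
# Hypothetical helper (not part of canaanhive): replaces ${env.NAME} in the
# text read from stdin, given one NAME=VALUE pair in the same form as -E.
# Assumption: VALUE contains no "|", "&", or backslash (special to sed here).
render_env_param() {
    param_name=${1%%=*}     # text before the first "="
    param_value=${1#*=}     # text after the first "="
    sed "s|[$]{env[.]${param_name}}|${param_value}|g"
}

# Preview a one-line HQL statement with para=abc.
printf '%s\n' 'select * from t where col = "${env.para}";' | render_env_param "para=abc"
```

The call prints `select * from t where col = "abc";`, which mirrors the replacement that -E para=abc is described as performing.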


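For the time parameter -d: taking -d 2018-01-01 as an example, each ${env.FORMAT} in the HQL file is automatically replaced with the corresponding date value. The FORMAT values are described below: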

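Note

In the FORMAT values in the following table, the letter P stands for Previous, so PnD/PnM/PnY mean the previous n days/months/years.

The letter N stands for Next, so NnD/NnM/NnY mean the next n days/months/years.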


| FORMAT | Range of n | Value for -d 2018-01-01 |
|---|---|---|
| YYYYMMDD | | 2018-01-01 |
| YYYYMMDD_PnD | 1 <= n <= 30 | 2017-12-31 ~ 2017-12-02 |
| YYYYMMDD_PnM | 1 <= n <= 12 | 2017-12-01 ~ 2017-01-01 |
| YYYYMMDD_PnY | 1 <= n <= 2 | 2017-01-01 ~ 2016-01-01 |
| YYYYMMDD_NnD | 1 <= n <= 2 | 2018-01-02 ~ 2018-01-03 |
| YYYYMMDD_NnM | 1 <= n <= 2 | 2018-02-01 ~ 2018-03-01 |
| YYYYMMDD_NnY | 1 <= n <= 2 | 2019-01-01 ~ 2020-01-01 |
| YYYYMM | | 2018-01 |
| YYYYMM_PnD | 1 <= n <= 2 | 2017-12 ~ 2017-12 |
| YYYYMM_PnM | 1 <= n <= 2 | 2017-12 ~ 2017-11 |
| YYYYMM_PnY | 1 <= n <= 2 | 2017-01 ~ 2016-01 |
| YYYYMM_NnD | 1 <= n <= 2 | 2018-01 ~ 2018-01 |
| YYYYMM_NnM | 1 <= n <= 2 | 2018-02 ~ 2018-03 |
| YYYYMM_NnY | 1 <= n <= 2 | 2019-01 ~ 2020-01 |
| YYYY | | 2018 |
| YYYY_PnD | 1 <= n <= 2 | 2017 ~ 2017 |
| YYYY_PnM | 1 <= n <= 2 | 2017 ~ 2017 |
| YYYY_PnY | 1 <= n <= 2 | 2017 ~ 2016 |
| YYYY_NnD | 1 <= n <= 2 | 2018 ~ 2018 |
| YYYY_NnM | 1 <= n <= 2 | 2018 ~ 2018 |
| YYYY_NnY | 1 <= n <= 2 | 2019 ~ 2020 |
| MM | | 01 |
| DD | | 01 |
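The date arithmetic behind the P (Previous) and N (Next) formats can be approximated on the command line. The sketch below assumes GNU date from coreutils (BSD and macOS date use different flags). The helper name shift_date is hypothetical, and this illustrates the offsets listed in the table rather than how canaanhive computes them internally.

```shell
# Sketch only: requires GNU date (coreutils).
# shift_date BASE OFFSET FORMAT
#   BASE    the date passed to -d, for example 2018-01-01
#   OFFSET  a signed relative offset, for example "-1 day" or "+2 year"
#   FORMAT  a strftime format, for example %Y-%m-%d or %Y
shift_date() {
    # -u keeps the calculation in UTC so daylight saving time cannot shift the day.
    date -u -d "$1 $2" "+$3"
}

shift_date 2018-01-01 "-1 day"    %Y-%m-%d   # corresponds to YYYYMMDD_P1D
shift_date 2018-01-01 "-12 month" %Y-%m-%d   # corresponds to YYYYMMDD_P12M
shift_date 2018-01-01 "+2 day"    %Y-%m-%d   # corresponds to YYYYMMDD_N2D
shift_date 2018-01-01 "-2 year"   %Y         # corresponds to YYYY_P2Y
```

With GNU date, the four calls print 2017-12-31, 2017-01-01, 2018-01-03, and 2016, which agree with the corresponding table rows for -d 2018-01-01.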

Example

Enter the following command line in the SHELL node:


canaanhive -f demo.sql -d 2018-01-01 -E DB=demo


Sample code in the HQL file demo.sql:


use ${env.DB};
create table if not exists demo(id string);
insert into demo values('${env.YYYYMMDD}');


The statements that are actually executed:


use demo;
create table if not exists demo(id string);
insert into demo values('2018-01-01');

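Run Spark tasks

Use a SHELL task node to run PySpark and Spark tasks from the command line.

Command format

Taking a PySpark job as an example: create a SHELL node and use a SHELL command to run the job's main script.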

sh predict.sh

Submit a PySpark job

submit-pyspark-application [options] <python file> [app arguments]

Parameter description

| Option | Function |
|---|---|
| --python 2.7/3.5 | Python version. Supports 2.7 or 3.5. Default is 2.7. |
| --pythonEnvPath | VirtualEnv path in HDFS. If not set, the default Python environment is used. |
| --name NAME | Name of your application. |
| --queue QUEUE_NAME | The YARN queue to submit to (Default: "default"). |
| --num-executors NUM | Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. |
| --executor-cores NUM | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
| --driver-cores NUM | Number of cores used by the driver, only in cluster mode (Default: 1). |
| --conf PROP=VALUE | Arbitrary Spark configuration property. |
| --py-files PY_FILES | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. |
| --files FILES | Comma-separated list of files to be placed in the working directory of each executor. |
| --archives ARCHIVES | Comma-separated list of archives to be extracted into the working directory of each executor. |
| --driver-memory MEM | Memory for driver (e.g. 1000M, 2G) (Default: 2G). |
| --driver-java-options | Extra Java options to pass to the driver. |
| --driver-library-path | Extra library path entries to pass to the driver. |
| --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
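As an illustration of how the options above combine, the following sketch assembles a small PySpark submission. Every flag comes from the option table. The file names wordcount.py and helpers.zip, the application argument input_path, and the run_or_print helper are hypothetical. The helper only prints the command when submit-pyspark-application is not installed, so the snippet can be tried outside the platform.

```shell
# Hypothetical helper: run the command if it exists, otherwise print it (dry run).
run_or_print() {
    if command -v "$1" >/dev/null 2>&1; then
        "$@"
    else
        printf 'DRY RUN:'
        printf ' %s' "$@"
        printf '\n'
    fi
}

# wordcount.py, helpers.zip, and input_path are placeholder names.
run_or_print submit-pyspark-application \
    --python 3.5 \
    --name wordcount_demo \
    --queue default \
    --num-executors 2 \
    --executor-cores 1 \
    --driver-memory 2G \
    --py-files helpers.zip \
    wordcount.py input_path
```

Options come before the Python file, and anything after the file is passed to the application, following the command format shown earlier.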

Submit a Spark job

submit-spark-application [options] <app-jar> [app arguments]

Parameter description

| Option | Function |
|---|---|
| --class CLASS_NAME | Your application's main class (for Java / Scala apps). |
| --name NAME | Name of your application. |
| --packages | Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths. Will search the local maven repo, then maven central and any additional remote repositories given by the --repositories option. The format for the coordinates should be groupId:artifactId:version. |
| --jars JARS | Comma-separated list of local jars to include on the driver and executor classpaths. |
| --conf PROP=VALUE | Arbitrary Spark configuration property. |
| --files FILES | Comma-separated list of files to be placed in the working directory of each executor. |
| --archives ARCHIVES | Comma-separated list of archives to be extracted into the working directory of each executor. |
| --driver-memory MEM | Memory for driver (e.g. 1000M, 2G) (Default: 2G). |
| --driver-java-options | Extra Java options to pass to the driver. |
| --driver-library-path | Extra library path entries to pass to the driver. |
| --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
| --executor-cores NUM | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
| --driver-cores NUM | Number of cores used by the driver, only in cluster mode (Default: 1). |
| --queue QUEUE_NAME | The YARN queue to submit to (Default: "default"). |
| --num-executors NUM | Number of executors to launch (Default: 2). If dynamic allocation is enabled, the initial number of executors will be at least NUM. |
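For a JAR-based job, the same pattern applies with --class and the application JAR. The sketch below wraps the submission in a function. The function name submit_spark_job, the class com.example.WordCount, the file wordcount.jar, and the argument /tmp/input are all hypothetical, and the resource values simply repeat the documented defaults. If submit-spark-application is not available, the function prints the command instead of running it.

```shell
# Hypothetical wrapper: submit_spark_job MAIN_CLASS APP_JAR [app arguments...]
submit_spark_job() {
    main_class=$1
    app_jar=$2
    shift 2
    # Rebuild the positional parameters as the full command line.
    set -- submit-spark-application \
        --class "$main_class" \
        --name spark_demo \
        --queue default \
        --num-executors 2 \
        --driver-memory 2G \
        "$app_jar" "$@"
    if command -v "$1" >/dev/null 2>&1; then
        "$@"
    else
        echo "DRY RUN: $*"
    fi
}

# Placeholder class, JAR, and argument.
submit_spark_job com.example.WordCount wordcount.jar /tmp/input
```

Keeping the fixed options inside the function means each SHELL node only has to supply the class, the JAR, and the application arguments.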

Example

In the SHELL node, enter the following SHELL command to run the main script predict.sh:


sh predict.sh


Sample code for the main script:


# Submits predict.py as a PySpark job in cluster mode.
# Positional parameters:
#   $1 YARN queue, $2 HDFS user, $3 start date, $4 end date, $5 site IDs
submit_pyspark_application_func(){
    submit-pyspark-application \
    --deploy-mode cluster \
    --queue "${1}" \
    --name pyspark_predict_test \
    --num-executors 10 \
    --driver-memory 16g \
    --executor-memory 12g \
    --driver-cores 2 \
    --executor-cores 3 \
    --conf spark.eventLog.enabled=true \
    --conf spark.network.timeout=240000 \
    --conf spark.executor.heartbeatInterval=24000 \
    --conf spark.yarn.executor.memoryOverhead=8192 \
    --archives hdfs://user/db_test/userPythonLib.zip#ANACONDA \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/MINICONDA/bin/python \
    --conf spark.yarn.maxAppAttempts=1 \
    --conf spark.logger_table=wens_status_algo_running \
    --conf spark.hdfs_user="${2}" \
    --conf spark.hdfs_path="hdfs://titan/user/${2}" \
    --conf spark.start_date="${3}" \
    --conf spark.end_date="${4}" \
    --conf spark.site_ids="${5}" \
    --conf spark.metric_save_path="/user/${2}/operaphm_temperature/metrics" \
    --py-files anomaly.py,hadoop_common_functions.py,layout.py,utm.zip,rle.py,common_tools.py,steadystatefilter.py,math_utils.py \
    predict.py
}

echo "test"


Here, predict.py is the entry Python file and must be placed in the same directory as predict.sh.
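The sample function reads five positional parameters (${1} to ${5}), but the document does not show how they are supplied. One possible safeguard is to check the arguments before submitting. The function name check_predict_args and the usage text are hypothetical. The meaning of each position is inferred from the options it feeds in the sample (--queue, spark.hdfs_user, spark.start_date, spark.end_date, spark.site_ids).

```shell
# Hypothetical guard, not part of the original sample: fail early unless
# exactly five arguments are given, then echo them back for the task log.
check_predict_args() {
    if [ "$#" -ne 5 ]; then
        echo "usage: sh predict.sh QUEUE HDFS_USER START_DATE END_DATE SITE_IDS" >&2
        return 1
    fi
    echo "queue=$1 user=$2 range=$3..$4 sites=$5"
}

# Example values only.
check_predict_args default db_test 2018-01-01 2018-01-31 1,2,3
```

In a script such as predict.sh, a check like this could run before the submit function is called, so that a missing argument stops the node with a clear message instead of producing a malformed command line.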