工作流引擎Oozie:workflow
Posted ilinux_one
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了工作流引擎Oozie:workflow相关的知识,希望对你有一定的参考价值。
1. Oozie简介
Yahoo开发工作流引擎Oozie(驭象者),用于管理Hadoop任务(支持MapReduce、Spark、Pig、Hive),把这些任务以DAG(有向无环图)方式串接起来。Oozie任务流包括:coordinator、workflow;workflow描述任务执行顺序的DAG,而coordinator则用于定时任务触发,相当于workflow的定时管理器,其触发条件包括两类:
- 数据文件生成
- 时间条件
Oozie定义了一种基于XML的hPDL (Hadoop Process Definition Language)来描述workflow的DAG。在workflow中定义了
- 控制流节点(Control Flow Nodes)
- 动作节点(Action Nodes)
其中,控制流节点定义了流程的开始和结束(start、end),以及控制流程的执行路径(Execution Path),如decision、fork、join等;而动作节点包括Hadoop任务、SSH、HTTP、eMail和Oozie子流程等。控制流节点示例如下:
<workflow-app xmlns=‘uri:oozie:workflow:0.2‘ name="ooziedemo-wf"> <start to="timeCheck"/> ... <kill name="fail"> <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name="end"/> </workflow-app> <!-- or --> <workflow-app xmlns=‘uri:oozie:workflow:0.2‘ name="ooziedemo-wf"> <start ../> <fork name="forking"> <path start="sqoopMerge1"/> <path start="sqoopMerge2"/> </fork> <join name="joining" to="end"/> <end ../> </workflow-app>
其中,fork、join是成对出现,表示了工作流的并发执行,最后汇聚到一个node。从Oozie的工作流调度机制可以看出,Oozie没有能力表达复杂的DAG,比如:嵌套的依赖关系。此外,Oozie工作流可以参数化,比如:在工作流定义中使用像${inputDir}之类的变量,然后通过job.properties配置对应参数,在启动时将这些配置参数传入工作流:
oozie job -oozie http://<host>:11000/oozie/ -config job.properties -run
2. Workflow
Action Node定义了基本的工作任务节点。(以下介绍版本基于Oozie 4.1.0)
MapReduce
一般地,我用java action启动MapReduce任务,对于任务的动态变化参数,在workflow的configuration进行配置,然后在job.properties指定参数值。
<action name="Data Clean"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.reduce.tasks</name> <value>${reducerNum}</value> </property> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> </configuration> <main-class>...</main-class> <java-opts>-Xms256m -Xmx512m</java-opts> <arg>..</arg> <arg>${nameNode}/user/${wf:user()}/xx</arg> ... <arg>${cleanDate}</arg> <capture-output /> </java> <ok to="end" /> <error to="fail" /> </action>
其中, ${wf:user()}为workflow的内置参数,表示当前用户名。一般地,使用该参数,为了保证写权限(毕竟没有写文件到其他用户文件夹的权限)。
Spark
Oozie支持Spark action,不过支持的不是特别好。提交spark任务时,需要加载spark-assembly jar。
<action name="Spark Data Clean"> <spark xmlns="uri:oozie:spark-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <master>yarn-cluster</master> <mode>cluster</mode> <name>etl${cleanDate}</name> <class>...</class> <jar>/<hdfs>/<path>/lib/xxx.jar</jar> <spark-opts> --num-executors ${executors} --driver-memory 4g --executor-memory 4g --executor-cores 5 --queue=${queueName} </spark-opts> <arg>..</arg> </spark> <ok to="end" /> <error to="fail" /> </action>
Pig
Oozie内置pig action,其中<script>为pig脚本所在的HDFS路径,param为pig脚本中的参数。Oozie调度pig任务略坑,先随机指定一台机器,然后将pig脚本dist到该机器,然后执行。但是,因为集群中不同机器部署的pig版本可能不一致,而导致任务跑失败。
<action name="Pig Data Clean"> <pig> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> </configuration> <script>/<hdfs>/<path>/data-clean.pig</script> <param>CLEANDATE=${cleanDate}</param> </pig> <ok to="end"/> <error to="fail"/> </action>
在pig脚本中,一般用$ + 大写字母表示输入参数,示例如下:
A = load ‘/<hdfs>/<path>/$CLEANDATE‘ using OrcStorage();
...
E = ...
store E into ‘/<path>/$CLEANDATE‘;
实际上,在本地执行带参数的pig脚本时,也是用-param命令:
pig -f test.pig -param CLEANDATE=2016-05-26
Hive
Oozie也可以调度Hive任务,一般使用hive2 action通过beeline连接Hive Server 2,然后执行HiveQL:
<action name="Hive2"> <hive2 xmlns="uri:oozie:hive2-action:0.1"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapreduce.job.queuename</name> <value>${queueName}</value> </property> </configuration> <jdbc-url>jdbc:hive2://host:10000/db-name</jdbc-url> <script>${NameNode}/<hdfs>/<path>/test.hql</script> <param>DAYTIME=${dayTime}</param> </hive2> <ok to="end"/> <error to="fail"/> </action>
其中,param为HiveQL中的输入参数,其对应hql为
alter table db.log_tb
add if not exists partition (day_time=date ‘${DAYTIME}‘)
location ‘${DAYTIME}‘;
hive命令执行本地hql通过--hivevar传入参数:
hive -f test.hql --hivevar DAYTIME=2016-05-17
此外,在执行hive2 action时需有如下依赖:
<dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-beeline</artifactId> <version>${hive.version}</version> </dependency>
以上是关于工作流引擎Oozie:workflow的主要内容,如果未能解决你的问题,请参考以下文章
Oozie workflow工作流action间参数传递实现