大数据:任务调度
Posted Xiao Miao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:任务调度相关的知识,希望对你有一定的参考价值。
文章目录
任务调度
一、任务流调度的需求
整体需求
- 相同的业务线,有不同的需求会有多个程序来实现,这多个程序共同完成的需求,组合在一起就是工作流或者叫做任务流
- 基于工作流来实现任务流的自动化运行
基于时间的任务运行
- Job1和Job2是在每天固定的时间去采集昨天的数据
- 每天00:00
基于运行依赖关系的任务运行
- Job3必须等待Job1运行成功,才能运行
- Job5必须等待Job3和Job4都运行成功才能运行
调度类型
- 定时调度:基于某种时间的规律进行调度运行
- 依赖调度:基于某种依赖关系进行调度运行
二、任务流调度的工具
Linux Crontab
Linux中自带的一个工具
-
优点
- 简单,不用做额外的部署,能实现大多数的定时需求
- crontab -e
-
缺点
- 只能做定时任务的执行
-
语法
* * * * * command 分钟 小时 日 月 周几
Oozie
Cloudera公司研发的Hadoop生态圈的调度工具
- 官网:oozie.apache.org
- 优点
- 功能很强大,能满足几乎所有常规的任务流调度的需求
- 支持DAG流程调度
- 缺点
- 本身不是分布式的工具,依赖于MapReduce来实现分布式
- 原生的交互开发接口不友好
- 整体的监控不完善
- 学习成本比较高
Zeus
阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus
- 优点
- 交互非常友好
- 使用非常简单
- 分布式的,功能相对也比较全面
- 缺点
- Bug非常多,阿里没有继续研发Zeus,不支持Hadoop2
Azkaban
LinkedIn公司研发的分布式调度工具
- 优点
- 重点着重于自身的调度功能的研发,其他的辅助性功能都通过插件来完成
- 自身也是分布式调度系统
- 界面交互性比较友好
- 开发交互性:properties或者JSON
- 缺点
- 3.x版本开始才支持完全分布式
三、Oozie的简介
功能
- Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统
- 基于DAG实现依赖调度:WorkFlow
- 基于定时器实现定时调度:Coordinator
特点
- 优点:功能全面
- 缺点:部署相对复杂、原生开发方式过于复杂
应用
- 基于Hadoop平台的分布式离线任务流调度
原理
- 底层依赖于MapReduce,将工作流变成MapReduce程序,提交个YARN
- 由YARN来将不同的工作流分配到不同的机器上运行,用于构建分布式调度系统
四、Oozie的2种使用方式
原生方式
-
这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用
-
实现一个效果:4个程序
- 第一个程序:shell脚本,定时运行的
- 第二个程序:Spark程序,必须等第一个程序运行完才能运行
- 第三个程序:MapReduce程序,必须等第二个程序运行才能运行’
- 第四个程序:Hive,必须等第三个程序运行完才能运行
-
先要开发一个XML文件
- 控制节点:start、end、kill
- 控制程序运行的流程
- start:开始节点
- end:终止节点
- kill:强制退出节点
- fork:分支节点
- join:合并节点
- 程序节点:action
- 控制节点:start、end、kill
<start to="first">
<action name="first">
<shell>
<path>xxx.sh</path>
<args></args>
</shell>
<ok to="second"> </ok>
<error to="kill"></error>
</action>
<action name ="second">
<spark>
<jar></jar>
<class></class>
……
</spark>
<ok to="third"> </ok>
<error to="kill"></error>
<action>
<action name ="forth">
<hive>
<scprit></scprit>
<path></path>
……
</hive>
<ok to="end"> </ok>
<error to="kill"></error>
<action>
<kill name="kill">
kill
</kill>
<end name="end">
end
</end>
集成Hue
-
由于Oozie原生的方式交互性非常差,导致用户上手非常困难
-
Cloudera基于可视化需求,在Hue中集成Oozie开发和监控
五、WorkFlow 与 Fork 和 Join
创建测试脚本
- 启动Oozie:在第一台机器
- 启动:start-oozie.sh
- 启动:start-oozie.sh
- 关闭:stop-oozie.sh
测试
- 创建四个脚本
mkdir /export/data/flow
- /export/data/flow/test01.sh
#!/bin/bash
echo "this is test01"
- /export/data/flow/test02.sh
#!/bin/bash
echo "this is test02"
- /export/data/flow/test03.sh
#!/bin/bash
echo "this is test03"
- /export/data/flow/test04.sh
#!/bin/bash
echo "this is test04"
- 上传到HDFS
hdfs dfs -put /export/data/flow /user/oozie/
单job工作流
需求1:构建一个工作流,执行test01
多job工作流
需求2:构建一个工作流,先执行test01,再执行test02,最后执行test03
分支工作流
-
需求3
-
test01先执行
-
test01执行完成,test02和test03并行执行
-
test02和test03都执行完成,执行test04
六、SubFlow:子工作流
需求:在调度运行一个工作流的实现,需要嵌套调用另外一个工作流
七、定时调度的实现
八、自动化调度的实现
1.自动化调度需求
- 目标:自动化实现增量任务流调度
- 实施
- 第一个job:增量采集
- 第二个job:统计昨天的订单总个数
- 第三个job:统计昨天的订单总金额
- 第四个job:合并二和三的结果,得到每天的订单总个数,和总金额,导出到mysql
2.自动化调度脚本
-
目标:实现自动化脚本调度的开发
-
路径
- step1:增量采集脚本job1
- step2:增量统计个数脚本job2
- step3:增量统计金额脚本job3
- step4:增量合并导出脚本job4
-
实施
增量采集脚本job1
- 创建脚本
vim /export/data/shell/01.collect.sh
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行采集的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop import \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \\
--query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \\$CONDITIONS " \\
--delete-target-dir \\
--target-dir /nginx/logs/tb_order/daystr=${yesterday} \\
--fields-terminated-by '\\t' \\
-m 1
echo "step2:采集的程序运行结束"
echo "step3:开始运行ETL"
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
$HADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystr=${yesterday}
if [ $? -eq 0 ]
then
#存在
$HADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystr=${yesterday}
$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
else
#不存在
$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
fi
echo "step3:ETL结束"
增量统计个数脚本job2
- 创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/02.analysis.sql
echo "step2:分析的程序运行结束"
- 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';
alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');
create table if not exists default.tb_order_num_rs(
daystr string,
order_number int
)
row format delimited fields terminated by '\\t';
insert into table default.tb_order_num_rs
select
daystr,
count(id) as order_number
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr
增量统计金额脚本job3
- 创建脚本
vim /export/data/shell/03.analysis.sh
vim /export/data/shell/03.analysis.sql
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/03.analysis.sql
echo "step2:分析的程序运行结束"
- 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';
alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');
create table if not exists default.tb_order_price_rs(
daystr string,
order_price double
)
row format delimited fields terminated by '\\t';
insert into table default.tb_order_price_rs
select
daystr,
sum(price) as order_price
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr;
增量合并导出脚本job4
- 创建脚本
vim /export/data/shell/04.export.sh
vim /export/data/shell/04.export.sql
- 开发Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f hdfs://node1:8020/user/oozie/shell/04.export.sql
echo "step2:分析的程序运行结束"
echo "step3:开始运行导出的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \\
--table tb_order_rs \\
--hcatalog-database default \\
--hcatalog-table tb_order_rs \\
--input-fields-terminated-by '\\t' \\
--update-key daystr \\
--update-mode allowinsert \\
-m 1
echo "step3:导出的程序运行结束"
- 开发SQL文件
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\\t';
insert into table default.tb_order_rs
select
a.daystr,
a.order_number,
b.order_price
from default.tb_order_num_rs a join default.tb_order_price_rs b on a.daystr = b.daystr
where a.daystr='${hiveconf:yest}';
3.自动化调度实现
上传
cp /export/data/sqoop.passwd /export/data/shell/
hdfs dfs -put /export/data/shell /user/oozie/
在MySQL中导入最新数据
use db_order;
insert into tb_order values('o00013','p00009','u00001',121,'2021-05-17 00:01:01');
insert into tb_order values('o00014','p00010','u00002',122,'2021-05-17 10:01:02');
insert into tb_order values('o00015','p00011','u00003',123,'2021-05-17 11:01:03');
insert into tb_order values('o00016','p00012','u00004',124,'2021-05-17 23:01:04');
以上是关于大数据:任务调度的主要内容,如果未能解决你的问题,请参考以下文章
大数据任务调度工具 Apache DolphinScheduler
大数据任务调度工具 Apache DolphinScheduler
大数据调度平台分类(Oozie/Azkaban/AirFlow/DolphinScheduler)