大数据:任务调度

Posted Xiao Miao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:任务调度相关的知识,希望对你有一定的参考价值。

任务调度

一、任务流调度的需求

整体需求

  • 相同的业务线,有不同的需求会有多个程序来实现,这多个程序共同完成的需求,组合在一起就是工作流或者叫做任务流
  • 基于工作流来实现任务流的自动化运行

在这里插入图片描述
基于时间的任务运行

  • Job1和Job2是在每天固定的时间去采集昨天的数据
  • 每天00:00

基于运行依赖关系的任务运行

  • Job3必须等待Job1运行成功,才能运行
  • Job5必须等待Job3和Job4都运行成功才能运行

调度类型

  • 定时调度:基于某种时间的规律进行调度运行
  • 依赖调度:基于某种依赖关系进行调度运行

二、任务流调度的工具

Linux Crontab
Linux中自带的一个工具

  • 优点

    • 简单,不用做额外的部署,能实现大多数的定时需求
    • crontab -e
  • 缺点

    • 只能做定时任务的执行
  • 语法

    *			*			*			*			*			command
    分钟	   小时		    日			月		   周几
    

Oozie
Cloudera公司研发的Hadoop生态圈的调度工具

  • 官网:oozie.apache.org
  • 优点
    • 功能很强大,能满足几乎所有常规的任务流调度的需求
    • 支持DAG流程调度
  • 缺点
    • 本身不是分布式的工具,依赖于MapReduce来实现分布式
    • 原生的交互开发接口不友好
    • 整体的监控不完善
    • 学习成本比较高

Zeus
阿里巴巴最早基于Hadoop1研发的一个调度系统,目前市场上的Zeus一般都是携程版本的Zeus

  • 优点
    • 交互非常友好
    • 使用非常简单
    • 分布式的,功能相对也比较全面
  • 缺点
    • Bug非常多,阿里没有继续研发Zeus,不支持Hadoop2

Azkaban
LinkedIn公司研发的分布式调度工具

  • 优点
    • 重点着重于自身的调度功能的研发,其他的辅助性功能都通过插件来完成
    • 自身也是分布式调度系统
    • 界面交互性比较友好
    • 开发交互性:properties或者JSON
  • 缺点
    • 3.x版本开始才支持完全分布式

三、Oozie的简介

功能

  • Oozie是一个专门为管理Hadoop生态的程序调度而设计的工作流调度系统
  • 基于DAG实现依赖调度:WorkFlow
  • 基于定时器实现定时调度:Coordinator

特点

  • 优点:功能全面
  • 缺点:部署相对复杂、原生开发方式过于复杂

应用

  • 基于Hadoop平台的分布式离线任务流调度

原理

  • 底层依赖于MapReduce,将工作流变成MapReduce程序,提交个YARN
  • 由YARN来将不同的工作流分配到不同的机器上运行,用于构建分布式调度系统

四、Oozie的2种使用方式

原生方式

  • 这种方式,是通过自己写代码的方式来实现工作流的开发,效率低,容易出问题,不用

  • 实现一个效果:4个程序

    • 第一个程序:shell脚本,定时运行的
    • 第二个程序:Spark程序,必须等第一个程序运行完才能运行
    • 第三个程序:MapReduce程序,必须等第二个程序运行才能运行’
    • 第四个程序:Hive,必须等第三个程序运行完才能运行
  • 先要开发一个XML文件

    • 控制节点:start、end、kill
      • 控制程序运行的流程
    • start:开始节点
      • end:终止节点
      • kill:强制退出节点
      • fork:分支节点
      • join:合并节点
    • 程序节点:action
<start to="first">
<action name="first">
	<shell>
		<path>xxx.sh</path>
		<args></args>
	</shell>
	<ok to="second"> </ok>
	<error to="kill"></error>
</action>
<action name ="second">
	<spark>
		<jar></jar>
		<class></class>
		……
	</spark>
	<ok to="third"> </ok>
<error to="kill"></error>
<action>
    <action name ="forth">
	<hive>
		<scprit></scprit>
		<path></path>
		……
	</hive>
	<ok to="end"> </ok>
	<error to="kill"></error>
<action>
<kill name="kill"> 
	kill 
</kill>
    <end name="end"> 
	end 
</end>

集成Hue

  • 由于Oozie原生的方式交互性非常差,导致用户上手非常困难

  • Cloudera基于可视化需求,在Hue中集成Oozie开发和监控
    在这里插入图片描述
    在这里插入图片描述

五、WorkFlow 与 Fork 和 Join

创建测试脚本

  • 启动Oozie:在第一台机器
    • 启动:start-oozie.sh
      在这里插入图片描述
  • 关闭:stop-oozie.sh

测试

  • 创建四个脚本
mkdir /export/data/flow
  • /export/data/flow/test01.sh
#!/bin/bash
echo "this is test01"
  • /export/data/flow/test02.sh
#!/bin/bash
echo "this is test02"
  • /export/data/flow/test03.sh
#!/bin/bash
echo "this is test03"
  • /export/data/flow/test04.sh
#!/bin/bash
echo "this is test04"
  • 上传到HDFS
hdfs dfs -put /export/data/flow /user/oozie/

单job工作流
需求1:构建一个工作流,执行test01
在这里插入图片描述
多job工作流
需求2:构建一个工作流,先执行test01,再执行test02,最后执行test03
在这里插入图片描述

在这里插入图片描述
分支工作流

  • 需求3

  • test01先执行

  • test01执行完成,test02和test03并行执行

  • test02和test03都执行完成,执行test04
    在这里插入图片描述

在这里插入图片描述

六、SubFlow:子工作流

需求:在调度运行一个工作流的实现,需要嵌套调用另外一个工作流
在这里插入图片描述
在这里插入图片描述

七、定时调度的实现

在这里插入图片描述
在这里插入图片描述

八、自动化调度的实现

1.自动化调度需求

  • 目标:自动化实现增量任务流调度
  • 实施
    • 第一个job:增量采集
    • 第二个job:统计昨天的订单总个数
    • 第三个job:统计昨天的订单总金额
    • 第四个job:合并二和三的结果,得到每天的订单总个数,和总金额,导出到mysql

2.自动化调度脚本

  • 目标实现自动化脚本调度的开发

  • 路径

    • step1:增量采集脚本job1
    • step2:增量统计个数脚本job2
    • step3:增量统计金额脚本job3
    • step4:增量合并导出脚本job4
  • 实施

增量采集脚本job1

  • 创建脚本
vim /export/data/shell/01.collect.sh
  • 开发Shell脚本
#!/bin/bash

#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
	#参数个数不为0
	if [ $# -ne 1 ]
	then
		echo "参数至多只能有一个,为处理的日期,请重新运行!"
		exit 100
	else
		#参数个数只有1个,就用第一个参数作为处理的日期
		yesterday=$1
	fi
else
	#参数个数为0,默认处理昨天的日期
	yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"

echo "step2:开始运行采集的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop  import \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \\
--query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \\$CONDITIONS " \\
--delete-target-dir \\
--target-dir /nginx/logs/tb_order/daystr=${yesterday} \\
--fields-terminated-by '\\t' \\
-m 1

echo "step2:采集的程序运行结束"


echo "step3:开始运行ETL"
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
$HADOOP_HOME/bin/hdfs dfs -test -e  /user/hive/warehouse/tb_order/daystr=${yesterday}
if [ $? -eq 0 ]
then
	#存在
	$HADOOP_HOME/bin/hdfs dfs -rm -r  /user/hive/warehouse/tb_order/daystr=${yesterday}
	$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
else
	#不存在
	$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
fi 
echo "step3:ETL结束"

增量统计个数脚本job2

  • 创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
  • 开发Shell脚本
#!/bin/bash

#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
	#参数个数不为0
	if [ $# -ne 1 ]
	then
		echo "参数至多只能有一个,为处理的日期,请重新运行!"
		exit 100
	else
		#参数个数只有1个,就用第一个参数作为处理的日期
		yesterday=$1
	fi
else
	#参数个数为0,默认处理昨天的日期
	yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"

echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday}  -f  hdfs://node1:8020/user/oozie/shell/02.analysis.sql

echo "step2:分析的程序运行结束"
  • 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned  by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';

alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');


create table if not exists default.tb_order_num_rs(
daystr string,
order_number int
)
row format delimited fields terminated by '\\t';

insert into table default.tb_order_num_rs
select
daystr,
count(id) as order_number
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr

增量统计金额脚本job3

  • 创建脚本
vim /export/data/shell/03.analysis.sh
vim /export/data/shell/03.analysis.sql
  • 开发Shell脚本
#!/bin/bash

#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
	#参数个数不为0
	if [ $# -ne 1 ]
	then
		echo "参数至多只能有一个,为处理的日期,请重新运行!"
		exit 100
	else
		#参数个数只有1个,就用第一个参数作为处理的日期
		yesterday=$1
	fi
else
	#参数个数为0,默认处理昨天的日期
	yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"

echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday}  -f  hdfs://node1:8020/user/oozie/shell/03.analysis.sql

echo "step2:分析的程序运行结束"
  • 开发SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned  by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';

alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');


create table if not exists default.tb_order_price_rs(
daystr string,
order_price double
)
row format delimited fields terminated by '\\t';

insert into table default.tb_order_price_rs
select
daystr,
sum(price) as order_price
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr;

增量合并导出脚本job4

  • 创建脚本
vim /export/data/shell/04.export.sh
vim /export/data/shell/04.export.sql      
  • 开发Shell脚本
#!/bin/bash

#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
	#参数个数不为0
	if [ $# -ne 1 ]
	then
		echo "参数至多只能有一个,为处理的日期,请重新运行!"
		exit 100
	else
		#参数个数只有1个,就用第一个参数作为处理的日期
		yesterday=$1
	fi
else
	#参数个数为0,默认处理昨天的日期
	yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"

echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday}  -f  hdfs://node1:8020/user/oozie/shell/04.export.sql

echo "step2:分析的程序运行结束"

echo "step3:开始运行导出的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file hdfs://node1:8020/user/oozie/shell/sqoop.passwd \\
--table tb_order_rs \\
--hcatalog-database default \\
--hcatalog-table tb_order_rs \\
--input-fields-terminated-by '\\t' \\
--update-key daystr \\
--update-mode allowinsert \\
-m 1

echo "step3:导出的程序运行结束"       
  • 开发SQL文件
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
 )
row format delimited fields terminated by '\\t';
      
insert into table default.tb_order_rs
select
a.daystr,
a.order_number,
b.order_price
from default.tb_order_num_rs a join default.tb_order_price_rs b on a.daystr = b.daystr
where a.daystr='${hiveconf:yest}';

3.自动化调度实现

上传

cp  /export/data/sqoop.passwd /export/data/shell/
hdfs dfs -put /export/data/shell /user/oozie/

在MySQL中导入最新数据

use db_order;
insert into tb_order values('o00013','p00009','u00001',121,'2021-05-17 00:01:01');
insert into tb_order values('o00014','p00010','u00002',122,'2021-05-17 10:01:02');
insert into tb_order values('o00015','p00011','u00003',123,'2021-05-17 11:01:03');
insert into tb_order values('o00016','p00012','u00004',124,'2021-05-17 23:01:04');

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

以上是关于大数据:任务调度的主要内容,如果未能解决你的问题,请参考以下文章

大数据任务调度工具 Apache DolphinScheduler

大数据任务调度工具 Apache DolphinScheduler

大数据调度平台分类(Oozie/Azkaban/AirFlow/DolphinScheduler)

OPPO大数据离线任务调度系统OFLOW

开源交流丨任务or实例 详解大数据DAG调度系统Taier任务调度

贝壳大数据任务调度DAG体系设计实践