大数据:增量采集处理导出
Posted Xiao Miao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据:增量采集处理导出相关的知识,希望对你有一定的参考价值。
文章目录
增量采集、处理、导出
一、增量采集
1.增量采集的业务需求
离线与实时
离线:以时间为单位来实现数据的处理:采集、计算
- 场景:每天处理一次,每个小时处理一次,每个月,每年
- 特点:时效性比较低,一般都是分钟级别
- 工具:Hadoop生态圈
- Sqoop、HDFS、Hive、MapReduce、SparkCore、SparkSQL、Tez、Impala、Sqoop、mysql
实时:以数据为单位的实现数据的处理:采集、计算
- 场景:产生一条数据就要立刻采集以及处理一条数据
- 特点:时效性非常高,一般都是毫秒级别
- 工具:实时生态圈
- Flume、Canal、Kafka、SparkStreaming/Flink、Redis、Hbase
离线需求
- 实现离线采集、离线计算、离线结果保存
- 所有过程:都是增量的过程
增量与全量
- 全量:每次都所有数据进行处理
- 一般用于数据迁移、维度表的更新
- 增量:每次对最新【新增、更新】的数据进行处理
- 工作中主要的场景
2.增量采集的方案
Flume:增量文件采集
- exec:tail命令,动态的获取文件的尾部
- tail命令,自动读取文件的尾部
- taildir:动态实时监控多个文件
- 记录文件的采集位置:taildir_position.json
- 实现增量采集
Sqoop:增量采集数据库
-
方式一:按照某一列自增的int值来实现:append
- 要求:必须有一列自增的int值,必须有主键
- 特点:只能采集新增的数据
-
方式二:按照数据变化的时间列的值来实现:lastmodifield
- 要求:必须有一列时间列,时间列随着数据的更新而自动更新
- 特点:能采集新增和更新的数据
-
方式三:通过指定目录分区采集到对应的HDFS目录下
- 要求:表中有两个字段
- create_time:创建时间
- 新增的数据
- 要求:表中有两个字段
-
update_time:更新时间
- 更新的数据
-
怎么解决更新和新增数据的问题:通过SQL的过滤
-e "select * from table where substr(create_time,1,10) = '2021-05-16' or substr(update_time,1,10) = '2021-05-16'“
- 增量要求目录是提前存在,追加新增的数据进入,没有使用官方提供的增量,目录不能提前存在
--target-dir /nginx/log/2021-05-15/
- 问题:如何通过Sqoop将数据采集到Hive的分区表中?
- –hive-partition-key daystr:指定分区的字段是哪个字段
- –hive-partition-value 2021-05-15:指定导入哪个分区
- 原理
- 根据指定的参数,在HDFS中创建一个目录:key=value
table/daystr=2021-05-15
-
在Hive中加载分区即可
-
方式二:通过手动指定HDFS方式来代替
--target-dir /nginx/log/daystr=2021-05-15/hourstr=00
3.增量采集的实现
- 创建MySQL测试数据表
create database if not exists db_order;
use db_order;
drop table if exists tb_order;
create table tb_order(
id varchar(10) primary key,
pid varchar(10) not null,
userid varchar(10) not null,
price double not null,
create_time varchar(20) not null
);
- 插入测试数据
insert into tb_order values('o00001','p00001','u00001',100,'2021-05-13 00:01:01');
insert into tb_order values('o00002','p00002','u00002',100,'2021-05-13 10:01:02');
insert into tb_order values('o00003','p00003','u00003',100,'2021-05-13 11:01:03');
insert into tb_order values('o00004','p00004','u00004',100,'2021-05-13 23:01:04');
insert into tb_order values('o00005','p00005','u00001',100,'2021-05-14 00:01:01');
insert into tb_order values('o00006','p00006','u00002',100,'2021-05-14 10:01:02');
insert into tb_order values('o00007','p00007','u00003',100,'2021-05-14 11:01:03');
insert into tb_order values('o00008','p00008','u00004',100,'2021-05-14 23:01:04');
- 第一次增量采集:采集14日新增的数据
sqoop import \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file file:///export/data/sqoop.passwd \\
--query "select * from tb_order where substring(create_time,1,10) = '2021-05-14' and \\$CONDITIONS " \\
--delete-target-dir \\
--target-dir /nginx/logs/tb_order/daystr=2021-05-14 \\
--fields-terminated-by '\\t' \\
-m 1
- 15日MySQL中有新的数据生成
insert into tb_order values('o00009','p00005','u00001',100,'2021-05-15 00:01:01');
insert into tb_order values('o00010','p00006','u00002',100,'2021-05-15 10:01:02');
insert into tb_order values('o00011','p00007','u00003',100,'2021-05-15 11:01:03');
insert into tb_order values('o00012','p00008','u00004',100,'2021-05-15 23:01:04');
- 第二次增量采集:采集15日新增的数据
sqoop import \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file file:///export/data/sqoop.passwd \\
--query "select * from tb_order where substring(create_time,1,10) = '2021-05-15' and \\$CONDITIONS " \\
--delete-target-dir \\
--target-dir /nginx/logs/tb_order/daystr=2021-05-15 \\
--fields-terminated-by '\\t' \\
-m 1
二、增量处理
1.Hive中创建表
use default;
drop table default.tb_order;
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';
2.第一次增量处理
- 假设今天15号:对14号的数据进行处理
- 增量ETL
- 如果ETL发生在进入Hive之前,ETL工作中都是通过SparkCore/MR程序进行处理
yarn jar etl.jar main_class \\
#输入目录:数据采集的目录
/nginx/logs/tb_order/daystr=2021-05-14 \\
#输出目录:构建Hive表的目录
/user/hive/warehouse/tb_order/daystr=2021-05-14
- 这个程序一般不会给参数,昨天的日期都是在ETL程序中动态获取的
//Input
Path inputPath = new Path("/nginx/logs/tb_order/daystr="+yesterday)
TextInputFormat.setInputPaths(job,inputPath)
//Output
Path outputPath = new Path("/user/hive/warehouse/tb_order/daystr="+yesterday)
TextOutputFormat.setOutputPath(job,outputPath)
- 模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-14 /user/hive/warehouse/tb_order/
- 加载分区
- 第一种方式:修改Hive元数据,默认用于修复添加分区,不常用
msck repair table tb_order;
- 这个命令在Hive2.1版本中是个bug,不能用
- 增量处理
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-14'
group by daystr;
3.第二次增量处理
- 假设今天16号:对15号的数据进行处理
- 增量ETL
- 模拟数据ETL的过程
hdfs dfs -cp /nginx/logs/tb_order/daystr=2021-05-15 /user/hive/warehouse/tb_order/
加载分区
- 第二种方式,添加分区元数据信息
alter table tb_order add if not exists partition(daystr='2021-05-15');
- 要求:HDFS上目录的名称必须为分区字段=值
/user/hive/warehouse/tb_order/daystr=2021-05-15
-
Hive自动根据分区默认的目录名来关联的
-
场景
/user/hive/warehouse/tb_order/2021-05-15
- 实现:通过location关键字手动指定分区对应的HDFS
alter table tb_order add if not exists partition(daystr='2021-05-15') location '/user/hive/warehouse/tb_order/2021-05-15';
- 增量处理
select
daystr,
count(id) as order_number,
8ujhb sum(price) as order_price
from default.tb_order
where daystr='2021-05-15
group by daystr;
三、增量导出
1.Hive中建立APP层结果表
drop table if exists tb_order_rs;
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\\t';
MySQL中创建导出结果表
use db_order;
drop table if exists db_order.tb_order_rs;
create table db_order.tb_order_rs(
daystr varchar(20) primary key,
order_number int,
order_price double
);
2.第一次增量导出
- 第一次分析的结果写入HiveAPP层的结果表
insert into table tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-14'
group by daystr;
第一次导出到MySQL
sqoop export \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password 123456 \\
--table tb_order_rs \\
--hcatalog-database default \\
--hcatalog-table tb_order_rs \\
--input-fields-terminated-by '\\t' \\
--update-key daystr \\
--update-mode allowinsert \\
-m 1
3.第二次增量导出
第二次分析的结果写入HiveAPP层的结果表
insert into table tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='2021-05-15'
group by daystr;
第二次导出到MySQL
sqoop export \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password 123456 \\
--table tb_order_rs \\
--hcatalog-database default \\
--hcatalog-table tb_order_rs \\
--input-fields-terminated-by '\\t' \\
--update-key daystr \\
--update-mode allowinsert \\
-m 1
四、增量采集脚本的开发
-
需求:实现增量的采集,默认采集昨天的数据,允许采集指定日期的数据,并且实现模拟ETL过程
- 如果给定了参数,处理参数日期的数据
- 如果没有给参数,默认处理昨天的数据
-
创建脚本
mkdir /export/data/shell
vim /export/data/shell/01.collect.sh
- 开发脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行采集的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop import \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password-file file:///export/data/sqoop.passwd \\
--query "select * from tb_order where substring(create_time,1,10) = '${yesterday}' and \\$CONDITIONS " \\
--delete-target-dir \\
--target-dir /nginx/logs/tb_order/daystr=${yesterday} \\
--fields-terminated-by '\\t' \\
-m 1
echo "step2:采集的程序运行结束"
echo "step3:开始运行ETL"
#模拟ETL的过程,将采集的新增的数据移动到表的目录下
HADOOP_HOME=/export/server/hadoop-2.6.0-cdh5.14.0
#先判断结果是否存在,如果已经存在,先删除再移动
$HADOOP_HOME/bin/hdfs dfs -test -e /user/hive/warehouse/tb_order/daystr=${yesterday}
if [ $? -eq 0 ]
then
#存在
$HADOOP_HOME/bin/hdfs dfs -rm -r /user/hive/warehouse/tb_order/daystr=${yesterday}
$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
else
#不存在
$HADOOP_HOME/bin/hdfs dfs -cp /nginx/logs/tb_order/daystr=${yesterday} /user/hive/warehouse/tb_order/
fi
echo "step3:ETL结束"
五、增量处理脚本的开发
- 创建脚本
vim /export/data/shell/02.analysis.sh
vim /export/data/shell/02.analysis.sql
- Shell脚本
#!/bin/bash
#step1:先获取要采集的数据时间,规则:如果没有给参数,就默认处理昨天的日期,如果给了参数,就参数对应的日期
if [ $# -ne 0 ]
then
#参数个数不为0
if [ $# -ne 1 ]
then
echo "参数至多只能有一个,为处理的日期,请重新运行!"
exit 100
else
#参数个数只有1个,就用第一个参数作为处理的日期
yesterday=$1
fi
else
#参数个数为0,默认处理昨天的日期
yesterday=`date -d '-1 day' +%Y-%m-%d`
fi
echo "step1:要处理的日期是:${yesterday}"
echo "step2:开始运行分析"
#step2:运行分析程序
HIVE_HOME=/export/server/hive-1.1.0-cdh5.14.0
$HIVE_HOME/bin/hive --hiveconf yest=${yesterday} -f /export/data/shell/02.analysis.sql
echo "step2:分析的程序运行结束"
hive --hiveconf 参数 用于传递参数
hive -f 文件名 执行某个文件中的sql
SQL文件
create table if not exists default.tb_order(
id string ,
pid string,
userid string,
price double ,
create_time string
)
partitioned by (daystr string)
row format delimited fields terminated by '\\t'
location '/user/hive/warehouse/tb_order';
alter table default.tb_order add if not exists partition (daystr='${hiveconf:yest}');
create table if not exists default.tb_order_rs(
daystr string,
order_number int,
order_price double
)
row format delimited fields terminated by '\\t';
insert into table default.tb_order_rs
select
daystr,
count(id) as order_number,
sum(price) as order_price
from default.tb_order
where daystr='${hiveconf:yest}'
group by daystr;
六、增量导出脚本的开发
- 创建脚本
vim /export/data/shell/03.export.sh
- Shell脚本
#!/bin/bash
echo "step1:开始运行导出的程序"
#step2:运行增量采集
SQOOP_HOME=/export/server/sqoop-1.4.6-cdh5.14.0
$SQOOP_HOME/bin/sqoop export \\
--connect jdbc:mysql://node3:3306/db_order \\
--username root \\
--password 123456 \\
--table tb_order_rs \\
--hcatalog-database default \\
--hcatalog-table tb_order_rs \\
--input-fields-terminated-by '\\t' \\
--update-key daystr \\
--update-mode allowinsert \\
-m 1
echo "step1:导出的程序运行结束"
以上是关于大数据:增量采集处理导出的主要内容,如果未能解决你的问题,请参考以下文章
大数据综合项目--网站流量日志数据分析系统(详细步骤和代码)