数据采集工具Flume和Sqoop
Posted Xiao Miao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据采集工具Flume和Sqoop相关的知识,希望对你有一定的参考价值。
文章目录
数据采集工具Flume和Sqoop
一、Flume的功能与应用
1.功能
数据采集:将数据从一个地方采集到另一个地方
将数据进行了复制
大数据的数据采集:将各种需要处理的数据源复制到大数据仓库中
实现:实时数据流的数据采集,可以将不同各种数据源的数据采集到各种目标地
数据源:文件、网络端口
Flume:实时
目标地:HDFS、Hbase、Hive、Kafka
特点:
功能全面
所有的读取和写入的程序,都已经封装好了
只需要配置从哪读,写入哪里,就要可以实现采集
允许自定义开发
如果功能不能满足实际的业务需求,Flume提供各种接口,允许自定义开发
基于Java开发的应用程序
开发相对简单
所有功能都封装好了,只要调用即可
写一个配置文件:从哪读,都谁,写到哪去
可以实现分布式采集
分布式采集:每一台机器都可以用Flume进行采集
注意:Flume不是分布式架构
2.应用
应用于实时数据流采集场景
基于文件或者网络协议端口的数据流采集
二、Flume的基本组成
Agent:每个Agent就是一个Flume的程序,每个Agent由三部分组成:source、Channel、Sink
Source:负责读取数据,Source会动态监听数据源,将数据源新增的数据实时采集变成Event数据流,将每个Event发送到Channel中
- 每一条数据会变成一个Event
- 实时监听数据源
Channel:临时缓存数据,将source发送过来的event的数据缓存起来 ,供Sink取数据
内存、文件(磁盘)
Sink:负责发送数据,从Channel中读取采集到的数据,将数据写入目标地
sink主动到Channel中读取数据
Event:用于构建每一条数据的对象,每一条数据就会变成一个Event,进行传递,最终写入目标
组成
- head:定义一些KV属性和配置,默认head时空的
- body:数据存在body中
理解:
Event
Map head;
byte[] body; //每一条数据的字节流
三、Flume的开发规则:
step1:开发一个Flume的参数配置文件
properties格式的文件:
#step1:定义一个agent:agent的名称、定义source、channel、sink
#step2:定义source:读什么、读哪
#step3:定义channel:缓存在什么地方
#step4:定义sink:写入什么地方
step2:运行flume的agent程序
flume -ng
Usage:bin/flume-ng <command>[options]...
为什么叫Flume-ng?
- flume-og:老的版本,架构非常麻烦,性能非常差,后不用了
- flume-ng:现在用的版本
flume-ng agent -c <conf> -f <file> -n <name>
agent:表示要运行一个Flume程序
- -c :指定Flume的配置文件目录
- -f :要运行那个文件
- -n :运行的agent的名字是什么
一个程序文件中可以有多个agent程序,通过名字来区别
四、Flume开发测试
需求:采集Hive的日志、临时缓存在内存中、将日志写入Flume的日志中并打印在命令中
source:采集一个文件数据
创建测试目录:
cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir usercase
复制官方示例:
cp conf/flume-conf.properties.template usercase/hive-mem-log.properties
hive-mem-log.properties:采集hive的日志临时缓存在内存中最终打印在日志中
Exec Source
- 执行一条Linux的命令来实现采集
- 命令:搭配tail -f动态采集文件最新的内容
Chanel:Flume提供了各种channel应用缓存数据
- memory channel将数据缓存在内存中
Sink:Flume提供很多sink - Logger Sink 日志类型的Sink
开发配置文件hive-mem-log.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
#define the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
#define the sink
a1.sinks.k1.type = logger
#bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行
#1.切换到指定目录
cd /export/server/flume-1.6.0-cdh5.14.0-bin/
#2.运行agent程序
flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
- -Dflume.root.logger=INFO,console:将flume的日志打印在命令行
结果:
五、常用Source
1.Exec
功能:通过执行一条Linux命令来实现数据量动态采集
- 固定搭配tail -F使用
应用场景:实现动态监听采集(单个文件)的数据
2.Taildir
功能:从Apache Flume1.7版本开始支持,动态监听采集多个文件
- 如果用的是1.5或者1.6,遇到这个问题,需要自己手动编译这个功能
测试实现
需求:让Flume动态监听一个文件和一个目录下的所有文件
准备
cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir position
mkdir -p /export/data/flume
echo " " >> /export/data/flume/bigdata01.txt
mkdir -p /export/data/flume/bigdata
开发
cp usercase/hive-mem-log.properties usercase/taildir-mem-log.properties
taildir-mem-log.properties
# define sourceName/channelName/sinkName for the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# define the s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
#将所有需要监控的数据源变成一个组,这个组内有两个数据源
a1.sources.s1.filegroups = f1 f2
#指定了f1是谁:监控一个文件
a1.sources.s1.filegroups.f1 = /export/data/flume/bigdata01.txt
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.headerKey1 = value1
#指定f2是谁:监控一个目录下的所有文件
a1.sources.s1.filegroups.f2 = /export/data/flume/bigdata/.*
#指定f2采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.fileHeader = true
# define the c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# def the k1
a1.sinks.k1.type = logger
#source、channel、sink bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
运行
flume-ng agent -c conf/ -f usercase/taildir-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
元数据文件的功能:/export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
问题:如果Flume程序故障,重启Flume程序,已经被采集过的数据还要不要采集?
需求:不需要,不能导致数据重复
功能:记录Flume所监听的每个文件已经被采集的位置
[
{"inode":34599996,"pos":14,"file":"/export/data/flume/bigdata01.txt"},{"inode":67595704,"pos":19,"file":"/export/data/flume/bigdata/test01.txt"},{"inode":67805657,"pos":7,"file":"/export/data/flume/bigdata/test02.txt"}
]
3.其他source
Kafka Source:监听读取Kafka数据
Spooldir Source:监控一个目录,只要这个目录中产生一个文件,就会采集一个文件
缺点:不能动态监控文件,被采集的文件是不能发生变化的
六、常见Channel
mem Channel:将数据缓存在内存中
-
特点:读写快、容量小、安全性较差
-
应用:小数据量的高性能的传输
file Channel:将数据缓存在文件中
-
特点:读写相对慢、容量大、安全性较高
-
应用:数据量大,读写性能要求不高的场景下
常用属性
capacity:缓存大小:指定Channel中最多存储多少条event
transactionCapacity:每次传输的大小
- 每次source最多放多少个event和每次sink最多取多少个event
- 这个值一般为capacity的十分之一,不能超过capacity
七、常见Sink
1.常用的Sink
- Kafka Sink
- HDFS Sink
问题:为什么离线采集不直接写入Hive,使用Hive sink
- 原因1:很多场景下,需要对数据提前做一步ETL,将ETL以后的结果再入库
- 原因2:Hive Sink有严格的要求,表必须为桶表,文件类型必须为orc
解决:如果要实现将数据直接放入Hive表?
- 用HDFS sink代替Hive sink
HDFS Sink功能:将Flume采集的数据写入HDFS
问题:Flume作为HDFS客户端,写入HDFS数据
- Flume必须知道HDFS地址
- Flume必须拥有HDFS的jar包
解决
- 方式一:Flume写地址的时候,指定HDFS的绝对地址
hdfs://node1:8020/nginx/log
手动将需要的jar包放入Flume的lib目录下
- 方式二:在Flume中配置Hadoop的环境变量,将core-site和hdfs-site放入Flume的配置文件目录
需求:将Hive的日志动态采集写入HDFS
cp hive-mem-log.properties hive-mem-hdfs.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log
#定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test1
#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1
启动:
flume-ng agent -c conf/ -f usercase/hive-mem-hdfs.properties -n a1 -Dflume.root.logger=INFO,console
指定文件大小
-
问题:Flume默认写入HDFS上会产生很多小文件,都在1KB左右,不利用HDFS存储
-
解决:指定文件大小
hdfs.rollInterval 30 每隔多长时间产生一个文件,单位为s
hdfs.rollSize 1024 每个文件多大产生一个文件,字节
hdfs.rollCount 10 多少个event生成一个文件
如果不想使用某种规则,需要关闭,设置为0
cp hive-mem-hdfs.properties hive-mem-size.properties
hive-mem-size.properties
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per a1,
# in this case called 'a1'
#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log
#定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test2
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 10240
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0
#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1
指定分区
cp hive-mem-hdfs.properties hive-mem-part.properties
运行:
flume-ng agent -c conf/ -f usercase/hive-mem-part.properties -n a1 -Dflume.root.logger=INFO,console
其他参数
#指定生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = nginx
#指定生成的文件的后缀
a1.sinks.k1.hdfs.fileSuffix = .log
#指定写入HDFS的文件的类型:普通的文件
a1.sinks.k1.hdfs.fileType = DataStream
2.Flume架构和高级组件
Flume架构
- 1.多Sink
- 一个agent可以有多个source、channel、sink
- 多个sink架构中,为了每个sink都有一份完整数据,每个sink必须对应一个独立的channel
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2
- 2.Collect架构
- 两层Flume架构:如果大量并发直接写入HDFS,导致HDFS的IO负载比较高
- 第一层
- source:taildir source
- sink:avro sink
- 第二层
- source:avro source
- sink:HDFS sink
高级组件
Flume Channel Selectors
- 功能: 用于决定source怎么将数据给channel
- 规则:默认:source默认将数据给每个channel一份
- Replicating Channel Selector (default)
- 选择:根据event头部的key值不同,给不同的channel
- Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
Flume Interceptors:拦截器
- 功能:可以给event的头部添加KV,还可以对数据进行过滤
- 提供
- 1.Timestamp Interceptor:自动在每个event头部添加一个KV
- key:timestamp
- value:event产生的时间
- 1.Timestamp Interceptor:自动在每个event头部添加一个KV
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
- 2.Host Interceptor:自动在每个event头部添加一个KV
- key:host
- value:这个event所在的机器的名称
- 3.Static Interceptor:自动在每个event头部添加一个KV
- KV由用户自己指定
- 4.Regex Filtering Interceptor:正则过滤拦截器,判断数据是否符合正则表达式,不符合就直接过滤,不采集
Sink processor
- 功能:实现collect架构中的高可用和负载均衡
- 高可用failover:两个sink,一个工作,一个不工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
- priority:权重越大,就先工作
- 负载均衡load_balance:两个sink,一起工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
- 分配策略:round_robin,random
- 第一层必须有两个sink,作为一个整体,称为sink group
八、Sqoop的功能和作用
功能
- 用于实现mysql等RDBMS数据库与HDFS之间的数据导入与导出
导入与导出:相对HDFS而言 - 导入:将MySQL的数据导入到HDFS
- 导出:将HDFS的数据导出到MySQL
本质
- 底层就是MapReduce程序(大多数都是三大阶段的MapReduce)
- 将Sqoop的程序转换成了MapReduce程序,提交给YARN运行,实现分布式采集
- 导入:MySQL -->> HDFS
- Input:DBInputFormat:读MySQL
- Output:TextOutputFormat:写HDFS
- 导出:HDFS -->> MySQL
- Input:TextInputFormat:读HDFS
- Output:DBOutputFormat:写MySQL
特点
- 必须依赖于Hadoop:MapReduce + YARN
- MapReduce是离线计算框架,Sqoop离线数据采集的工具,只能适合于离线业务平台
应用
- 数据同步:定期将离线的数据进行采集同步到数据仓库中
- 全量:每次都采集所有数据
- 增量:每次只采集最新的数据,大部分都是增量处理
- 数据迁移:将历史数据【MySQL、Oracle】存储到HDFS中
- 全量:第一次一定是全量的
测试
sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456
九、Sqoop导入:HDFS
准备数据:
- MySQL创建数据库
create database sqoopTest;
use sqoopTest;
- MySQL创建数据表
CREATE TABLE `tb_tohdfs` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- MySQL插入数据
insert into tb_tohdfs values(null,"laoda",18);
insert into tb_tohdfs values(null,"laoer",19);
insert into tb_tohdfs values(null,"laosan",20);
insert into tb_tohdfs values(null,"laosi",21);
导入语法:
#查看sqoop import帮助
sqoop import --help
- 指定数据源:MySQL
- url
- username
- password
- table
- 指定目标地:HDFS
- 指定写入的位置
测试导入
- 需求1:将MySQL中tb_tohdfs表的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test01
- 需求2:将tb_tohdfs表的id和name导入HDFS的/sqoop/import/test01目录,并且用制表符分隔
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--columns id,name \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
- -m:指定MapTask的个数
- –fields-terminated-by:用于指定输出的分隔符
- –columns:指定导入哪些列
- –delete-target-dir :提前删除输出目录
- 需求3:将tb_tohdfs表中的id >2的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--where 'id > 2' \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
-
–where :用于指定行的过滤条件
-
需求4:将tb_tohdfs表中的id>2的数据中id和name两列导入/sqoop/import/test01目录中
-
方案1
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--columns id,name \\
--where 'id > 2' \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
- 方案2
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
-e 'select id,name from tb_tohdfs where id>2 and $CONDITIONS' \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
- -e,–query :使用SQL语句读取数据.只要使用SQL语句,必须在where子句中加上$CONDITIONS
十、Sqoop导入:Hive
准备数据
use default;
create table fromsqoop(
id int,
name string,
age int
);
- 1.直接导入
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--hive-import \\
--hive-database default \\
--hive-table fromsqoop \\
--fields-terminated-by '\\001' \\
-m 1
-
–hive-import \\:表示导入Hive表
-
–hive-database default \\:表示指定导入哪个Hive的数据库
-
–hive-table fromsqoop \\:表示指定导入哪个Hive的表
-
–fields-terminated-by ‘\\001’ \\:指定Hive表的分隔符,一定要与Hive表的分隔符一致
原理 -
step1:将MySQL的数据通过MapReduce先导入HDFS
-
step2:将HDFS上导入的这个文件通过load命令加载到了Hive表中
-
2.hcatalog导入
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--hcatalog-database default \\
--hcatalog-table fromsqoop \\
--fields-terminated-by '\\001' \\
-m 1
原理
- step1:先获取Hive表的元数据
- step2:将Hive表的目录直接作为MapReduce输出
十一、Sqoop导入:增量导入
增量需求
- 第一天:产生数据
+----+--------+-----+
| 1 | laoda | 18 |
| 2 | laoer | 19 |
| 3 | laosan | 20 |
| 4 | laosi | 21 |
+----+--------+-----+
- 第二天的0点:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
- 第二天:产生新的数据
+----+--------+-----+
| 5 | laowu | 22 |
| 6 | laoliu | 23 |
| 7 | laoqi | 24 |
| 8 | laoba | 25 |
+----+--------+-----+
- 第三天:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
- 每次导入都是所有的数据,每次都是全量采集,会造成数据重复
Sqoop中的两种增量方式
设计:用于对某一列值进行判断,只要大于上一次的值就会被导入
参数
Incremental import arguments:
--check-column <column> Source column to check for incremental
change
--incremental <import-type> Define an incremental import of type
'append' or 'lastmodified'
--last-value <value> Last imported value in the incremental
check column
-
–check-column :按照哪一列进行增量导入
-
–last-value:用于指定上一次的值
-
–incremental:增量的方式
-
append
-
lastmodified
-
1.append
- 要求:必须有一列自增的值,按照自增的int值进行判断
- 特点:只能导入新增的数据,无法导入更新的数据
- 测试
- 第一次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test02 \\
--fields-terminated-by '\\t' \\
--check-column id \\
--incremental append \\
--last-value 1 \\
-m 1
- 插入新的数据
insert into tb_tohdfs values(null,"laowu",22);
insert into tb_tohdfs values(null,"laoliu",23);
insert into tb_tohdfs values(null,"laoqi",24);
insert into tb_tohdfs values(null,"laoba",25);
- 第二次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test02 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1
2.lastmodifield
- 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
- 特点:既导入新增的数据也导入更新的数据
- 测试
- MySQL中创建测试数据
CREATE TABLE `tb_lastmode` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`word` varchar(200) NOT NULL,
`lastmode` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into tb_lastmode values(null,'hadoop',null);
insert into tb_lastmode values(null,'spark',null);
insert into tb_lastmode values(null,'hbase',null);
- 第一次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_lastmode \\
--target-dir /sqoop/import/test03 \\
--fields-terminated-by '\\t' \\
--incremental lastmodified \\
--check-column lastmode \\
--last-value '2021-05-12 21:55:30' \\
-m 1
- 数据发生变化
insert into tb_lastmode values(null,'hive',null);
update tb_lastmode set word = 'sqoop' where id = 1;
第二次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_lastmode \\
--target-dir /sqoop/import/test03 \\
--fields-terminated-by '\\t' \\
--merge-key id \\
--incremental lastmodified \\
--check-column lastmode \\
--last-value '2021-05-12 22:01:47' \\
-m 1
- –merge-key :按照id进行合并
特殊方式
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
- 要求:必须每次将最新导入的数据放到一个目录单独存储,不能相同
十二、Sqoop导出:全量导出
准备数据
- MySQL中创建测试表
use sqoopTest;
CREATE TABLE `tb_url` (
`id` int(11) NOT NULL,
`url` varchar(200) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Hive中创建表,并加载数据
vim /export/data/lateral.txt
1 http://facebook.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baidu
use default;
create table tb_url(
id int,
url string
) row format delimited fields terminated by '\\t';
load data local inpath '/export/data/lateral.txt' into table tb_url;
全量导出
sqoop export \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
-m 1
- –export-dir:指定导出的HDFS目录
- –input-fields-terminated-by :用于指定导出的HDFS文件的分隔符是什么
十三、Sqoop导出:增量导出
增量导出场景
- Hive中有一张结果表:存储每天分析的结果
--第一天:10号处理9号
id daystr UV PV IP
1 2020-11-09 1000 10000 500
insert into result
select id,daystr,uv,pv ,ip from datatable where daystr=昨天的日期
--第二天:11号处理10号
id daystr UV PV IP
1 2020-11-09 1000 10000 500
2 2020-11-10 2000 20000 1000
MySQL:存储每一天的结果
1 2020-11-09 1000 10000 500
增量导出方式
- updateonly:只增量导出更新的数据
- allowerinsert:既导出更新的数据,也导出新增的数据
1.updateonly
- 修改lateral.txt数据
1 http://www.itcast.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baidu
4 http://www.heima.com
- 重新加载覆盖
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;
- 增量导出
sqoop export \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
--update-key id \\
--update-mode updateonly \\
-m 1;
2.allowerinsert
- 修改lateral.txt
1 http://bigdata.itcast.com/path/p1.php?query=1
2 http://www.baidu.com/news/index.jsp?uuid=frank
3 http://www.jd.com/index?source=baidu
4 http://www.heima.com
- 覆盖表中数据
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;
- 增量导出
sqoop export \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
--update-key id \\
--update-mode allowinsert \\
-m 1
十四、Sqoop Job
- 增量导入的问题
- 增量导入每次都要手动修改上次的值执行,怎么解决?
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test04 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1
- Sqoop Job的使用
insert into tb_tohdfs values(null,'laojiu',26);
insert into tb_tohdfs values(null,'laoshi',27);
- 创建job
sqoop job --create job01 \\
-- import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test04 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 8 \\
-m 1
-
创建job,不会运行程序,只是在元数据中记录信息
-
列举job
sqoop job --list
- 查看job的信息
sqoop job --show jobName
- 运行job
sqoop job --exec jobName
- 删除job
sqoop job --delete jobName
运行job01
sqoop job --exec job01
插入新数据
insert into tb_tohdfs values(null,'laoshiyi',28);
insert into tb_tohdfs values(null,'laoshier',29);
运行job01
sqoop job --exec job01
十五、Sqoop密码问题与脚本封装
- 如何解决手动输入密码和密码明文问题?
- 1:在sqoop的sqoop-site.xml中配置将密码存储在客户端中
- 2:将密码存储在文件中,通过文件的权限来管理密码
sqoop job --create job02 \\
-- import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password-file file:///export/data/sqoop.passwd \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test05 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1
- –password-file
- 读取的是HDFS文件,这个文件中只能有一行密码(通过notepad++编辑)
#mysql密码
123456
Sqoop封装脚本
- 如何封装Sqoop的代码到文件中?
- step1:将代码封装到一个文件中
vim /export/data/test.sqoop
import
--connect
jdbc:mysql://node3:3306/sqoopTest
--username
root
--password-file
file:///export/data/sqoop.passwd
--table
tb_tohdfs
--target-dir
/sqoop/import/test05
--fields-terminated-by
'\\t'
-m
1
- 要求:一行只放一个参数
- step2:运行这个文件
sqoop --options-file /export/data/test.sqoop
以上是关于数据采集工具Flume和Sqoop的主要内容,如果未能解决你的问题,请参考以下文章