数据采集工具Flume和Sqoop

Posted Xiao Miao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据采集工具Flume和Sqoop相关的知识,希望对你有一定的参考价值。

数据采集工具Flume和Sqoop

一、Flume的功能与应用

1.功能

数据采集:将数据从一个地方采集到另一个地方
将数据进行了复制
大数据的数据采集:将各种需要处理的数据源复制到大数据仓库中

实现:实时数据流的数据采集,可以将不同各种数据源的数据采集到各种目标地
数据源:文件、网络端口
Flume:实时
目标地:HDFS、Hbase、Hive、Kafka

特点:
功能全面
所有的读取和写入的程序,都已经封装好了
只需要配置从哪读,写入哪里,就要可以实现采集

允许自定义开发
如果功能不能满足实际的业务需求,Flume提供各种接口,允许自定义开发
基于Java开发的应用程序

开发相对简单
所有功能都封装好了,只要调用即可
写一个配置文件:从哪读,都谁,写到哪去

可以实现分布式采集
分布式采集:每一台机器都可以用Flume进行采集
注意:Flume不是分布式架构

2.应用

应用于实时数据流采集场景
基于文件或者网络协议端口的数据流采集

美团的Flume设计架构

二、Flume的基本组成

在这里插入图片描述
Agent:每个Agent就是一个Flume的程序,每个Agent由三部分组成:source、Channel、Sink

Source:负责读取数据,Source会动态监听数据源,将数据源新增的数据实时采集变成Event数据流,将每个Event发送到Channel中

  • 每一条数据会变成一个Event
  • 实时监听数据源

Channel:临时缓存数据,将source发送过来的event的数据缓存起来 ,供Sink取数据
内存、文件(磁盘)

Sink:负责发送数据,从Channel中读取采集到的数据,将数据写入目标地
sink主动到Channel中读取数据

Event:用于构建每一条数据的对象,每一条数据就会变成一个Event,进行传递,最终写入目标
组成

  • head:定义一些KV属性和配置,默认head时空的
  • body:数据存在body中
    理解:
Event
Map head;
byte[] body; //每一条数据的字节流

三、Flume的开发规则:

step1:开发一个Flume的参数配置文件
properties格式的文件:

#step1:定义一个agent:agent的名称、定义source、channel、sink
#step2:定义source:读什么、读哪
#step3:定义channel:缓存在什么地方
#step4:定义sink:写入什么地方

step2:运行flume的agent程序

flume -ng
Usage:bin/flume-ng <command>[options]...

为什么叫Flume-ng?

  • flume-og:老的版本,架构非常麻烦,性能非常差,后不用了
  • flume-ng:现在用的版本
flume-ng agent -c <conf> -f <file> -n <name>

agent:表示要运行一个Flume程序

  • -c :指定Flume的配置文件目录
  • -f :要运行那个文件
  • -n :运行的agent的名字是什么

一个程序文件中可以有多个agent程序,通过名字来区别

四、Flume开发测试

需求:采集Hive的日志、临时缓存在内存中、将日志写入Flume的日志中并打印在命令中
source:采集一个文件数据

创建测试目录:

cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir usercase

复制官方示例:

cp conf/flume-conf.properties.template usercase/hive-mem-log.properties

hive-mem-log.properties:采集hive的日志临时缓存在内存中最终打印在日志中

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
Exec Source

  • 执行一条Linux的命令来实现采集
  • 命令:搭配tail -f动态采集文件最新的内容
    在这里插入图片描述
    在这里插入图片描述在这里插入图片描述

Chanel:Flume提供了各种channel应用缓存数据

  • memory channel将数据缓存在内存中
    在这里插入图片描述
    在这里插入图片描述
    Sink:Flume提供很多sink
  • Logger Sink 日志类型的Sink

开发配置文件hive-mem-log.properties

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'

#define the agent
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#define the source
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log


#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

#define the sink
a1.sinks.k1.type = logger


#bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

运行

#1.切换到指定目录
cd /export/server/flume-1.6.0-cdh5.14.0-bin/
#2.运行agent程序
flume-ng agent -c conf/ -f usercase/hive-mem-log.properties -n a1 -Dflume.root.logger=INFO,console
  • -Dflume.root.logger=INFO,console:将flume的日志打印在命令行

结果:
在这里插入图片描述

五、常用Source

1.Exec

功能:通过执行一条Linux命令来实现数据量动态采集

  • 固定搭配tail -F使用

应用场景:实现动态监听采集(单个文件)的数据

2.Taildir

功能:从Apache Flume1.7版本开始支持,动态监听采集多个文件

  • 如果用的是1.5或者1.6,遇到这个问题,需要自己手动编译这个功能

测试实现
需求:让Flume动态监听一个文件和一个目录下的所有文件

准备

cd /export/server/flume-1.6.0-cdh5.14.0-bin
mkdir position
mkdir -p /export/data/flume
echo " " >> /export/data/flume/bigdata01.txt
mkdir  -p /export/data/flume/bigdata

开发

cp usercase/hive-mem-log.properties usercase/taildir-mem-log.properties 

taildir-mem-log.properties

# define sourceName/channelName/sinkName for the agent 
a1.sources = s1
a1.channels = c1
a1.sinks = k1

# define the s1
a1.sources.s1.type = TAILDIR
#指定一个元数据记录文件
a1.sources.s1.positionFile = /export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json
#将所有需要监控的数据源变成一个组,这个组内有两个数据源
a1.sources.s1.filegroups = f1 f2
#指定了f1是谁:监控一个文件
a1.sources.s1.filegroups.f1 = /export/data/flume/bigdata01.txt
#指定f1采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f1.headerKey1 = value1
#指定f2是谁:监控一个目录下的所有文件
a1.sources.s1.filegroups.f2 = /export/data/flume/bigdata/.*
#指定f2采集到的数据的header中包含一个KV对
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.fileHeader = true 

# define the c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# def the k1
a1.sinks.k1.type = logger

#source、channel、sink bond
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

运行

flume-ng agent -c conf/ -f usercase/taildir-mem-log.properties -n a1 -Dflume.root.logger=INFO,console

在这里插入图片描述
元数据文件的功能:/export/server/flume-1.6.0-cdh5.14.0-bin/position/taildir_position.json

问题:如果Flume程序故障,重启Flume程序,已经被采集过的数据还要不要采集?
需求:不需要,不能导致数据重复

功能:记录Flume所监听的每个文件已经被采集的位置

[
{"inode":34599996,"pos":14,"file":"/export/data/flume/bigdata01.txt"},{"inode":67595704,"pos":19,"file":"/export/data/flume/bigdata/test01.txt"},{"inode":67805657,"pos":7,"file":"/export/data/flume/bigdata/test02.txt"}
]

3.其他source

Kafka Source:监听读取Kafka数据

Spooldir Source:监控一个目录,只要这个目录中产生一个文件,就会采集一个文件
缺点:不能动态监控文件,被采集的文件是不能发生变化的

六、常见Channel

mem Channel:将数据缓存在内存中

  • 特点:读写快、容量小、安全性较差

  • 应用:小数据量的高性能的传输

file Channel:将数据缓存在文件中

  • 特点:读写相对慢、容量大、安全性较高

  • 应用:数据量大,读写性能要求不高的场景下

常用属性

capacity:缓存大小:指定Channel中最多存储多少条event

transactionCapacity:每次传输的大小

  • 每次source最多放多少个event和每次sink最多取多少个event
  • 这个值一般为capacity的十分之一,不能超过capacity

七、常见Sink

1.常用的Sink

  • Kafka Sink
  • HDFS Sink

问题:为什么离线采集不直接写入Hive,使用Hive sink

  • 原因1:很多场景下,需要对数据提前做一步ETL,将ETL以后的结果再入库
  • 原因2:Hive Sink有严格的要求,表必须为桶表,文件类型必须为orc

解决:如果要实现将数据直接放入Hive表?

  • 用HDFS sink代替Hive sink

HDFS Sink功能:将Flume采集的数据写入HDFS

问题:Flume作为HDFS客户端,写入HDFS数据

  • Flume必须知道HDFS地址
  • Flume必须拥有HDFS的jar包

解决

  • 方式一:Flume写地址的时候,指定HDFS的绝对地址
hdfs://node1:8020/nginx/log

手动将需要的jar包放入Flume的lib目录下

  • 方式二:在Flume中配置Hadoop的环境变量,将core-site和hdfs-site放入Flume的配置文件目录

需求:将Hive的日志动态采集写入HDFS

cp hive-mem-log.properties hive-mem-hdfs.properties 
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'


#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log 

#定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000


#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test1


#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1

启动:

flume-ng agent -c conf/ -f usercase/hive-mem-hdfs.properties -n a1 -Dflume.root.logger=INFO,console

在这里插入图片描述
在这里插入图片描述
指定文件大小

  • 问题:Flume默认写入HDFS上会产生很多小文件,都在1KB左右,不利用HDFS存储

  • 解决:指定文件大小

hdfs.rollInterval	30			每隔多长时间产生一个文件,单位为s
hdfs.rollSize		1024		每个文件多大产生一个文件,字节
hdfs.rollCount		10			多少个event生成一个文件
如果不想使用某种规则,需要关闭,设置为0
 cp hive-mem-hdfs.properties hive-mem-size.properties  

hive-mem-size.properties

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per a1, 
# in this case called 'a1'


#定义当前的agent的名称,以及对应source、channel、sink的名字
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#定义s1:从哪读数据,读谁
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /export/server/hive-1.1.0-cdh5.14.0/logs/hiveserver2.log 

#定义c1:缓存在什么地方
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000


#定义k1:将数据发送给谁
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://node1:8020/flume/test2 
#指定按照时间生成文件,一般关闭
a1.sinks.k1.hdfs.rollInterval = 0
#指定文件大小生成文件,一般120 ~ 125M对应的字节数
a1.sinks.k1.hdfs.rollSize = 10240
#指定event个数生成文件,一般关闭
a1.sinks.k1.hdfs.rollCount = 0

#s1将数据给哪个channel
a1.sources.s1.channels = c1
#k1从哪个channel中取数据
a1.sinks.k1.channel = c1

指定分区

 cp hive-mem-hdfs.properties hive-mem-part.properties  

运行:

flume-ng agent -c conf/ -f usercase/hive-mem-part.properties -n a1 -Dflume.root.logger=INFO,console

在这里插入图片描述
其他参数

#指定生成的文件的前缀
a1.sinks.k1.hdfs.filePrefix = nginx
#指定生成的文件的后缀
a1.sinks.k1.hdfs.fileSuffix = .log
#指定写入HDFS的文件的类型:普通的文件
a1.sinks.k1.hdfs.fileType = DataStream 

2.Flume架构和高级组件

Flume架构

  • 1.多Sink
    • 一个agent可以有多个source、channel、sink
    • 多个sink架构中,为了每个sink都有一份完整数据,每个sink必须对应一个独立的channel
a1.sources = s1
a1.channels = c1 c2
a1.sinks = k1 k2

在这里插入图片描述

  • 2.Collect架构
    • 两层Flume架构:如果大量并发直接写入HDFS,导致HDFS的IO负载比较高
    • 第一层
      • source:taildir source
      • sink:avro sink
    • 第二层
      • source:avro source
      • sink:HDFS sink

在这里插入图片描述
高级组件

Flume Channel Selectors

  • 功能: 用于决定source怎么将数据给channel
  • 规则:默认:source默认将数据给每个channel一份
    • Replicating Channel Selector (default)
    • 选择:根据event头部的key值不同,给不同的channel
      • Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

Flume Interceptors:拦截器

  • 功能:可以给event的头部添加KV,还可以对数据进行过滤
  • 提供
    • 1.Timestamp Interceptor:自动在每个event头部添加一个KV
      • key:timestamp
      • value:event产生的时间
a1.sources = r1
a1.channels = c1
a1.sources.r1.channels =  c1
a1.sources.r1.type = seq
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
  • 2.Host Interceptor:自动在每个event头部添加一个KV
    • key:host
    • value:这个event所在的机器的名称
  • 3.Static Interceptor:自动在每个event头部添加一个KV
    • KV由用户自己指定
  • 4.Regex Filtering Interceptor:正则过滤拦截器,判断数据是否符合正则表达式,不符合就直接过滤,不采集

Sink processor

-  功能:实现collect架构中的高可用和负载均衡
- 高可用failover:两个sink,一个工作,一个不工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
  • priority:权重越大,就先工作
  • 负载均衡load_balance:两个sink,一起工作
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.selector = random
  • 分配策略:round_robin,random
    在这里插入图片描述
  • 第一层必须有两个sink,作为一个整体,称为sink group

八、Sqoop的功能和作用

功能

  • 用于实现mysql等RDBMS数据库与HDFS之间的数据导入与导出
    导入与导出:相对HDFS而言
  • 导入:将MySQL的数据导入到HDFS
  • 导出:将HDFS的数据导出到MySQL

本质

  • 底层就是MapReduce程序(大多数都是三大阶段的MapReduce)
  • 将Sqoop的程序转换成了MapReduce程序,提交给YARN运行,实现分布式采集
  • 导入:MySQL -->> HDFS
    • Input:DBInputFormat:读MySQL
    • Output:TextOutputFormat:写HDFS
  • 导出:HDFS -->> MySQL
    • Input:TextInputFormat:读HDFS
    • Output:DBOutputFormat:写MySQL
      特点
  • 必须依赖于Hadoop:MapReduce + YARN
  • MapReduce是离线计算框架,Sqoop离线数据采集的工具,只能适合于离线业务平台

应用

  • 数据同步:定期将离线的数据进行采集同步到数据仓库中
    • 全量:每次都采集所有数据
    • 增量:每次只采集最新的数据,大部分都是增量处理
  • 数据迁移:将历史数据【MySQL、Oracle】存储到HDFS中
    • 全量:第一次一定是全量的

测试

sqoop list-databases --connect jdbc:mysql://node3:3306 --username root --password 123456

在这里插入图片描述

九、Sqoop导入:HDFS

准备数据:

  • MySQL创建数据库
create database sqoopTest;
use sqoopTest;
  • MySQL创建数据表
CREATE TABLE `tb_tohdfs` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL,
  `age` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • MySQL插入数据
insert into tb_tohdfs values(null,"laoda",18);
insert into tb_tohdfs values(null,"laoer",19);
insert into tb_tohdfs values(null,"laosan",20);
insert into tb_tohdfs values(null,"laosi",21);

导入语法:

#查看sqoop import帮助
sqoop import --help
  • 指定数据源:MySQL
    • url
    • username
    • password
    • table
  • 指定目标地:HDFS
    • 指定写入的位置

测试导入

  • 需求1:将MySQL中tb_tohdfs表的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test01

在这里插入图片描述

  • 需求2:将tb_tohdfs表的id和name导入HDFS的/sqoop/import/test01目录,并且用制表符分隔
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--columns id,name \\
--delete-target-dir  \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
  • -m:指定MapTask的个数
  • –fields-terminated-by:用于指定输出的分隔符
  • –columns:指定导入哪些列
  • –delete-target-dir :提前删除输出目录
    在这里插入图片描述
  • 需求3:将tb_tohdfs表中的id >2的数据导入HDFS的/sqoop/import/test01目录中
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--where 'id > 2' \\
--delete-target-dir  \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
  • –where :用于指定行的过滤条件
    在这里插入图片描述

  • 需求4:将tb_tohdfs表中的id>2的数据中id和name两列导入/sqoop/import/test01目录中

  • 方案1

sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--columns id,name \\
--where 'id > 2' \\
--delete-target-dir  \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
  • 方案2
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
-e 'select id,name from tb_tohdfs where id>2 and $CONDITIONS' \\
--delete-target-dir  \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
  • -e,–query :使用SQL语句读取数据.只要使用SQL语句,必须在where子句中加上$CONDITIONS
    在这里插入图片描述

十、Sqoop导入:Hive

准备数据

use default;
create table fromsqoop(
id int,
name string,
age int
);
  • 1.直接导入
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--hive-import \\
--hive-database default \\
--hive-table fromsqoop \\
--fields-terminated-by '\\001' \\
-m 1
  • –hive-import \\:表示导入Hive表

  • –hive-database default \\:表示指定导入哪个Hive的数据库

  • –hive-table fromsqoop \\:表示指定导入哪个Hive的表

  • –fields-terminated-by ‘\\001’ \\:指定Hive表的分隔符,一定要与Hive表的分隔符一致
    在这里插入图片描述
    原理

  • step1:将MySQL的数据通过MapReduce先导入HDFS

  • step2:将HDFS上导入的这个文件通过load命令加载到了Hive表中

  • 2.hcatalog导入

sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--hcatalog-database default \\
--hcatalog-table fromsqoop \\
--fields-terminated-by '\\001' \\
-m 1

原理

  • step1:先获取Hive表的元数据
  • step2:将Hive表的目录直接作为MapReduce输出

十一、Sqoop导入:增量导入

增量需求

  • 第一天:产生数据
+----+--------+-----+
|  1 | laoda  |  18 |
|  2 | laoer  |  19 |
|  3 | laosan |  20	|
|  4 | laosi  |  21 |
+----+--------+-----+
  • 第二天的0点:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
  • 第二天:产生新的数据
+----+--------+-----+
|  5 | laowu  |  22 |
|  6 | laoliu |  23 |
|  7 | laoqi  |  24 |
|  8 | laoba  |  25 |
+----+--------+-----+
  • 第三天:采集昨天的数据
sqoop import --connect jdbc:mysql://node3:3306/sqoopTest --username root --password 123456 --table tb_tohdfs --target-dir /sqoop/import/test02 -m 1
  • 每次导入都是所有的数据,每次都是全量采集,会造成数据重复

Sqoop中的两种增量方式

设计:用于对某一列值进行判断,只要大于上一次的值就会被导入
参数

Incremental import arguments:
   --check-column <column>        Source column to check for incremental
                                  change
   --incremental <import-type>    Define an incremental import of type
                                  'append' or 'lastmodified'
   --last-value <value>           Last imported value in the incremental
                                  check column
  • –check-column :按照哪一列进行增量导入

  • –last-value:用于指定上一次的值

  • –incremental:增量的方式

    • append

    • lastmodified

1.append

  • 要求:必须有一列自增的值,按照自增的int值进行判断
  • 特点:只能导入新增的数据,无法导入更新的数据
  • 测试
    • 第一次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test02 \\
--fields-terminated-by '\\t' \\
--check-column id \\
--incremental append \\
--last-value 1 \\
-m 1

在这里插入图片描述

  • 插入新的数据
insert into tb_tohdfs values(null,"laowu",22);
insert into tb_tohdfs values(null,"laoliu",23);
insert into tb_tohdfs values(null,"laoqi",24);
insert into tb_tohdfs values(null,"laoba",25);
  • 第二次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test02 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1

在这里插入图片描述
2.lastmodifield

  • 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
  • 特点:既导入新增的数据也导入更新的数据
  • 测试
    • MySQL中创建测试数据
CREATE TABLE `tb_lastmode` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `word` varchar(200) NOT NULL,
  `lastmode` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP  ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into tb_lastmode values(null,'hadoop',null);
insert into tb_lastmode values(null,'spark',null);
insert into tb_lastmode values(null,'hbase',null);
  • 第一次采集
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_lastmode \\
--target-dir /sqoop/import/test03 \\
--fields-terminated-by '\\t' \\
--incremental lastmodified \\
--check-column lastmode \\
--last-value '2021-05-12 21:55:30' \\
-m 1

在这里插入图片描述

  • 数据发生变化
insert into tb_lastmode values(null,'hive',null);
update tb_lastmode set word = 'sqoop' where id = 1;

第二次采集

sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_lastmode \\
--target-dir /sqoop/import/test03 \\
--fields-terminated-by '\\t' \\
--merge-key id \\
--incremental lastmodified \\
--check-column lastmode \\
--last-value '2021-05-12 22:01:47' \\
-m 1
  • –merge-key :按照id进行合并
    在这里插入图片描述
    特殊方式
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
-e 'select id,name from tb_tohdfs where id > 12 and $CONDITIONS' \\
--delete-target-dir \\
--target-dir /sqoop/import/test01 \\
--fields-terminated-by '\\t' \\
-m 1
  • 要求:必须每次将最新导入的数据放到一个目录单独存储,不能相同

十二、Sqoop导出:全量导出

准备数据

  • MySQL中创建测试表
use sqoopTest;
CREATE TABLE `tb_url` (
  `id` int(11) NOT NULL,
  `url` varchar(200) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Hive中创建表,并加载数据

vim /export/data/lateral.txt
1	http://facebook.com/path/p1.php?query=1
2	http://www.baidu.com/news/index.jsp?uuid=frank
3	http://www.jd.com/index?source=baidu

use default;
create table tb_url(
id int,
url string
) row format delimited fields terminated by '\\t';

load data local inpath '/export/data/lateral.txt' into table tb_url;

在这里插入图片描述
全量导出

sqoop export \\
--connect  jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
-m 1 
  • –export-dir:指定导出的HDFS目录
  • –input-fields-terminated-by :用于指定导出的HDFS文件的分隔符是什么
    在这里插入图片描述

十三、Sqoop导出:增量导出

增量导出场景

  • Hive中有一张结果表:存储每天分析的结果
--第一天:10号处理9号
id		daystr			UV 			PV			IP
1		2020-11-09		1000		10000		500

insert into result
select id,daystr,uv,pv ,ip from datatable where daystr=昨天的日期
--第二天:11号处理10号
id		daystr			UV 			PV			IP
1		2020-11-09		1000		10000		500
2		2020-11-10		2000		20000		1000

MySQL:存储每一天的结果

1		2020-11-09		1000		10000		500

增量导出方式

  • updateonly:只增量导出更新的数据
  • allowerinsert:既导出更新的数据,也导出新增的数据

1.updateonly

  • 修改lateral.txt数据
1	http://www.itcast.com/path/p1.php?query=1
2	http://www.baidu.com/news/index.jsp?uuid=frank
3	http://www.jd.com/index?source=baidu
4	http://www.heima.com
  • 重新加载覆盖
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;

在这里插入图片描述

  • 增量导出
sqoop export \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
--update-key id \\
--update-mode updateonly \\
-m 1;

在这里插入图片描述
2.allowerinsert

  • 修改lateral.txt
1	http://bigdata.itcast.com/path/p1.php?query=1
2	http://www.baidu.com/news/index.jsp?uuid=frank
3	http://www.jd.com/index?source=baidu
4	http://www.heima.com
  • 覆盖表中数据
load data local inpath '/export/data/lateral.txt' overwrite into table tb_url;

在这里插入图片描述

  • 增量导出
sqoop export \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_url \\
--export-dir /user/hive/warehouse/tb_url \\
--input-fields-terminated-by '\\t' \\
--update-key id \\
--update-mode allowinsert \\
-m 1

在这里插入图片描述

十四、Sqoop Job

  • 增量导入的问题
    • 增量导入每次都要手动修改上次的值执行,怎么解决?
sqoop import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test04 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1
  • Sqoop Job的使用
insert into tb_tohdfs values(null,'laojiu',26);
insert into tb_tohdfs values(null,'laoshi',27);
  • 创建job
sqoop job --create job01 \\
-- import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password 123456 \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test04 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 8 \\
-m 1
  • 创建job,不会运行程序,只是在元数据中记录信息

  • 列举job

sqoop job --list
  • 查看job的信息
sqoop job --show jobName
  • 运行job
sqoop job --exec jobName
  • 删除job
sqoop job --delete jobName

运行job01

sqoop job --exec job01

在这里插入图片描述
插入新数据

insert into tb_tohdfs values(null,'laoshiyi',28);
insert into tb_tohdfs values(null,'laoshier',29);

运行job01

sqoop job --exec job01

在这里插入图片描述

十五、Sqoop密码问题与脚本封装

  • 如何解决手动输入密码和密码明文问题?
    • 1:在sqoop的sqoop-site.xml中配置将密码存储在客户端中
    • 2:将密码存储在文件中,通过文件的权限来管理密码
sqoop job --create job02 \\
-- import \\
--connect jdbc:mysql://node3:3306/sqoopTest \\
--username root \\
--password-file file:///export/data/sqoop.passwd \\
--table tb_tohdfs \\
--target-dir /sqoop/import/test05 \\
--fields-terminated-by '\\t' \\
--incremental append \\
--check-column id \\
--last-value 4 \\
-m 1
  • –password-file
  • 读取的是HDFS文件,这个文件中只能有一行密码(通过notepad++编辑)
#mysql密码
123456

在这里插入图片描述
Sqoop封装脚本

  • 如何封装Sqoop的代码到文件中?
    • step1:将代码封装到一个文件中
vim /export/data/test.sqoop
import
--connect
jdbc:mysql://node3:3306/sqoopTest
--username 
root
--password-file 
file:///export/data/sqoop.passwd
--table 
tb_tohdfs
--target-dir 
/sqoop/import/test05
--fields-terminated-by 
'\\t' 
-m 
1
  • 要求:一行只放一个参数
    • step2:运行这个文件
sqoop --options-file /export/data/test.sqoop

以上是关于数据采集工具Flume和Sqoop的主要内容,如果未能解决你的问题,请参考以下文章

Flume和 Sqoop

Day15:数据采集工具Flume与Sqoop

Flume,Sqoop学习以及应用

修真大数据协作框架Sqoop+Flume+Oozie+Hue(59讲)

数据集成:Flume和Sqoop

使用 SQOOP 和 FLUME 将数据从 RDBMS 移动到 Hadoop