打怪升级之小白的大数据之旅(七十一)<Hadoop生态:初识Flume>

Posted GaryLea

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了打怪升级之小白的大数据之旅(七十一)<Hadoop生态:初识Flume>相关的知识,希望对你有一定的参考价值。

打怪升级之小白的大数据之旅(七十一)

Hadoop生态:初识Flume

上次回顾

上一章,我们学习完了hive的内容,本章开始是Hadoop中经常使用的另外一个框架 Flume

初识Flume

下面这个是flume的标志

flume的中文是水槽,但我觉得将它音译为浮木更加贴切

官方对Flume的解释是这样的:

  • Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。Flume基于流式架构,灵活简单

我对Flume的理解:

  • 在古代的时候,交通没有这么发达,通常比较大量的货物都是通过水路进行运送
  • 对应Flume中:source就是货物的源头,channel就是运送货物的船只,Sink就是码头来接收货物的
  • 因为可能会有转运货物的情况:在水面上进行货物的转运,这个情况对应的就是两个Flume之间的对接

Flume架构

了解完Flume是什么,接着我就正式介绍一下Flueme的架构

Flume由 Source Channel 和 Sink组成

  • Channel: 想象上面我说的栗子,船上是装数据的,它就是Channel
  • Source: 数据肯定有源头,这个数据的源头就是通过Source来获取数据
  • Sink: 既然有源头势必有终点,一个Flume的终点就是Sink,通过Sink可以将Channel的数据取出来
  • Agent: 每个Flume它启动时的进程就是Agent,当我们启动Agent时,就创建了一个Flume
  • Event: 此时,每一组数据通过Source进入到Channel时,这个过程就是Event事件来进行

正式点的Flume架构介绍

Agent

  • Agent是一个JVM进程,它以事件的形式将数据从源头送至目的。
  • Agent主要有3个部分组成,Source、Channel、Sink

Source

  • Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据
  • 常用的source有
    • Avro Source(用于两个flume之间的传输),它是一个轻量级的RPC的调用框架
    • ExecSource 命令行监控(常用于测试环境)
    • Taildir Source 监控多个目录中的动态变化数据,常用于在生产环境中,监控本地文件
    • Kafka Source 用于监控Kafka中的数据
    • NetCat Source 用于监控网络中的数据

Sink

  • Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个Flume Agent
  • Sink组件目的地包括hdfs、logger、avro、thrift、ipc、file、HBase、solr以及自定义Sink

Channel

  • Channel是位于Source和Sink之间的缓冲区。因此,Channel允许Source和Sink运作在不同的速率上。
  • Channel是线程安全的,可以同时处理几个Source的写入操作和几个Sink的读取操作。
  • Flume自带两种Channel:Memory Channel和File Channel。
    • Memory Channel是内存中的队列。Memory Channel在不需要关心数据丢失的情景下适用。如果需要关心数据丢失,那么Memory Channel就不应该使用,因为程序死亡、机器宕机或者重启都会导致数据丢失。
    • File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据

Event

  • 传输单元,Flume数据传输的基本单元,以Event的形式将数据从源头送至目的地。
  • Event由Header和Body两部分组成,Header用来存放该event的一些属性,为K-V结构,Body用来存放该条数据,形式为字节数组

Flume的安装

安装地址
(1)Flume官网地址:http://flume.apache.org/
(2)文档查看地址:http://flume.apache.org/FlumeUserGuide.html
(3)下载地址:http://archive.apache.org/dist/flume/

Flume的安装比较简单,下载好对应版本后,直接放到服务器中解压即可,我下载的是1.9.0版本的flume

# 将apache-flume-1.9.0-bin.tar.gz上传到linux的/opt/software目录下
# 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下
tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
# 修改apache-flume-1.9.0-bin的名称为flume
mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume
# 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar

# 建议添加Flume的环境变量
# flume
sudo vim /etc/profile.d/my_env.sh 
# flume
export FLUME_HOME=/opt/module/flume
export PATH=$PATH:$FLUME_HOME/bin

Flume入门案例

我们先在flume的根目录下创建一个用于案例测试的文件夹 test_job

mkdir /opt/module/flume/test_job

官方案例:监控端口数据

需求:使用Flume监听一个端口,收集该端口数据,并打印到控制台

案例实现:

安装 netcat工具:模拟一个客户端生产数据

# 安装netcat工具
sudo yum install -y nc

检查待测试端口是否被占用

sudo netstat -nlp | grep 44444

创建Flume的Agent进程启动配置文件(因为Flume需要根据配置文件知道Source是谁,数据要传递给谁)

vim /opt/module/flume/test_job/netcat-flume.conf
# 定义Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 指定source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 指定sink
a1.sinks.k1.type = logger

# 指定Channel缓冲区类型
a1.channels.c1.type = memory

# channel最大容量
a1.channels.c1.capacity = 1000

# 每次传输事件的大小
a1.channels.c1.transactionCapacity = 100

# 绑定source,sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 参数说明

根据配置文件启动flume

第一种写法:
flume-ng agent --conf conf/ --name a1 --conf-file test_job/netcat-flume.conf -Dflume.root.logger=INFO,console
第二种写法:
flume-ng agent -c conf/ -n a1 -f test_job/netcat-flume.conf -Dflume.root.logger=INFO,console
  • 启动命令参数说明
    --conf/-c:表示配置文件存储在conf/目录
    --name/-n:表示给agent起名为a1
    --conf-file/-f:flume本次启动读取的配置文件是在test_job文件夹下的flume-telnet.conf文件。
    -Dflume.root.logger=INFO,console :-D表示flume运行时动态修改flume.root.logger参数属性值,并将控制台日志打印级别设置为INFO级别。日志级别包括:log、info、warn、error
    

启动netcat,模拟客户端,传递数据

nc localhost 44444
  • 运行之后,我们在该窗口下开始模拟传输数据:分别写入hello world
  • 左边窗口是我们的agent进程(启动的flume),右边就是我们模拟的客户端netcat

这个案例是不是似曾相识?没错,这个案例就类似我们网络编程中的那个Socket端口监听

实时监控单个追加文件

学会了如何使用控制台在本地进行数据的读取,接下来,我们来学习如何监控一个文件的内容写入,写入流程如下:

  • 我们要实现监控HDFS中的文件监控,就需要运行Hadoop,自然的,首先要配置好Java和Hadoop,并且运行Hadoop集群,
  • 这个步骤我前面有专门介绍过,下面案例采用读取hadoop中hive产生的日志信息,我们直接进入案例实操

案例实现:

创建flume-file-hdfs.conf文件

vim /opt/module/flume/test_job/flume-file-hdfs.conf
# Name
# Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = hdfs
# 配置保存在HDFS的路径
a1.sinks.k1.hdfs.path = hdfs://hadoop112:8020/flume/%y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 一次积攒多少个Event就flush到HDFS
a1.sinks.k1.hdfs.batchSize = 100
# 设置文件类型可以支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行flume

flume-ng agent -c conf/ -f test_job/flume-file-hdfs.conf -n a1 

运行hive并操作hive产生日志信息

# 模拟日志信息产生
echo hello >> hive.log 

运行结果如下:成功在HDFS保存了日志信息

实时监控目录下多个新文件

我们继续对上一个案例进行改进,当我们知道了如何监控一个文件,接下来就是如何监控一个文件夹下的多个文件,此时就需要学习一下spooling source, spooling的流程如下:

为了便于区分,我修改了一下hdfs的保存路径,案例如下:

# Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/flume/upload
# 设置监控的文件后缀名
a1.sources.r3.fileSuffix = .COMPLETED
a1.sources.r3.fileHeader = true
#忽略所有以.tmp结尾的文件,不上传
a1.sources.r3.ignorePattern = ([^ ]*\\.tmp)

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = hdfs
# 配置保存在HDFS的路径
a1.sinks.k1.hdfs.path = hdfs://hadoop112:8020/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 一次积攒多少个Event就flush到HDFS
a1.sinks.k1.hdfs.batchSize = 100
# 设置文件类型可以支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

运行flume

flume-ng agent -c conf/ -f test_job/flume-dir-hdfs.conf  -n a1 

模拟文件夹中多个文件及其数据的产生

# 模拟文件夹
mkdir /opt/module/flume/upload
# 模拟生成多个文件
vim 1.txt
vim 2.txt
vim 3.txt
# 模拟日志信息产生
echo hello >> 1.txt
echo world >> 2.txt
echo flume >> 3.txt

运行结果如下:成功在HDFS保存了日志信息

实时监控目录下的多个追加文件

回顾一下前面的案例:

  • Exec source适用于监控一个实时追加的文件,不能实现断点续传
  • Spooldir Source适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

有了前面的铺垫,接下来就是我们日常工作中经常使用的重点内容了

  • Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传 Taildir 流程如下:

tail dir 和前面我们的spooling类似,只是source不同而已,因为tail dir 会对操作行为做一个标记,为了便于查看,我们将该标记路径改为tail_dir.json文件,配置如下:

# Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
# TailDir的标记文件保存路径
a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files/.*file.*
a3.sources.r3.filegroups.f2 = /opt/module/flume/files2/.*log.*


# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# sink
a1.sinks.k1.type = hdfs
# 配置保存在HDFS的路径
a1.sinks.k1.hdfs.path = hdfs://hadoop112:8020/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-
# 是否按照时间滚动文件夹
a1.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 一次积攒多少个Event就flush到HDFS
a1.sinks.k1.hdfs.batchSize = 100
# 设置文件类型可以支持压缩
a1.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
# 设置每个文件的滚动大小
a1.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

创建测试文件夹

mkdir /opt/module/flume/files
mkdir /opt/module/flume/files2

运行flume


模拟测试数据

echo hello >> files/1.file
echo world >> files2/1.file
echo java >> files/1.file
echo hadoop >> files2/2.file
echo hive >> files2/2.file
echo flume >> files/3.file
echo hadoop222 >> files2/1.file

运行结果如下:

总结

今天为大家带来了flume框架,一句话总结:

  • flume 是专门用于海量日志数据传输而诞生的框架

本章的重点内容就是掌握TailDir的用法,下一章,我为大家带来Flume框架的原理知识

以上是关于打怪升级之小白的大数据之旅(七十一)<Hadoop生态:初识Flume>的主要内容,如果未能解决你的问题,请参考以下文章

打怪升级之小白的大数据之旅(四十一)<大数据与Hadoop概述>

打怪升级之小白的大数据之旅(六十一)<Hive旅程第二站:Hive安装>

打怪升级之小白的大数据之旅(六十一)<Hive旅程第二站:Hive安装>

打怪升级之小白的大数据之旅(七十二)<Flume进阶>

打怪升级之小白的大数据之旅(七十二)<Flume进阶>

打怪升级之小白的大数据之旅(七十二)<Flume进阶>