Flume
Posted 亿钱君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume相关的知识,希望对你有一定的参考价值。
flume官方文档:结合官方文档使用,多练习
第 1 章 Flume 概述
1.1 Flume 定义
- Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS。
- Flume 是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传
输的系统
1.2 Flume 基础架构
- Agent:Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的
- Source:是负责接收数据到 Flume Agent 的组件
- Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储
或索引系统、或者被发送到另一个 Flume Agent - Channel:是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运
作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作 - Event:传输单元,Flume 数据传输的基本单元,由 Header 和 Body 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
第 2 章 Flume 快速入门
2.1 Flume 安装部署
注意:并配置 flumeenv.sh 文件的具体做法
[atguigu@hadoop102 conf]$ vim flume-env.sh
2.2 Flume 入门案例
2.2.1 监控端口数据官方案例
1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。
2)需求分析:
3)实现步骤:
在 flume-netcat-logger.conf 文件中添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置文件解析
4. 先开启 flume 监听端口
第一种写法:
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a1 --conf-file job/flume-netcat-logger.conf -
Dflume.root.logger=INFO,console
第二种写法:
[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
5.使用 netcat 工具向本机的 44444 端口发送内容
[atguigu@hadoop102 ~]$ nc localhost 44444
hello
atguigu
2.2.2 实时监控单个追加文件
1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中
2)需求分析:
3)实现步骤:
- 1.Flume 要想将数据输出到 HDFS,须持有 Hadoop 相关 jar 包,将以下jar包拷贝到拷贝到/opt/module/flume/lib 文件夹下。
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
- 2.在/opt/module/flume/job/创建 flume-file-hdfs.conf 文件,并添加以下内容
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
注意1:设置sink路径时,hdfs路径要根据自己配置定。
如何找到自己hdfs位置和端口号呢?
[atguigu@hadoop102 logs]$ hdfs getconf -confKey fs.default.name
故此时sink路径为:
- 3.运行 Flume,发现在hdfs下自动创建了
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf
- 4.开启 Hadoop 和 Hive 并操作 Hive 产生日志
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
- 5.测试 Hive 产生日志
2.2.3 实时监控目录下多个新文件
-
1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS
-
2)需求分析:
-
3)实现步骤:
0.创建一个被监控的目录:/opt/module/flume/upload/
1.创建配置文件 flume-dir-hdfs.conf,并添加如下内容
[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3
2.启动监控文件夹命令
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a3 --conf-file job/flume-dir-hdfs.conf
3. 向 upload 文件夹中添加文件
[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp
注意:
1:往被监控的文件夹中添加新文件时,不可以往里添加文件夹里已经扫描完的同名文件,添加进去,也不会再扫描。
2:如果要更新被监控文件夹里的文件时,直接更新扫描完成的文件,不会发生改变,因为已经扫描完的带.COMPLETED文件不会再次去扫描,需要把该需要更新的带.COMPLETED的文件删除后,重新添加进去更新后的文件,才会再次扫描
2.2.4 实时监控目录下的多个追加文件
- Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;
- Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;
- 而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控。
未成功!!!
第 3 章 Flume 进阶
3.1 Flume 事务
3.2 Flume Agent 内部原理
重要组件:
1)ChannelSelector
2)SinkProcessor
3.3 Flume 拓扑结构
3.3.1 简单串联
3.3.2 复制和多路复用
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的
地。
3.3.3 负载均衡和故障转移
3.3.4 聚合
3.4 Flume 企业开发案例
3.4.1 复制和多路复用
1)案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储
到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2)需求分析:
3)实现步骤:
- 1.创建 flume-file-flume.conf,并编辑
配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
- 2.创建 flume-flume-hdfs.conf,编辑配置文件
配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
- 3.创建 flume-flume-dir.conf,编辑配置文件
配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2
提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录
- 4.执行配置文件
别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name
a1 --conf-file job/group1/flume-file-flume.conf
- 5.启动 Hadoop 和 Hive
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
-
6.测试:检查 HDFS 上数据
-
7.检查/opt/module/datas/flume3 目录中数据
3.4.2 负载均衡和故障转移
1)案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用
FailoverSinkProcessor,实现故障转移的功能。
2)需求分析
3)实现步骤
略
3.4.3 聚合
1)案例需求
2)需求分析
3)实现步骤:
略
3.5 自定义 Interceptor(拦截器)
3.6 自定义 Source
3.7 自定义 Sink(更多自定这个)
3.8 Flume 数据流监控
3.8.1 Ganglia 的安装与部署
1) 安装 httpd 服务与 php
[atguigu@hadoop102 flume]$ sudo yum -y install httpd php
2) 安装其他依赖
[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool
rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel
4) 修改配置文件/etc/httpd/conf.d/ganglia.conf,修改成以下截图
[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf
5) 修改配置文件/etc/ganglia/gmetad.conf
[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf
6) 修改配置文件/etc/ganglia/gmond.conf
7) 修改配置文件/etc/selinux/config
[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config
8) 启动 ganglia
[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start
9) 打开网页浏览 ganglia 页面
http://192.168.10.102/ganglia
3.8.2 操作 Flume 测试监控
略
第 4 章 企业真实面试题(重点)
略
以上是关于Flume的主要内容,如果未能解决你的问题,请参考以下文章