大数据技术之FlumeFlume进阶企业真实面试题
Posted @从一到无穷大
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据技术之FlumeFlume进阶企业真实面试题相关的知识,希望对你有一定的参考价值。
文章目录
1 Flume 进阶
1.1 Flume 事务
1.2 Flume Agent 内部原理
重要组件
(1)ChannelSelector
ChannelSelector的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是Replicating
(复制)和Multiplexing
(多路复用)。
ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel 。
(2)SinkProcessor
SinkProcessor 共有三种类型,分别是DefaultSinkProcessor
、LoadBalancingSinkProcessor
和 FailoverSinkProcessor
。
DefaultSinkProcessor 对应的是单个的 Sink,LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group,LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。
1.3 Flume 拓扑结构
1.3.1 简单串联
这种模式是将多个 flume 顺序连接起来了,从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量,flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 flume 宕机,会影响整个传输系统。
1.3.2 复制和多路复用
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 channel 中,或者将不同数据分发到不同的 channel 中, sink 可以选择传送到不同的目的地。
1.3.3 负载均衡和故障转移
Flume 支持使用将多个 sink 逻辑上分到一个 sink 组, sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。
1.3.4 聚合
这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个flume 采集日志,传送到一个集中收集日志的 flume,再由此flume 上传到hdfs、hive、hbase 等,进行日志分析。
1.4 Flume 企业开发案例
1.4.1 复制和多路复用
1. 案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给Flume-3,Flume-3 负责输出到 Local FileSystem。
2. 需求分析
3. 实现步骤
(1)准备工作
在/opt/module/flume-1.7.0/job
目录下创建 group1 文件夹
[Tom@hadoop102 job]$ cd group1/
在/opt/module/flume-1.7.0/datas/
目录下创建 flume3 文件夹
[Tom@hadoop102 datas]$ mkdir flume3
(2)创建flume-file-flume.conf
配置1 个接收日志文件的source 和两个channel、两个sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir 。
编辑配置文件
[Tom@hadoop102 group1]$ vim flume-file-flume.conf
添加如下内容
#name
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
#Source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/hive.log
a1.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position1.json
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
#Bind
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
(3)创建 flume-flume-hdfs .conf
配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink 。
编辑配置文件
[Tom@hadoop102 group1]$ vim flume-flume-hdfs.conf
添加如下内容
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/group1/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = logs-
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4)创建 flume-flume-dir .conf
配置上级 Flume 输出的 Source,输出是到本地目录的 Sink 。
编辑配置文件
[Tom@hadoop102 group1]$ vim flume-flume-dir.conf
添加如下内容
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/flume-1.7.0/data/group1
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录。
(5)执行配置文件
分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume 。
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group1/flume-flume-dir.conf
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group1/flume-flume-hdfs.conf
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group1/flume-file-flume.conf
(6)启动 Hadoop 并向 hive.log 添加数据
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-yarn.sh
[Tom@hadoop102 data]$ echo hello >> hive.log
[Tom@hadoop102 data]$ echo hust >> hive.log
(7)检查 HDFS 上数据
(8)检查 /opt/module/flume-1.7.0/datas/flume3
目录中数据
总用量 16
-rw-rw-r--. 1 Tom Tom 6 9月 12 22:02 1631453983368-46
-rw-rw-r--. 1 Tom Tom 5 9月 12 22:02 1631453983368-47
-rw-rw-r--. 1 Tom Tom 0 9月 12 22:03 1631453983368-48
1.4.2 负载均衡和故障转移
1. 案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3 ,采用 FailoverSinkProcessor ,实现故障转移的功能。
2. 需求分析
3. 实现步骤
(1)准备工作
在/opt/module/flume-1.7.0/job
目录下创建 group2 文件夹
[Tom@hadoop102 job]$ cd group2/
(2)创建 flume-netcat-flume.conf
配置1 个 netcat source 和1 个channel、1 个sink group(2 个sink),分别输送给 flume-flume-console1 和 flume-flume-console2。
编辑配置文件
[Tom@hadoop102 group2]$ vim flume-netcat-flume.conf
添加如下内容
#name
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2
a1.sinkgroups = g1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
#SinkGroup
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
(3)创建 flume-flume-console1 .conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑配置文件
[Tom@hadoop102 group2]$ vim flume-flume-console1.conf
添加如下内容
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = logger
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(4)创建 flume-flume-console2 .conf
配置上级 Flume 输出的 Source,输出是到本地控制台。
编辑配置文件
[Tom@hadoop102 group2]$ vim flume-flume-console2.conf
添加如下内容
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = logger
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
<font color= size=3 >(5)执行配置文件
分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume 。
[Tom@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console
[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group2/flume-flume-consosole1.conf -Dflume.root.logger=INFO,console
[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a1 -f job/group2/flume-netcat-flume.conf
(6)使用 netcat 工具向本机的 44444 端口发送内容
[Tom@hadoop102 flume-1.7.0]$ nc localhost 44444
(7)查看flume-flume-console2 及 flume-flume-console1 的控制台打印日志
(8)将 flume-flume-console2 kill ,观察 flume-flume-console1 的控制台打印情况。
使用jps -ml
查看 Flume 进程
[Tom@hadoop102 job]$ jps -ml
5696 org.apache.flume.node.Application -n a3 -f job/group2/flume-flume-console2.conf
5430 org.apache.flume.node.Application -n a1 -f job/group2/flume-netcat-flume.conf
5275 org.apache.flume.node.Application -n a2 -f job/group2/flume-flume-console1.conf
3581 org.apache.hadoop.hdfs.server.datanode.DataNode
5821 sun.tools.jps.Jps -ml
3438 org.apache.hadoop.hdfs.server.namenode.NameNode
1.4.3 聚合
1. 案例需求
hadoop102 上的 Flume1 监控文件opt/module/data/group.log
hadoop103 上的 Flume2 监控某一个端口的数据流,
Flume1 与 Flume2 将数据发送给 hadoop104 上的 Flume3,Flume3 将最终数据打印到控制台。
2. 需求分析
3. 实现步骤
(1)准备工作
分发 Flume
[Tom@hadoop102 module]$ xsync flume
在hadoop102、hadoop103 以及hadoop104 的/opt/module/flume-1.7.0/job
目录下创建一个group3 文件夹。
[Tom@hadoop102 job]$ mkdir group3
[Tom@hadoop103 job]$ mkdir group3
[Tom@hadoop104 job]$ mkdir group3
(2)创建 flume1-logger-flume.conf
配置 Source 用于监控 hive.log 文件,配置 Sink 输出数据到下一级 Flume。
在 hadoop102 上编辑配置文件
[Tom@hadoop102 group3]$ vim flume1-logger-flume.conf
添加如下内容
#name
a2.sources = r1
a2.channels = c1
a2.sinks = k1
#Source
a2.sources.r1.type = TAILDIR
a2.sources.r1.filegroups = f1
a2.sources.r1.filegroups.f1 = /opt/module/flume-1.7.0/data/flume.log
a2.sources.r1.positionFile = /opt/module/flume-1.7.0/position/position2.json
#Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
#Sink
a2.sinks.k1.type = avro
a2.sinks.k1.hostname = hadoop104
a2.sinks.k1.port = 4141
#Bind
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
(3)创建 flume2-netcat-flume.conf
配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume:在 hadoop103 上编辑配置文件
[Tom@hadoop103 group3 ]$ vim flume2-netcat-flume.conf
添加如下内容
#name
a3.sources = r1
a3.channels = c1
a3.sinks = k1
#Source
a3.sources.r1.type = netcat
a3.sources.r1.bind = localhost
a3.sources.r1.port = 44444
#Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
#Sink
a3.sinks.k1.type = avro
a3.sinks.k1.hostname = hadoop104
a3.sinks.k1.port = 4142
#Bind
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
(4)创建 flume3-flume-logger.conf
配置 source 用于接收 flume1 与 flume2 发送过来的数据流,最终合并后 sink 到控制台。
在 hadoop104 上编辑配置文件
[Tom@hadoop104 group3 ]$ touch flume3-flume-logger.conf
[Tom@hadoop104 group3 ]$ vim flume3-flume-logger.conf
添加如下内容
#name
a4.sources = r1 r2
a4.channels = c1
a4.sinks = k1
#Source
a4.sources.r1.type = avro
a4.sources.r1.bind = hadoop104
a4.sources.r1.port = 4141
a4.sources.r2.type = avro
a4.sources.r2.bind = hadoop104
a4.sources.r2.port = 4142
#Channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 1000
a4.channels.c1.transactionCapacity = 100
#Sink
a4.sinks.k1.type = logger
#Bind
a4.sources.r1.channels = c1
a4.sources.r2.channels = c1
a4.sinks.k1.channel = c1
(5) 执行配置文件
分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf 。
[Tom@hadoop104 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a4 -f job/group4/flume3-flume-logger.conf -Dflume.root.logger=INFO,console
[Tom@hadoop103 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a3 -f job/group4/flume2-netcat-flume.conf
[huxili@hadoop102 flume-1.7.0]$ bin/flume-ng agent -c conf/ -n a2 -f job/group4/flume1-logger-flume.conf
(6)在 hadoop102 上向 /opt/module/flume-1.7.0/data/
目录下的 group .log 追加内容
[Tom@hadoop102 data]$ echo "hello" >> flume.log
(7)在 hadoop103 上向 44444 端口发送数据
[Tom@hadoop103 flume-1.7.0]$ nc localhost 44444
hust
OK
(8)检查 hadoop104 上数据
1.5 自定义 Interceptor
1. 案例需求
使用 Flume 采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
2. 需求分析
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构, Multiplexing 的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel 中,所以我们需要自定义一个Interceptor,为不同类型的event 的Header 中的key 赋予不同的值。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义 interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)(实际测试时,我们测试字符串是否包含’‘hello’’)。
3. 实现步骤
(1)创建一个 maven 项目,并引入以下依赖。
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
(2)定义 CustomInterceptor 类并实现 Interceptor 接口。
package com.Tom.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TypeInterceptor implements Interceptor
// 声明一个存放事件的集合
private List<Event> addHeaderEvents;
@Override
public void initialize()
// 初始化
addHeaderEvents = new ArrayList<Event>();
@Override
// 单个事件拦截
public Event intercept(Event event)
// 1. 获取事件的头信息
Map<String, String> headers = event.getHeaders();
// 2. 获取事件中的body信息
String body = new String(event.getBody());
// 3. 根据body中是否有"hello"来决定添加怎样的头信息
if(body.contains("hello"))
// 4. 添加头信息
headers.put("topic", "first");
else
headers.put("topic", "second");
return event;
@Override
// 批量事件拦截
public List<Event> intercept(List<Event> events) 以上是关于大数据技术之FlumeFlume进阶企业真实面试题的主要内容,如果未能解决你的问题,请参考以下文章