Flume

Posted 亿钱君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume相关的知识,希望对你有一定的参考价值。


flume官方文档:结合官方文档使用,多练习

第 1 章 Flume 概述

1.1 Flume 定义

  • Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS
  • Flume 是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传
    输的系统

    在这里插入图片描述

1.2 Flume 基础架构

在这里插入图片描述

  • Agent:Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的
  • Source:是负责接收数据到 Flume Agent 的组件
  • Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储
    或索引系统、或者被发送到另一个 Flume Agent
  • Channel:是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运
    作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作
  • Event:传输单元,Flume 数据传输的基本单元,由 HeaderBody 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组
    在这里插入图片描述

第 2 章 Flume 快速入门

2.1 Flume 安装部署

在这里插入图片描述
注意:并配置 flumeenv.sh 文件的具体做法

[atguigu@hadoop102 conf]$ vim flume-env.sh

在这里插入图片描述

2.2 Flume 入门案例

2.2.1 监控端口数据官方案例

1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

2)需求分析:
在这里插入图片描述
3)实现步骤:
在这里插入图片描述
在 flume-netcat-logger.conf 文件中添加如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置文件解析
在这里插入图片描述
4. 先开启 flume 监听端口
第一种写法:

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a1 --conf-file job/flume-netcat-logger.conf -
Dflume.root.logger=INFO,console

第二种写法:

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f 
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console

在这里插入图片描述

在这里插入图片描述
5.使用 netcat 工具向本机的 44444 端口发送内容

[atguigu@hadoop102 ~]$ nc localhost 44444
hello 
atguigu

在这里插入图片描述

2.2.2 实时监控单个追加文件

1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中

2)需求分析:
在这里插入图片描述
3)实现步骤:

  • 1.Flume 要想将数据输出到 HDFS,须持有 Hadoop 相关 jar 包,将以下jar包拷贝到拷贝到/opt/module/flume/lib 文件夹下
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
  • 2.在/opt/module/flume/job/创建 flume-file-hdfs.conf 文件,并添加以下内容
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注意1:设置sink路径时,hdfs路径要根据自己配置定。
如何找到自己hdfs位置和端口号呢?

[atguigu@hadoop102 logs]$ hdfs getconf -confKey fs.default.name

在这里插入图片描述
故此时sink路径为:
在这里插入图片描述

在这里插入图片描述

  • 3.运行 Flume,发现在hdfs下自动创建了
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

在这里插入图片描述

  • 4.开启 Hadoop 和 Hive 并操作 Hive 产生日志
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  • 5.测试 Hive 产生日志
    在这里插入图片描述

2.2.3 实时监控目录下多个新文件

  • 1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

  • 2)需求分析:
    在这里插入图片描述

  • 3)实现步骤:

0.创建一个被监控的目录:/opt/module/flume/upload/

1.创建配置文件 flume-dir-hdfs.conf,并添加如下内容

[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

在这里插入图片描述

在这里插入图片描述
2.启动监控文件夹命令

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a3 --conf-file job/flume-dir-hdfs.conf

在这里插入图片描述
在这里插入图片描述
3. 向 upload 文件夹中添加文件

[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp

在这里插入图片描述
注意:
1:往被监控的文件夹中添加新文件时,不可以往里添加文件夹里已经扫描完的同名文件,添加进去,也不会再扫描。

2:如果要更新被监控文件夹里的文件时,直接更新扫描完成的文件,不会发生改变,因为已经扫描完的带.COMPLETED文件不会再次去扫描,需要把该需要更新的带.COMPLETED的文件删除后,重新添加进去更新后的文件,才会再次扫描

2.2.4 实时监控目录下的多个追加文件

  • Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;
  • Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;
  • 而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控

未成功!!!

第 3 章 Flume 进阶

3.1 Flume 事务

在这里插入图片描述

3.2 Flume Agent 内部原理

在这里插入图片描述
重要组件:
1)ChannelSelector
2)SinkProcessor

3.3 Flume 拓扑结构

3.3.1 简单串联
在这里插入图片描述
3.3.2 复制和多路复用
在这里插入图片描述
Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的
地。

3.3.3 负载均衡和故障转移
在这里插入图片描述
3.3.4 聚合
在这里插入图片描述
在这里插入图片描述

3.4 Flume 企业开发案例

3.4.1 复制和多路复用

1)案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储
到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2)需求分析:
在这里插入图片描述
3)实现步骤:

在这里插入图片描述

  • 1.创建 flume-file-flume.conf,并编辑
    配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  • 2.创建 flume-flume-hdfs.conf,编辑配置文件
    配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  • 3.创建 flume-flume-dir.conf,编辑配置文件
    配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

  • 4.执行配置文件
    别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a1 --conf-file job/group1/flume-file-flume.conf
  • 5.启动 Hadoop 和 Hive
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  • 6.测试:检查 HDFS 上数据
    在这里插入图片描述

  • 7.检查/opt/module/datas/flume3 目录中数据
    在这里插入图片描述

3.4.2 负载均衡和故障转移

1)案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用
FailoverSinkProcessor,实现故障转移的功能。

2)需求分析
在这里插入图片描述

3)实现步骤

3.4.3 聚合

1)案例需求
在这里插入图片描述
2)需求分析
在这里插入图片描述
3)实现步骤:

3.5 自定义 Interceptor(拦截器)

3.6 自定义 Source

3.7 自定义 Sink(更多自定这个)

3.8 Flume 数据流监控

3.8.1 Ganglia 的安装与部署

1) 安装 httpd 服务与 php

[atguigu@hadoop102 flume]$ sudo yum -y install httpd php

2) 安装其他依赖

[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool 
rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel

在这里插入图片描述
4) 修改配置文件/etc/httpd/conf.d/ganglia.conf,修改成以下截图

[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf

在这里插入图片描述
5) 修改配置文件/etc/ganglia/gmetad.conf

[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf

在这里插入图片描述
6) 修改配置文件/etc/ganglia/gmond.conf

在这里插入图片描述
7) 修改配置文件/etc/selinux/config

[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config

在这里插入图片描述
8) 启动 ganglia

[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start

9) 打开网页浏览 ganglia 页面
http://192.168.10.102/ganglia
在这里插入图片描述
在这里插入图片描述

3.8.2 操作 Flume 测试监控

在这里插入图片描述

第 4 章 企业真实面试题(重点)

以上是关于Flume的主要内容,如果未能解决你的问题,请参考以下文章

本地环境idea进行远程debug调试flume代码

Flume配置案例

关于flume配置加载

001- Flume 概述

如何通过 Java 代码更改 Apache Flume 的配置文件?

Flume