Flume

Posted 亿钱君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume相关的知识,希望对你有一定的参考价值。

这里写目录标题


flume官方文档:结合官方文档使用,多练习

第 1 章 Flume 概述

1.1 Flume 定义

  • Flume最主要的作用就是,实时读取服务器本地磁盘的数据,将数据写入到HDFS
  • Flume 是一个高可用的,高可靠的,分布式的海量日志采集、聚合和传
    输的系统

1.2 Flume 基础架构

  • Agent:Agent 是一个 JVM 进程,它以事件的形式将数据从源头送至目的
  • Source:是负责接收数据到 Flume Agent 的组件
  • Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储
    或索引系统、或者被发送到另一个 Flume Agent
  • Channel:是位于 Source 和 Sink 之间的缓冲区。因此,Channel 允许 Source 和 Sink 运
    作在不同的速率上。Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个Sink 的读取操作
  • Event:传输单元,Flume 数据传输的基本单元,由 HeaderBody 两部分组成,Header 用来存放该 event 的一些属性,为 K-V 结构,Body 用来存放该条数据,形式为字节数组

第 2 章 Flume 快速入门

2.1 Flume 安装部署


注意:并配置 flumeenv.sh 文件的具体做法

[atguigu@hadoop102 conf]$ vim flume-env.sh

2.2 Flume 入门案例

2.2.1 监控端口数据官方案例

1)案例需求:
使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

2)需求分析:

3)实现步骤:

在 flume-netcat-logger.conf 文件中添加如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

配置文件解析

4. 先开启 flume 监听端口
第一种写法:

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a1 --conf-file job/flume-netcat-logger.conf -
Dflume.root.logger=INFO,console

第二种写法:

[atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -n a1 -f 
job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console


5.使用 netcat 工具向本机的 44444 端口发送内容

[atguigu@hadoop102 ~]$ nc localhost 44444
hello 
atguigu

2.2.2 实时监控单个追加文件

1)案例需求:实时监控 Hive 日志,并上传到 HDFS 中

2)需求分析:

3)实现步骤:

  • 1.Flume 要想将数据输出到 HDFS,须持有 Hadoop 相关 jar 包,将以下jar包拷贝到拷贝到/opt/module/flume/lib 文件夹下
commons-configuration-1.6.jar、
hadoop-auth-2.7.2.jar、
hadoop-common-2.7.2.jar、
hadoop-hdfs-2.7.2.jar、
commons-io-2.4.jar、
htrace-core-3.1.0-incubating.jar
  • 2.在/opt/module/flume/job/创建 flume-file-hdfs.conf 文件,并添加以下内容
[atguigu@hadoop102 job]$ vim flume-file-hdfs.conf
# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2
# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log
a2.sources.r2.shell = /bin/bash -c
# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs- #是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 1000
#设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 30
#设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

注意1:设置sink路径时,hdfs路径要根据自己配置定。
如何找到自己hdfs位置和端口号呢?

[atguigu@hadoop102 logs]$ hdfs getconf -confKey fs.default.name


故此时sink路径为:

  • 3.运行 Flume,发现在hdfs下自动创建了
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf

  • 4.开启 Hadoop 和 Hive 并操作 Hive 产生日志
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start

[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  • 5.测试 Hive 产生日志

2.2.3 实时监控目录下多个新文件

  • 1)案例需求:使用 Flume 监听整个目录的文件,并上传至 HDFS

  • 2)需求分析:

  • 3)实现步骤:

0.创建一个被监控的目录:/opt/module/flume/upload/

1.创建配置文件 flume-dir-hdfs.conf,并添加如下内容

[atguigu@hadoop102 job]$ vim flume-dir-hdfs.conf
a3.sources = r3
a3.sinks = k3
a3.channels = c3
# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
#忽略所有以.tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\\.tmp)
# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
#上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload- 
#是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
#多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
#重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
#是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
#多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
#设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3


2.启动监控文件夹命令

[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a3 --conf-file job/flume-dir-hdfs.conf



3. 向 upload 文件夹中添加文件

[atguigu@hadoop102 upload]$ touch atguigu.txt
[atguigu@hadoop102 upload]$ touch atguigu.tmp


注意:
1:往被监控的文件夹中添加新文件时,不可以往里添加文件夹里已经扫描完的同名文件,添加进去,也不会再扫描。

2:如果要更新被监控文件夹里的文件时,直接更新扫描完成的文件,不会发生改变,因为已经扫描完的带.COMPLETED文件不会再次去扫描,需要把该需要更新的带.COMPLETED的文件删除后,重新添加进去更新后的文件,才会再次扫描

2.2.4 实时监控目录下的多个追加文件

  • Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;
  • Spooldir Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;
  • 而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控

未成功!!!

第 3 章 Flume 进阶

3.1 Flume 事务

3.2 Flume Agent 内部原理


重要组件:
1)ChannelSelector
2)SinkProcessor

3.3 Flume 拓扑结构

3.3.1 简单串联

3.3.2 复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中,或者将不同数据分发到不同的 channel 中,sink 可以选择传送到不同的目的
地。

3.3.3 负载均衡和故障转移

3.3.4 聚合

3.4 Flume 企业开发案例

3.4.1 复制和多路复用

1)案例需求
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储
到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。
2)需求分析:

3)实现步骤:

  • 1.创建 flume-file-flume.conf,并编辑
    配置 1 个接收日志文件的 source 和两个 channel、两个 sink,分别输送给 flume-flume-hdfs 和 flume-flume-dir。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有 channel
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# sink 端的 avro 是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
  • 2.创建 flume-flume-hdfs.conf,编辑配置文件
    配置上级 Flume 输出的 Source,输出是到 HDFS 的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-hdfs.conf
# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
# source 端的 avro 是一个数据接收服务
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume2/%Y%m%d/%H
#上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2- 
#是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
#是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k1.hdfs.batchSize = 100
#设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
#多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 600
#设置每个文件的滚动大小大概是 128M
a2.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与 Event 数量无关
a2.sinks.k1.hdfs.rollCount = 0
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
  • 3.创建 flume-flume-dir.conf,编辑配置文件
    配置上级 Flume 输出的 Source,输出是到本地目录的 Sink。
[atguigu@hadoop102 group1]$ vim flume-flume-dir.conf
# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4142
# Describe the sink
a3.sinks.k1.type = file_roll
a3.sinks.k1.sink.directory = /opt/module/data/flume3
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

提示:输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录

  • 4.执行配置文件
    别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume。
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a3 --conf-file job/group1/flume-flume-dir.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a2 --conf-file job/group1/flume-flume-hdfs.conf
[atguigu@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name 
a1 --conf-file job/group1/flume-file-flume.conf
  • 5.启动 Hadoop 和 Hive
[atguigu@hadoop102 hadoop-2.7.2]$ myhadoop.sh start
[atguigu@hadoop102 hive]$ bin/hive
hive (default)>
  • 6.测试:检查 HDFS 上数据

  • 7.检查/opt/module/datas/flume3 目录中数据

3.4.2 负载均衡和故障转移

1)案例需求
使用 Flume1 监控一个端口,其 sink 组中的 sink 分别对接 Flume2 和 Flume3,采用
FailoverSinkProcessor,实现故障转移的功能。

2)需求分析

3)实现步骤

3.4.3 聚合

1)案例需求

2)需求分析

3)实现步骤:

3.5 自定义 Interceptor(拦截器)

3.6 自定义 Source

3.7 自定义 Sink(更多自定这个)

3.8 Flume 数据流监控

3.8.1 Ganglia 的安装与部署

1) 安装 httpd 服务与 php

[atguigu@hadoop102 flume]$ sudo yum -y install httpd php

2) 安装其他依赖

[atguigu@hadoop102 flume]$ sudo yum -y install rrdtool perl-rrdtool 
rrdtool-devel
[atguigu@hadoop102 flume]$ sudo yum -y install apr-devel


4) 修改配置文件/etc/httpd/conf.d/ganglia.conf,修改成以下截图

[atguigu@hadoop102 flume]$ sudo vim /etc/httpd/conf.d/ganglia.conf


5) 修改配置文件/etc/ganglia/gmetad.conf

[atguigu@hadoop102 flume]$ sudo vim /etc/ganglia/gmetad.conf


6) 修改配置文件/etc/ganglia/gmond.conf


7) 修改配置文件/etc/selinux/config

[atguigu@hadoop102 flume]$ sudo vim /etc/selinux/config


8) 启动 ganglia

[atguigu@hadoop102 flume]$ sudo service httpd start
[atguigu@hadoop102 flume]$ sudo service gmetad start
[atguigu@hadoop102 flume]$ sudo service gmond start

9) 打开网页浏览 ganglia 页面
http://192.168.10.102/ganglia

3.8.2 操作 Flume 测试监控

第 4 章 企业真实面试题(重点)

以上是关于Flume的主要内容,如果未能解决你的问题,请参考以下文章

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

python爬虫等获取实时数据+Flume+Kafka+Spark Streaming+mysql+Echarts实现数据动态实时采集分析展示

flume

Hadoop

数据采集工具——Flume

Flume 日志聚合