0#1hadoop生态圈之日志采集框架Flume入门

Posted 择码记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了0#1hadoop生态圈之日志采集框架Flume入门相关的知识,希望对你有一定的参考价值。

【1、简介】

flume 内部是一个个 agent,

agent 内部是 source,sink,channel。

source 用于跟数据源对接,

sink 用于将数据下沉到下一个 agent 或者外部存储系统,

channel 是 agent 内部数据传输通道,用于将数据从 source 到 sink。


图1 单个 agent 采集数据


【0#1】hadoop生态圈之日志采集框架Flume入门

图2 多级 agent 之间串联


【2、安装部署】

1、下载 apache-flume-1.6.0-bin.tar.gz

http://flume.apache.org/download.html

2、解压

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C [指定路径]

3、进入 flume/conf,修改 flume-env.sh,配置 JAVA_HOME

4、配置采集方案,描述在配置文件中,文件名可自定义

5、指定采集方案配置文件,启动 flume agent


【3、一个简单例子:从网络端口接收数据,下沉到 logger】

1、在 flume 下的 conf 目录新建一个配置文件

vi netcat-logger.conf


# 定义这个agent中各组件的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 描述和配置source组件:r1

# 类型, 从网络端口接收数据,在本机启动,所以localhost

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444


# 描述和配置sink组件:k1

a1.sinks.k1.type = logger


# 描述和配置channel组件,此处使用是内存缓存的方式

# 下沉的时候是一批一批的,下沉的时候是一个个event

# Channel参数解释

# capacity:默认该通道中最大的可以存储的event数量

#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# 描述和配置source  channel   sink之间的连接关系

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


2、启动 flume-agent

bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console


-c conf   指定flume自身的配置文件所在目录

-f conf/netcat-logger.conf  指定我们所描述的采集方案

-n a1  指定我们这个agent的名字


3、测试

【0#1】hadoop生态圈之日志采集框架Flume入门

【0#1】hadoop生态圈之日志采集框架Flume入门


退出 telnet:ctrl + ]

telnet> quit


【4、从文件夹中采集数据】

1、配置文件

vi spool-logger.conf

#定义三大组件的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source组件

# 监听目录,spoolDir指定目录, fileHeader要不要给文件夹前坠名

type=spoolDir采集目录源,目录里有就采

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /home/hadoop/flumespool

a1.sources.r1.fileHeader = true


# 配置sink组件

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


2、启动

bin/flume-ng agent -c ./conf -f ./conf/spool-logger.conf -n a1 -Dflume.root.logger=INFO,console


3、测试

【0#1】hadoop生态圈之日志采集框架Flume入门

【0#1】hadoop生态圈之日志采集框架Flume入门

【0#1】hadoop生态圈之日志采集框架Flume入门


【5、日志文件不断追加内容,用 tail 命令获取数据,下沉到 hdfs】

[hadoop@mini1 conf]$ vi tail-hdfs.conf 

#定义三大组件的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source组件

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/log/test.log

a1.sources.r1.channels = c1


# 配置sink组件

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /bi/flume/events/%y-%m-%d/%H%M/

a1.sinks.k1.hdfs.filePrefix = events-

#以下三行设置了每隔10分钟滚动一次

a1.sinks.k1.hdfs.round = true

#定义三大组件的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source组件

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/log/test.log

a1.sources.r1.channels = c1


# 配置sink组件

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /bi/flume/events/%y-%m-%d/%H%M/

a1.sinks.k1.hdfs.filePrefix = events-

#以下三行设置了每隔10分钟滚动一次 hdfs 的目录

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

#文件滚动周期(秒)

#每隔 3 秒滚动文件

a1.sinks.k1.hdfs.rollInterval = 3

#文件滚动的大小限制(bytes)

#每 500 字节滚动文件

a1.sinks.k1.hdfs.rollSize = 500

#写入多少个event数据后滚动文件(事件个数)

#每 20 个 events 滚动文件

a1.sinks.k1.hdfs.rollCount = 20

#在刷新到 hdfs 前允许写到文件的 events 个数

#每 5 个 events 更新到 hdfs 一次

a1.sinks.k1.hdfs.batchSize= 5

a1.sinks.k1.hdfs.useLocalTimeStamp = true

#生成的文件类型,默认是Sequencefile,可用DataStream,则为普通文本

a1.sinks.k1.hdfs.fileType = DataStream


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


【0#1】hadoop生态圈之日志采集框架Flume入门

[hadoop@mini1 log]$ cat makelog.sh 

#!bin/bash

#每隔一定时间往 test.log 文件追加数据

i=1

while true

do

echo "gavin$i" >> /home/hadoop/log/test.log

i=$(($i+1))

sleep 0.5

done


用命令

tail -F log/test.log

查看追加的效果。


执行 flume agent 命令

bin/flume-ng agent -c conf -f conf/tail-hdfs.conf -n a1


结果

【0#1】hadoop生态圈之日志采集框架Flume入门

【0#1】hadoop生态圈之日志采集框架Flume入门

【0#1】hadoop生态圈之日志采集框架Flume入门


【6、多个 agent 连接】

从tail命令获取数据发送到avro端口

另一个节点可配置一个avro源来中继数据,

发送到外部存储(

本例直接在命令行打出来方便看到效果,

如果要存储到外部存储,诸如 hdfs,可以参考上例。)

[hadoop@mini1 conf]$ cat tail-avro.conf 

#定义三大组件的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source组件

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /home/hadoop/log/test.log

a1.sources.r1.channels = c1


# 配置sink组件

##sink端的avro是一个数据发送者

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = mini2

a1.sinks.k1.port = 4141

a1.sinks.k1.batch = 2


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


从 avro 端口接收数据,下沉到 logger

[hadoop@mini2 conf]$ cat avro-logger.conf 

#定义三大组件的名称

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# 配置source组件

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

# 绑定本机所有 ip

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141


# 配置sink组件

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1


在 mini2 上,执行命令

bin/flume-ng agent -c conf -f conf/avro-logger.conf -n a1 -Dflume.root.logger=INFO,console

接收 avro 数据。


在mini1上,执行命令

bin/flume-ng agent -c conf -f conf/tail-avro.conf -n a1

不断接收日志的新数据,以 avro 数据下沉到下一个组件。


[hadoop@mini1 log]$ sh makelog.sh


结果

并且发现它是分批出现的,一批为 100 个 event。

查看刚才的配置,发现是

a1.channels.c1.transactionCapacity = 100

起了作用。

#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量


关于 flume 入门就到此为止。

更多资料,请参考官方文档:

http://flume.apache.org/FlumeUserGuide.html


以上是关于0#1hadoop生态圈之日志采集框架Flume入门的主要内容,如果未能解决你的问题,请参考以下文章

日志采集框架Flume

flume日志采集框架使用

flumesqoopoozie

Hadoop辅助工具——FlumeSqoop

日志采集框架Flume的安装及使用

日志采集框架Flume