Flume安装及配置

Posted 码上攻城

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume安装及配置相关的知识,希望对你有一定的参考价值。

Flume 提供了大量内置的 Source、Channel 和 Sink 类型。而且不同类型的 Source、Channel 和 Sink 可以自由组合—–组合方式基于配置文件的设置,非常灵活。比如:Channel 可以把事件暂存在内存里,也可以持久化到本地硬盘上。Sink 可以把日志写入 HDFS、HBase,甚至是另外一个 Source 等。

安装

  • 下载源码包到 /usr/local/src 目录下

  • 解压tar.gz包

  • 修改配置文件,在 /usr/local/src/apache-flume-1.7.0-bin/conf 目录下

  • 配置环境变量

核心配置

  • 在 /usr/local/src/apache-flume-1.7.0-bin/conf 目录下提供默认配置模板

  • 配置示例——NetCat Source:监听一个指定的网络端口,只要应用程序向这个端口里面写数据,这个 Source 组件就可以获取到数据。 其中 Sink:logger, Channel:memory 。

  • 配置如下:

 
   
   
 
  1. # Name the components on this agent

  2. agent.sources = netcat

  3. agent.channels = memoryChannel

  4. agent.sinks = loggerSink

  5. # configure the source

  6. agent.sources.netcat.type = netcat

  7. agent.sources.netcat.bind = 192.168.111.238

  8. agent.sources.netcat.port = 8888

  9. # Describe the sink

  10. agent.sinks.loggerSink.type = logger

  11. # Each channel's type is defined.

  12. agent.channels.memoryChannel.type = memory

  13. # It specifies the capacity of the memory channel

  14. agent.channels.memoryChannel.capacity = 100

  15. # Bind the source and sink to the channel

  16. agent.sources.netcat.channels = memoryChannel

  17. agent.sinks.loggerSink.channel = memoryChannel

运行测试

  • 启动名字为 agent 的 Flume Agent 服务端进程

 
   
   
 
  1. ./bin/flume-ng agent -n agent -c ./conf -f ./conf/flume-netcat.properties -Dflume.root.logger=DEBUG,console

参数说明:

-n 指定 agent 名称(与配置文件中代理的名字相同)

-c 指定 Flume 中配置文件的目录

-f 指定配置文件

-Dflume.root.logger=DEBUG,console 设置日志等级

  • 使用 telnet 发送数据

 
   
   
 
  1. zhwye@ubuntu238:~$ telnet 192.168.111.238 8888

  2. Trying 192.168.111.238...

  3. Connected to 192.168.111.238.

  4. Escape character is '^]'.

  5. big data world

  6. OK

  • 服务端进程收集到的日志数据如下

 
   
   
 
  1. ...

  2. 2018-04-12 07:07:22,965 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:315)] Starting connection handler

  3. 2018-04-12 07:07:31,457 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:327)] Chars read = 16

  4. 2018-04-12 07:07:31,468 (netcat-handler-0) [DEBUG - org.apache.flume.source.NetcatSource$NetcatSocketHandler.run(NetcatSource.java:331)] Events processed = 1

  5. 2018-04-12 07:07:35,191 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 62 69 67 20 64 61 74 61 20 77 6F 72 6C 64 0D    big data world. }

常用配置模式

NetCat Source——HDFS Sink 配置

  • 监听一个指定的网络端口,只要应用程序向这个端口里面写数据,这个 Source 组件就可以获取到数据。 其中 Sink:HDFS, Channel:file 

  • 配置文件

 
   
   
 
  1. # Name the components on this agent

  2. a1.sources = r1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.r1.type = netcat

  7. a1.sources.r1.bind = 192.168.111.238

  8. a1.sources.r1.port = 8888

  9. # Describe the sink

  10. a1.sinks.k1.type = hdfs

  11. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput

  12. a1.sinks.k1.hdfs.writeFormat = Text

  13. a1.sinks.k1.hdfs.fileType = DataStream

  14. a1.sinks.k1.hdfs.rollInterval = 10

  15. a1.sinks.k1.hdfs.rollSize = 0

  16. a1.sinks.k1.hdfs.rollCount = 0

  17. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S

  18. a1.sinks.k1.hdfs.useLocalTimeStamp = true

  19. # Use a channel which buffers events in file

  20. a1.channels.c1.type = file

  21. a1.channels.c1.checkpointDir = /usr/flume/checkpoint

  22. a1.channels.c1.dataDirs = /usr/flume/data

  23. # Bind the source and sink to the channel

  24. a1.sources.r1.channels = c1

  25. a1.sinks.k1.channel = c1

  • 启动 Flume agent a1 服务端

 
   
   
 
  1. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-hdfs.conf   -Dflume.root.logger=DEBUG,console

  • 使用 telnet 发送数据

 
   
   
 
  1. zhwye@ubuntu238:~$ telnet 192.168.111.238 8888

  2. Trying 192.168.111.238...

  3. Connected to 192.168.111.238.

  4. Escape character is '^]'.

  5. big data world

  6. OK

Spooling Directory Source——HDFS Sink 配置

  • 监听一个指定的目录,只要应用程序向这个指定的目录中添加新的文件,Source 组件就可以获取到该o数据,并解析该文件的内容,然后写入到 Channle。写入完成后,标记该文件已完成或者删除该文件。 其中 Sink:HDFS, Channel:file

 
   
   
 
  1. Property Name       Default      Description

  2. channels                

  3. type                            The component type name, needs to be spooldir.

  4. spoolDir                        Spooling Directory Source监听的目录

  5. fileSuffix         .COMPLETED    文件内容写入到channel之后,标记该文件

  6. deletePolicy       never         文件内容写入到channel之后的删除策略: never or immediate

  7. fileHeader         false         Whether to add a header storing the absolute path filename.

  8. ignorePattern      ^$           Regular expression specifying which files to ignore (skip)

  9. interceptors                    指定传输中eventhead(头信息),常用timestamp

  • 配置文件

 
   
   
 
  1. # Name the components on this agent

  2. a1.sources = r1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.r1.type = spooldir

  7. a1.sources.r1.spoolDir = /usr/local/datainput

  8. a1.sources.r1.fileHeader = true

  9. a1.sources.r1.interceptors = i1

  10. a1.sources.r1.interceptors.i1.type = timestamp

  11. # Describe the sink

  12. # Describe the sink

  13. a1.sinks.k1.type = hdfs

  14. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput

  15. a1.sinks.k1.hdfs.writeFormat = Text

  16. a1.sinks.k1.hdfs.fileType = DataStream

  17. a1.sinks.k1.hdfs.rollInterval = 10

  18. a1.sinks.k1.hdfs.rollSize = 0

  19. a1.sinks.k1.hdfs.rollCount = 0

  20. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S

  21. a1.sinks.k1.hdfs.useLocalTimeStamp = true

  22. # Use a channel which buffers events in file

  23. a1.channels.c1.type = file

  24. a1.channels.c1.checkpointDir = /usr/flume/checkpoint

  25. a1.channels.c1.dataDirs = /usr/flume/data

  26. # Bind the source and sink to the channel

  27. a1.sources.r1.channels = c1

  28. a1.sinks.k1.channel = c1

  • 启动 Flume agent a1 服务端

 
   
   
 
  1. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-spool-hdfs.conf   -Dflume.root.logger=DEBUG,console

  • 使用 cp 命令向 Spooling Directory 中发送数据

 
   
   
 
  1. cp datafile  /usr/local/datainput   (注:datafile中的内容为:big data world!)

  • Spooling Directory Source的两个注意事项

 
   
   
 
  1. 1.拷贝到spool目录下的文件不可以再打开编辑

  2. 2.不能将具有相同文件名字的文件拷贝到这个目录下

Exec Source——HDFS Sink 配置

  • 监听一个指定的命令,获取一条命令的结果作为它的数据源 常用的是 tail-F file 指令,只要应用程序向日志(文件)里面写数据,Source 组件就可以获取到日志(文件)中最新的内容 。 其中 Sink:HDFS, Channel:file

  • 配置文件

 
   
   
 
  1. # Name the components on this agent

  2. a1.sources = r1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.r1.type = exec

  7. a1.sources.r1.command = tail -F /usr/local/log.file

  8. # Describe the sink

  9. a1.sinks.k1.type = hdfs

  10. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput

  11. a1.sinks.k1.hdfs.writeFormat = Text

  12. a1.sinks.k1.hdfs.fileType = DataStream

  13. a1.sinks.k1.hdfs.rollInterval = 10

  14. a1.sinks.k1.hdfs.rollSize = 0

  15. a1.sinks.k1.hdfs.rollCount = 0

  16. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S

  17. a1.sinks.k1.hdfs.useLocalTimeStamp = true

  18. # Use a channel which buffers events in file

  19. a1.channels.c1.type = file

  20. a1.channels.c1.checkpointDir = /usr/flume/checkpoint

  21. a1.channels.c1.dataDirs = /usr/flume/data

  22. # Bind the source and sink to the channel

  23. a1.sources.r1.channels = c1

  24. a1.sinks.k1.channel = c1

  • 启动 Flume agent a1 服务端

 
   
   
 
  1. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-exec-hdfs.conf   -Dflume.root.logger=DEBUG,console

  • 使用 echo 命令向 /usr/local/datainput 中发送数据

 
   
   
 
  1. echo  big data > log.file

Avro Source——HDFS Sink 配置

  • 监听一个指定的 Avro 端口,通过 Avro 端口可以获取到 Avro Client 发送过来的文件 。只要应用程序通过 Avro 端口发送文件,Source 组件就可以获取到该文件中的内容。 其中 Sink:HDFS,Channel:file (注:Avro 和 Thrift 都是一些序列化的网络端口–通过这些网络端口可以接受或者发送信息,Avro 可以发送一个给定的文件给 Flume,是 Agent 间的通信协议)

  • 编写配置文件

 
   
   
 
  1. # Name the components on this agent

  2. a1.sources = r1

  3. a1.sinks = k1

  4. a1.channels = c1

  5. # Describe/configure the source

  6. a1.sources.r1.type = avro

  7. a1.sources.r1.bind = 192.168.111.238

  8. a1.sources.r1.port = 4141

  9. # Describe the sink

  10. a1.sinks.k1.type = hdfs

  11. a1.sinks.k1.hdfs.path = hdfs://hadoop238:9000/dataoutput

  12. a1.sinks.k1.hdfs.writeFormat = Text

  13. a1.sinks.k1.hdfs.fileType = DataStream

  14. a1.sinks.k1.hdfs.rollInterval = 10

  15. a1.sinks.k1.hdfs.rollSize = 0

  16. a1.sinks.k1.hdfs.rollCount = 0

  17. a1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S

  18. a1.sinks.k1.hdfs.useLocalTimeStamp = true

  19. # Use a channel which buffers events in file

  20. a1.channels.c1.type = file

  21. a1.channels.c1.checkpointDir = /usr/flume/checkpoint

  22. a1.channels.c1.dataDirs = /usr/flume/data

  23. # Bind the source and sink to the channel

  24. a1.sources.r1.channels = c1

  25. a1.sinks.k1.channel = c1

  • 启动 Flume agent a1 服务端

 
   
   
 
  1. flume-ng  agent -n a1  -c ../conf  -f ../conf/flume-avro-hdfs.conf   -Dflume.root.logger=DEBUG,console

  • 使用avro-client发送文件

 
   
   
 
  1. flume-ng avro-client -c  ../conf  -H 192.168.111.238  -p 4141 -F /usr/local/log.file

总结

总结 Exec Source 和 Spooling Directory Source 是两种常用的日志采集的方式,其中 Exec Source 可以对日志的实时采集,但是当 Flume 不运行或者指令执行出错时,Exec Source 将无法收集到日志数据,日志会出现丢失,从而无法保证收集日志的完整性。而 Spooling Directory Source 在对日志的实时采集上有欠缺。



以上是关于Flume安装及配置的主要内容,如果未能解决你的问题,请参考以下文章

Flume的安装及使用

Flume的安装及使用

Flume的安装及使用

Flume的安装及使用

flume安装

日志采集框架Flume的安装及使用