Flume:流式数据收集利器
Posted 奇点
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume:流式数据收集利器相关的知识,希望对你有一定的参考价值。
在数据生命周期里的第一环就是数据收集。收集通常有两种办法,一种是周期性批处理拷贝,一种是流式收集。今天我们就说说流式收集利器Flume怎么使用。
使用flume收集数据保存到多节点 by 尹会生
1 使用flume 收集数据到hdfs
由于工作的需要,领导要求收集公司所有在线服务器节点的文本数据,进行存储分析,从网上做了些比较,发现flume 是个简单实现,而且非常强大的工具,这里介绍给大家
首先下载软件:http://flume.apache.org
flume是著名的开源数据收集系统,采用java语言开发,主要工作逻辑可以分成source sinks 和channels 三个部分,
source 是收集器,官方提供了多种收集方式,能够支持文件和常用接口;
sinks是存储器,兼容了大部分市面上看到的文件系统类型,而且还可以将数据传递到下一节点中;
channels用了连接source和sinks;而且还可以使用一个source连接多个channel(sinks)进行传输链路的高可用。
我这里的传感器数据被统一收集到了nginx中,因此只要实现将nginx数据输出到hdfs就可以完成汇总了,为了便于分析,nginx的数据打印到了一个固定文件名的文件中,每天分割一次。那么flume一直监视这个文件就可以持续收集数据到hdfs了。通过官方文档发现flume的tail方式很好用,这里就使用了exec类型的source收集数据。接下来我们看下主要配置文件
# 指定souce sink channel 分别用什么名称,后面配置文件会调用
agent1.sources = s1
agent1.sinks = k1
agent1.channels = c1
# source的类型为执行tail命令,跟踪文件变化来进行数据读取
agent1.sources.s1.type = exec
agent1.sources.s1.command = tail -F /usr/local/openresty/nginx/data/user_info.data
agent1.sources.s1.channels = c1
# 使用主机名来作为存储hdfs的目录,区分不同的主机收集数据
agent1.sources.s1.interceptors = i1
agent1.sources.s1.interceptors.i1.type = host
agent1.sources.s1.interceptors.i1.hostHeader = hostname
# 存储方式使用hdfs
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.hdfs.path =hdfs://NameNodeIP:9000/tmp/nginx/%y-%m-%d/%H
# 存储的目录格式
agent1.sinks.k1.hdfs.filePrefix = %{hostname}/events-
# 设置存储间隔和文件分割,根据自己的实际文件大小自由调整
agent1.sinks.k1.hdfs.batchSize= 500
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.writeFormat =Text
agent1.sinks.k1.hdfs.rollSize = 100
agent1.sinks.k1.hdfs.rollCount = 10000
agent1.sinks.k1.hdfs.rollInterval = 600
agent1.sinks.k1.hdfs.useLocalTimeStamp = True
# 设置中间缓存为内存方式还是磁盘方式
agent1.channels.c1.type = memory
agent1.channels.c1.keep-alive = 120
agent1.channels.c1.capacity = 500000
agent1.channels.c1.transactionCapacity = 600
# 将source 、sink 和channel关联起来
agent1.sources.s1.channels = c1
agent1.sinks.k1.channel = c1
配置好之后启动
cd /usr/local/flume160/
bin/flume-ng agent -c conf -n agent1 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
查看hdfs 发现我要采集的数据已经都在hdfs中
hadoop dfs -ls /tmp/nginx/15-12-31/17/172.24.150.74
要对hdfs中的数据分析,可以根据数据的格式制作hive表格,然后进行分析就可以了,非常方便吧!
2 收集数据到多个数据源
完成了领导的任务,继续研究下flume的其他强大功能,测试了一下上面提到的数据同时推送到其他节点的功能,使用的方法就是指定多个channel和sink,这里以收集到其他节点存储为文件格式为例,需要做以下修改
# agent1 的channel和sink改为2个
agent1.channels = c1 c2
agent1.sinks = k1 k2
# 第二个sink改为数据收集节点的ip和端口
agent1.sinks.k2.type = avro
agent1.sinks.k2.hostname=ReceiveIP
agent1.sinks.k2.port = 8888
agent1.sinks.k2.channel = c2
# 第二个channel配置和第一个channel一样
agent1.channels.c2.type = memory
agent1.channels.c2.keep-alive = 120
agent1.channels.c2.capacity = 500000
agent1.channels.c2.transactionCapacity = 600
# 将第二个sink连接起来
agent1.sources.s1.channels = c2
agent1.sinks.k2.channel = c2
第二个节点参考上面来配置一套flume,这里使用了FILE_ROLL的sink类型,保存数据到文件
# agent2的基本定义
agent2.sources = s1
agent2.sinks = k1
agent2.channels = c1
# 利用avro RPC 接收 agent1 传过来的数据
agent2.sources.s1.type = avro
agent2.sources.s1.bind = ReceiveIP
agent2.sources.s1.port = 8888
agent2.sources.s1.thread = 5
agent2.sources.s1.channels = c1
# 使用文件接收,这里可以定义文件名称等,省略
agent2.sinks.k1.type = FILE_ROLL
agent2.sinks.k1.sink.directory = /tmp/flume-fileout
agent2.sinks.k1.channel = c1
# 和第一节点一样
agent2.channels.c1.type = memory
agent2.channels.c1.keep-alive = 120
agent2.channels.c1.capacity = 500000
agent2.channels.c1.transactionCapacity = 600
# 连接source和sink
agent2.sources.s1.channels = c1
agent2.sinks.k1.channel = c1
第二节点启动起来
bin/flume-ng agent -c conf -n agent2 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console
之后再次查看,发现数据在hdfs和第二个节点的/tmp/flume-fileout目录都保存了一份数据。这里只是简化功能,可以根据这种方法将采集的数据放入到实时计算,如spark streaming 、storm 等工具中,更能发挥它的价值。
关注大数据尖端技术发展,关注奇点大数据
以上是关于Flume:流式数据收集利器的主要内容,如果未能解决你的问题,请参考以下文章