Flume:流式数据收集利器

Posted 奇点

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume:流式数据收集利器相关的知识,希望对你有一定的参考价值。

在数据生命周期里的第一环就是数据收集。收集通常有两种办法,一种是周期性批处理拷贝,一种是流式收集。今天我们就说说流式收集利器Flume怎么使用。


使用flume收集数据保存到多节点 by 尹会生


1 使用flume 收集数据到hdfs


由于工作的需要,领导要求收集公司所有在线服务器节点的文本数据,进行存储分析,从网上做了些比较,发现flume 是个简单实现,而且非常强大的工具,这里介绍给大家


首先下载软件:http://flume.apache.org


flume是著名的开源数据收集系统,采用java语言开发,主要工作逻辑可以分成source sinks 和channels 三个部分,

source 是收集器,官方提供了多种收集方式,能够支持文件和常用接口;

sinks是存储器,兼容了大部分市面上看到的文件系统类型,而且还可以将数据传递到下一节点中;

channels用了连接source和sinks;而且还可以使用一个source连接多个channel(sinks)进行传输链路的高可用。


我这里的传感器数据被统一收集到了nginx中,因此只要实现将nginx数据输出到hdfs就可以完成汇总了,为了便于分析,nginx的数据打印到了一个固定文件名的文件中,每天分割一次。那么flume一直监视这个文件就可以持续收集数据到hdfs了。通过官方文档发现flume的tail方式很好用,这里就使用了exec类型的source收集数据。接下来我们看下主要配置文件


# 指定souce sink channel 分别用什么名称,后面配置文件会调用

agent1.sources = s1

agent1.sinks = k1

agent1.channels = c1


# source的类型为执行tail命令,跟踪文件变化来进行数据读取

agent1.sources.s1.type = exec

agent1.sources.s1.command = tail -F /usr/local/openresty/nginx/data/user_info.data

agent1.sources.s1.channels = c1


# 使用主机名来作为存储hdfs的目录,区分不同的主机收集数据

agent1.sources.s1.interceptors = i1

agent1.sources.s1.interceptors.i1.type = host

agent1.sources.s1.interceptors.i1.hostHeader = hostname



# 存储方式使用hdfs

agent1.sinks.k1.type = hdfs

agent1.sinks.k1.hdfs.path =hdfs://NameNodeIP:9000/tmp/nginx/%y-%m-%d/%H

# 存储的目录格式

agent1.sinks.k1.hdfs.filePrefix = %{hostname}/events-

# 设置存储间隔和文件分割,根据自己的实际文件大小自由调整

agent1.sinks.k1.hdfs.batchSize= 500

agent1.sinks.k1.hdfs.fileType = DataStream

agent1.sinks.k1.hdfs.writeFormat =Text

agent1.sinks.k1.hdfs.rollSize = 100

agent1.sinks.k1.hdfs.rollCount = 10000

agent1.sinks.k1.hdfs.rollInterval = 600

agent1.sinks.k1.hdfs.useLocalTimeStamp = True


# 设置中间缓存为内存方式还是磁盘方式

agent1.channels.c1.type = memory

agent1.channels.c1.keep-alive = 120

agent1.channels.c1.capacity = 500000

agent1.channels.c1.transactionCapacity = 600


# 将source 、sink 和channel关联起来

agent1.sources.s1.channels = c1

agent1.sinks.k1.channel = c1


配置好之后启动

cd /usr/local/flume160/

bin/flume-ng agent -c conf -n agent1 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console


查看hdfs 发现我要采集的数据已经都在hdfs中

hadoop dfs -ls /tmp/nginx/15-12-31/17/172.24.150.74

要对hdfs中的数据分析,可以根据数据的格式制作hive表格,然后进行分析就可以了,非常方便吧!



2 收集数据到多个数据源

完成了领导的任务,继续研究下flume的其他强大功能,测试了一下上面提到的数据同时推送到其他节点的功能,使用的方法就是指定多个channel和sink,这里以收集到其他节点存储为文件格式为例,需要做以下修改


# agent1 的channel和sink改为2个

agent1.channels = c1 c2

agent1.sinks = k1 k2


# 第二个sink改为数据收集节点的ip和端口

agent1.sinks.k2.type = avro

agent1.sinks.k2.hostname=ReceiveIP

agent1.sinks.k2.port = 8888

agent1.sinks.k2.channel = c2


# 第二个channel配置和第一个channel一样

agent1.channels.c2.type = memory

agent1.channels.c2.keep-alive = 120

agent1.channels.c2.capacity = 500000

agent1.channels.c2.transactionCapacity = 600


# 将第二个sink连接起来

agent1.sources.s1.channels = c2

agent1.sinks.k2.channel = c2




第二个节点参考上面来配置一套flume,这里使用了FILE_ROLL的sink类型,保存数据到文件


# agent2的基本定义

agent2.sources = s1

agent2.sinks = k1

agent2.channels = c1


# 利用avro RPC 接收 agent1 传过来的数据

agent2.sources.s1.type = avro

agent2.sources.s1.bind = ReceiveIP

agent2.sources.s1.port = 8888

agent2.sources.s1.thread = 5

agent2.sources.s1.channels = c1


# 使用文件接收,这里可以定义文件名称等,省略

agent2.sinks.k1.type = FILE_ROLL

agent2.sinks.k1.sink.directory = /tmp/flume-fileout

agent2.sinks.k1.channel = c1


# 和第一节点一样

agent2.channels.c1.type = memory

agent2.channels.c1.keep-alive = 120

agent2.channels.c1.capacity = 500000

agent2.channels.c1.transactionCapacity = 600


# 连接source和sink

agent2.sources.s1.channels = c1

agent2.sinks.k1.channel = c1



第二节点启动起来

bin/flume-ng agent -c conf -n agent2 -f conf/flume-conf.properties -Dflume.root.logger=INFO,console



之后再次查看,发现数据在hdfs和第二个节点的/tmp/flume-fileout目录都保存了一份数据。这里只是简化功能,可以根据这种方法将采集的数据放入到实时计算,如spark streaming 、storm 等工具中,更能发挥它的价值。


关注大数据尖端技术发展,关注奇点大数据



以上是关于Flume:流式数据收集利器的主要内容,如果未能解决你的问题,请参考以下文章

Flume框架基础

Flume 介绍及安装

FlumeFlume基础之安装与使用

sparkstreaming+flume+kafka实时流式处理完整流程

flume初识

大数据组件--Flume