flume原理及代码实现

Posted 栀子花开~

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume原理及代码实现相关的知识,希望对你有一定的参考价值。

转载标明出处:http://www.cnblogs.com/adealjason/p/6240122.html

 

最近想玩一下流计算,先看了flume的实现原理及源码

源码可以去apache 官网下载

下面整理下flume的原理及代码实现:

flume是一个实时数据收集工具,hadoop的生态圈之一,主要用来在分布式环境下各服务器节点做数据收集,然后汇总到统一的数据存储平台,flume支持多种部署架构模式,单点agent部署,分层架构模式部署,如通过一个负载均衡agent将收集的数据分发到各个子agent,然后在汇总到同一个agent上,数据传输到统一的数据存储平台,再次不多废话,flume支持的部署架构图可以参见源码中的doc目录下的图片

 

 

flume原理:

目前最新版本是Flume NG,以下基于Flume NG来说:

flume由以下几个核心概念:

flume event:flume内部的数据单元,包含两部分,一个头结点,一个body结点,头结点是一个Map<String, String>,部署的agent结点可以通过现有的Interceptor或者自定义Interceptor往消息头里放置数据,如ip,hostname等标识消息来源于哪台服务器,event在flume内部做流转,是数据传输的载体

flume source:source是flume的数据来源,flume支持多种数据来源,如taildir监控一个文件的变化,spollDir监控一个文件夹的变化,jmsSource接收jms消息等,最常用的avroSource是构成flume分层架构的基础,source是一个接口,flume提供了多种消息接入方式,在sourceType枚举类中都有详细列出,特殊说明下,由于flume是面向接口编程,其中有一个Other的枚举,是占位符,使用者可以自定义source源,只要求在flume启动的时候可以加载到这个类即可(底层是通过反射获取到class的实例的)

flume channel:flume是基于pipeline的模式,channel的存在丰富了flume的数据传播途径,channel可以再source和sink之间做缓冲,动态调节数据的收集及发送(内部有一个xxxCounter会没接收到一个event或者发送一个event都会做记录),缓冲source和sink之间的压力,其二channel可以关联多个source,如一个source可以按照配置选择的将数据复制到各个管道,或者按照消息头自动分发到指定的管道,一个channel可以接多个sink,这个实现了同一份数据的多发发送池,实现了数据的复用及负载均衡等功能,channel内部流转的数据载体是event,flume channel支持多种数据缓冲实现方式,如fileChannel:用一个文件做数据缓存、memoryChannel:使用内存缓存,底层实现是一个LinkedBlockingDeque,一个双向阻塞列表,具体可参见ChannelType

flume sink:flume的数据发送池,主要负责数据的发送,从channel接收到event,然后发送到指定的数据接收方,flume提供多种sink实现,具体可参见SinkType,常用的有:loggerSink:这个主要用于flume的部署调试,它会将接收到的event事件直接用log4j输出出来,RollingFileSink:这个sink主要是将接收到的日志文件序列化到一个文件目录中,所以需要配置文件的地址,切分文件的频率等,avroSink:这个是flume分层架构中最常用的sink,一般和avroSource配对使用,avro是apache的一个子项目,用于数据的序列化,使用avroSource及avroSink时,需要在avroSource的agent节点服务器上监听一个端口,avroSink的agent把接收到的数据发送到该ip、port上即完成了flume的分层部署,avro仅是一个数据序列化工具,底层实现由一个RpcClient的东东来将数据在这source和sink之间传输(可以留一下启动日志,会自动创建一个RpcClient),当然,flume的编码是按照面向接口来的,所以和source一样支持自定义的sink

上述是几个核心的概念,正式由于flume的这种设计思想及编码风格,让flume有很强的拓展性

 

当然仅仅有这几个还是不可以完全让flume运行起来的,flume提供了很多辅助类用于驱动、分发内部event及整个flume系统的运转,基本如下:

配置领域:

AgentConfiguration:这个看名字就知道是flume的配置元素领域内的东西,是的,使用者在flume-conf.properties中配置的数据解析成AgentConfiguration,是配置文件到面向对象的一个抽象

AbstractConfigurationProvider:该类看名字就是一个抽象的配置Provider类,内部有一个很重要的方法就是:getConfiguration(),该方法中通过如下几个private方法来加载flume的channel、source、sink、sinkGroups并将它们关联起来

        loadChannels(agentConf, channelComponentMap);

        loadSources(agentConf, channelComponentMap, sourceRunnerMap);

        loadSinks(agentConf, channelComponentMap, sinkRunnerMap);

flume还支持动态加载,PollingPropertiesFileConfigurationProvider(AbstractConfigurationProvider的一个具体实现)在flume启动的时候会启动一个线程FileWatcherRunnable,监控flume的配置文件变化,配置文件内部加载用的是google的EventBus来驱动的

驱动领域:

flume的source有如下两个子接口:PollableSource和EventDrivenSource,前者需要自己去轮循的访问数据源,当前是否可以加载到数据,如果有则加载进来转换成flume的event,实现类有taildir、spollDir、jsmSource、kafkaSource等,该接口新增了一个process方法用于轮循调用,后者是一个事件驱动的Source,该接口不需要主动去访问数据源,仅需要接收数据推动过来的event并转换成flume的event即可,实现类有:scribeSource(该数据源用来打通Facebook的scribe数据收集工具)、AvroSource等

SourceRunner:

由于这两个source的存在,所以所以flume提供了两个sourceRunner来驱动source的运行,分别是PollableSourceRunner和EventDrivenSourceRunner,前者启动时自动启动一个PollingRunner线程用于定时轮循process方法

channelProcessor:

该类用于source到channel之间的数据发送,实现了一个source可以关联到多个channel,简单点如这2个接口,source的定义:setChannelProcessor(ChannelProcessor channelProcessor)指定一个ChannelProcessor ,ChannelProcessor 关联到一个final的ChannelSelector,selector关联到Channel:setChannels(List<Channel> channels)

ChannelProcessor:

关联到指定的ChannelSelector,ChannelSelector提供了两种selector方式,ReplicatingChannelSelector:将source的event复制到各个channel中,MultiplexingChannelSelector:根据头结点的header信息自动路由到对应的Channel中

Transaction及BasicTransactionSemantics

flume的Channel内部保证一个event的发送在一个事务完成,如果发送失败或者接收失败则回滚,当成功时才从channel中删除掉该event

SinkProcessor:

用过选择要发送的sink,什么意思呢?该类有两个实现:

LoadBalancingSinkProcessor:

负载均衡方式:提供了roud_bin算法和random算法、以及固定order算法的实现方式,将Channel中的event发送到多个sink上

FailoverSinkProcessor:

可以实现实现failover功能,具体流程类似LoadBalancingSinkProcessor,区别是FailoverSinkProcessor维护了一个PriorityQueue,用来根据权重选择sink

SinkRunner:

该类用于驱动一个sink,启动是内部开了一个线程PollingRunner,定时的调用SinkProcessor

上述是所有的核心概念及代码作用,下面描述下flume的运行流程:

1.系统启动时通过配置领域可以按照客户定义的配置加载一个flume

2.SourceRunner和SinkProcessor同时启动,一个往Channel中生产event,一个从Channel中消费event,内部是一个生产者消费者模式

3.通过一些辅助类,实现Channel到source及sink的多路分发及分层架构

 

下面是一个自己搭建的flume配置文件,供参考:

实现流程:

负载均衡+分发+落地到日志文件

1.负载均衡节点:

从两个文件源读数据,在event头里增加数据来源标识,复制到两个channel中,一个log打印,一个做负载均衡分发到另外两台机器的agent上,负载均衡算法采用roud_robin

loadBalancAgent.sources = taildirSrc

loadBalancAgent.channels = memoryChannel fileChannel

loadBalancAgent.sinks = loggerSink1 loggerSink2 loggerSink3

loadBalancAgent.sinkgroups = loadBalanceGroups

## taildirSrc config

loadBalancAgent.sources.taildirSrc.type = TAILDIR

loadBalancAgent.sources.taildirSrc.positionFile = /alidata1/admin/openSystem/flumetest/log/taildir_position.json

loadBalancAgent.sources.taildirSrc.filegroups = f1 f2

loadBalancAgent.sources.taildirSrc.filegroups.f1 = /alidata1/admin/dts-server-web/dts-server.log

loadBalancAgent.sources.taildirSrc.headers.f1.headerKey1 = dts-server-log

loadBalancAgent.sources.taildirSrc.filegroups.f2 = /alidata1/admin/flume/test.log

loadBalancAgent.sources.taildirSrc.headers.f2.headerKey1 = flume-test-log

loadBalancAgent.sources.taildirSrc.fileHeader = true

## replicating channel config

loadBalancAgent.sources.taildirSrc.selector.type = replicating

loadBalancAgent.sources.taildirSrc.channels = memoryChannel fileChannel

loadBalancAgent.sources.taildirSrc.selector.optional = fileChannel

## memory chanel config

loadBalancAgent.channels.memoryChannel.type = memory

loadBalancAgent.channels.memoryChannel.capacity = 10000

loadBalancAgent.channels.memoryChannel.transactionCapacity = 10000

loadBalancAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

loadBalancAgent.channels.memoryChannel.byteCapacity = 800000

## file channel config

loadBalancAgent.channels.fileChannel.type = file

loadBalancAgent.channels.fileChannel.checkpointDir = /alidata1/admin/openSystem/flumetest/log

loadBalancAgent.channels.fileChannel.dataDirs = /alidata1/admin/openSystem/flumetest/data

## loadbalance sink processor

loadBalancAgent.sinkgroups.loadBalanceGroups.sinks = loggerSink1 loggerSink2

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.type = load_balance

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.backoff = true

loadBalancAgent.sinkgroups.loadBalanceGroups.processor.selector = round_robin

## loggerSink1 config

loadBalancAgent.sinks.loggerSink1.type = avro

loadBalancAgent.sinks.loggerSink1.channel = memoryChannel

loadBalancAgent.sinks.loggerSink1.hostname = 10.253.42.162

loadBalancAgent.sinks.loggerSink1.port = 4141

## loggerSink2 config

loadBalancAgent.sinks.loggerSink2.type = avro

loadBalancAgent.sinks.loggerSink2.channel = memoryChannel

loadBalancAgent.sinks.loggerSink2.hostname = 10.139.53.6

loadBalancAgent.sinks.loggerSink2.port = 4141

## loggerSink3 config

loadBalancAgent.sinks.loggerSink3.type = file_roll

loadBalancAgent.sinks.loggerSink3.channel = fileChannel

loadBalancAgent.sinks.loggerSink3.sink.rollInterval = 0

loadBalancAgent.sinks.loggerSink3.sink.directory = /alidata1/admin/openSystem/flumetest/dtsServerLog

2.负载均衡节点1

接收avroSink并落地到文件中

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel

3.负载均衡节点2

dispatchAgent.sources= avroSrc

dispatchAgent.channels=memoryChannel

dispatchAgent.sinks=loggerSink

## avroSrc config

dispatchAgent.sources.avroSrc.type = avro

dispatchAgent.sources.avroSrc.channels = memoryChannel

dispatchAgent.sources.avroSrc.bind = 0.0.0.0

dispatchAgent.sources.avroSrc.port = 4141

## memoryChannel config

dispatchAgent.channels.memoryChannel.type = memory

dispatchAgent.channels.memoryChannel.capacity = 10000

dispatchAgent.channels.memoryChannel.transactionCapacity = 10000

dispatchAgent.channels.memoryChannel.byteCapacityBufferPercentage = 20

dispatchAgent.channels.memoryChannel.byteCapacity = 800000

## loggerSink config

dispatchAgent.sinks.loggerSink.type = logger

dispatchAgent.sinks.loggerSink.channel = memoryChannel

以上是关于flume原理及代码实现的主要内容,如果未能解决你的问题,请参考以下文章

SVD原理及代码实现

DNSLog原理及代码实现

十大排序算法(原理及代码实现细节)

浅析SkipList跳跃表原理及代码实现

Java 网络爬虫获取网页源代码原理及实现

OpenCV环境下实现图像任意角度旋转的原理及代码