Flume 使用学习小结

Posted 阿里云云栖号

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume 使用学习小结相关的知识,希望对你有一定的参考价值。

来这里找志同道合的小伙伴!


Flume 使用学习小结


正文
Flume 使用学习小结


1

概   述



在做埋点数据离线存储到odps中,用到了Flume。一边使用,一边学习了下Flume。其中感受到Flume确实易伸缩、易扩展。其中的组件都可以根据自己的业务特点方便的自定义使用。


Flume可进行大量日志数据采集、聚合和并转移到存储中,并提供数据在流转中的事务机制;


可适用场景:日志--->Flume--->实时计算(如MQ+Storm) 、日志--->Flume--->离线计算(如ODPS、HDFS、HBase)、日志--->Flume--->ElasticSearch等。


2

Flume架构


Flume主要分为 Source、Channel、Sink三个组件,他们包含在一个Agent中,一个Agent相当于一个独立的application。数据从源头经过Agent的这几个组件最后到达目的地。一个Flume 服务可同时运行多个Agent,大致架构可参照下图:


Flume 使用学习小结


对照这个图,作一些说明:


Event 一条日志数据在Flume中对应一个Event对象。不过给他添加了header属性,就是一个Map,放 一些额外信息,可以针对每条Event做特殊处理,比如Channel的选择。这些额外的键值对可以在Event从Source到Channel之间的interceptor(拦截器)中set。


Source 负责日志流入,比如从文件、网络、MQ等数据源流入数据。


Channel 负责数据聚合/暂存,以供Sink消费掉,事务机制主要在这里实现

Sink 负责数据转移到存储,比如从Channel拿到日志后直接存储到ODPS、ElasticSearch等。


拦截器 如果配置了拦截器,则Event从Source 进入Channel前,经过拦截器链做过滤或其他处理;如识别不需要的数据等。


选择器 Flume默认实现有ReplicatingChannelSelector(复制,Event可同时发往多个Channel)和MultiplexingChannelSelector(复用,可根据header中某个字段值,发往不同的Channel)。


下图是个简单的多Channel、Sink情况;Flume还包含一些其他的高级的特性和使用方法,有时间可以继续研究。


Flume 使用学习小结


3

Flume实际使用


现在做的埋点数据导入ODPS的情况是,每天夜里1点左右把前一天的日志文件copy到Flume监控的目录,Flume处理新加入的文件。最终数据存储到ODPS。


遇到的问题:


日志数据中包含空行等不正确格式的记录,导致从Channel中take日志记录后保存到ODPS失败;失败的操作被事务回滚,结果是数据流传在这个地方错误循环下去。


日志数据按实际生成的日期为分区保存在OPDS表的分区中,导入日志数据的日期为实际日期的后一天。


在尝试了几种方法后,最后选择自定义了一个拦截器实现


(UaLogFilteringInterceptor),能很好的达到目前的需求,他主要做如下两件事:


过滤掉不需要、不规范的数据,并且把过滤掉的这些数据存储到指定的文件里,每天一个文件(如果有异常记录)。


在每条Event的header中加入qt值(qt值为前一天的日期,格式为yyyyMMdd),每条Event根据该值保持到ODPS的对应表分区中。


这里顺带说下ODPS的Flume插件,他主要根据ODPS的特点自定义实现了一个Sink,在Flume的配置文件中配置使用该Sink,配置好该Sink的各个配置项,主要包含连接ODPS和使用对应表的。

4

说说Flume的事务


主要用到事务实现有针对MemoryChannel的MemoryTransaction和FileChannel的FileBackedTransaction;



Event从Source PUT到Channel和从Channel Take到Sink后落地,这两个步骤都包裹在事务中;我这里说下MemoryTransaction大致实现。


MemoryTransaction 主要用到了两个双向阻塞队列(LinkedBlockingDeque)putList和takeList作为缓冲区,同时配合使用MemoryChannel中的LinkedBlockingDeque queue;队列的大小通过Flume的配置初始化好;


  • PUT事务


    1. 批量数据循环PUT到putList中


    2. Commit,把putList队列中数据offer到queue队列中,然后释放信号量,清空(clear)putList队列


    3. Rollback,清空(clear)putList队列 这里其实没有做太多事。


  • Take事务



    1. 检查takeList队列大小是否够用,从queue队列中poll


    2. Event到takeList队列中


    3. Commit,表明被Sink正确消费掉,清空(clear)takeList队列


    4. Rollback,异常出现,则把takeList队列中的Event返还到queue队列顶部


Flume 使用学习小结


点击“阅读原文”可查看原文。




想和这群聪明人共事吗?加入阿里云云栖社区(全职/兼职):yqeditor@list.alibaba-inc.com


投稿或入驻云栖社区,请联系:yqeditor@list.alibaba-inc.com


2016,为了实现更多技术梦想,云栖社区与你携手并行。

yunqiinsight

长按二维码,一网打尽所有深度技术文章


戳原文,更有料!

以上是关于Flume 使用学习小结的主要内容,如果未能解决你的问题,请参考以下文章

RxJava学习小结

python学习小结

python学习小结

python学习小结

UML学习小结

javascript 学习小结 by FungLeo