Flume监控系统源码研究

Posted 泛小船

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume监控系统源码研究相关的知识,希望对你有一定的参考价值。

Flume作为一个数据收集工具,集成了各种通用的SourceSink,并提供了可自定义扩展的SourceSink接口,功能可以说是比较全面的。然而每个成功的系统背后都应该有一套系统监控

系统,来监控整个系统的运行状况。Flume也不例外。Flume集成了一个系统监控系统,展示了包括成功收集的日志数量,成功发送的日志数量,flume启动时间,停止时间,以及通道容量等监控数据,来帮助用户了解Flume的运行情况,分析数据收集瓶颈,解决数据丢失等问题。

Flume开发了两种监控方式:Http监控和Ganglia监控。Http的监控可以通过在flume启动命令中增加配置

-Dflume.monitoring.type=http-Dflume.monitoring.port=1234 

其中-Dflume.monitoring.type=http表示使用http方式来监控,后面的-Dflume.monitoring.port=1234表示我们需要启动的监控服务的端口号为1234,这个端口号可以自己随意配置。Flume的监控数据是以json格式展示。

Ganglia的监控可以通过在启动命令中增加配置

-Dflume.monitoring.type=ganglia-Dflume.monitoring.hosts=ip:port 

配置说明可以参考Http监控。

当然用户可以自定义监控类型,只需要扩展org.apache.flume.instrumentation.MonitorService接口并在conf文件夹的配置文件*.properties中注册flume.monitoring.type为自定义类型即可。


监控数据

Flume分为SourceChannelSink组件,监控系统分别监控三个组件并展示每个组件的监控数据。

配置Http监控,打开配置对应的IP,可以看到以JSON为格式的监控数据。其中Source组件的监控数据以SOURCE开头:

"SOURCE.src_example":{"KafkaEventGetTimer":"0","AppendBatchAcceptedCount":"0","EventAcceptedCount":"0","AppendReceivedCount":"0","StartTime": "1502945975203","AppendBatchReceivedCount":"0","KafkaCommitTimer":"0","EventReceivedCount": "0","Type":"SOURCE","KafkaEmptyCount":"0","AppendAcceptedCount": "0","OpenConnectionCount":"0","StopTime": "0"}

从上面可以看出这是一个KafkaSource,并且刚刚启动并没有接受数据。

从上面JSON能够得到的数据:OpenConnectionCount(打开的连接数)、Type(组件类型)、AppendBatchAcceptedCount(追加到channel中的批数量)、AppendBatchReceivedCountsource端刚刚追加的批数量)、EventAcceptedCount(成功放入channelevent数量)、AppendReceivedCountsource追加目前收到的数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、EventReceivedCountsource端成功收到的event数量)、AppendAcceptedCount(放入channelevent数量)等。

CHANNELflume的一个通道组件,对数据有一个缓存的作用。能够得到的数据:

EventPutSuccessCount(成功放入channelevent数量)、ChannelFillPercentage(通道使用比例)、Type(组件类型)、EventPutAttemptCount(尝试放入将event放入channel的次数)、ChannelSize(目前在channel中的event数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、EventTakeSuccessCount(从channel中成功取走的event数量)、ChannelCapacity(通道容量)、EventTakeAttemptCount(尝试从channel中取走event的次数)等。

SINK是数据即将离开flume的最后一个组件,它从channel中取走数据,然后发送到缓存系统或者持久化数据库。能得到数据:

BatchCompleteCount(完成的批数量)ConnectionFailedCount(连接失败数)、EventDrainAttemptCount(尝试提交的event数量)、ConnectionCreatedCount(创建连接数)、Type(组件类型)、BatchEmptyCount(批量取空的数量)、ConnectionClosedCount(关闭连接数量)、EventDrainSuccessCount(成功发送event的数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、BatchUnderflowCount(正处于批量处理的batch数)等。


监控源码分析

分析启动脚本bin/flume-ng可以发现脚本以org.apache.flume.node.Application为入口。

Application实例初始化时会调用startAllComponents()方法,这个方法又会调用loadMonitoring()方法,在loadMonitoring的最后初始化MonitorService实例。

HTTPMetricsServerMonitorService的一个实现类,即Http方式的监控类。

分析该类发现它会启动一个jetty容器,实现AbstractHandlerhandle方法通过JMX获取监控数据并解析成JSON字符串发送给Web客户端。

GangliaServerGanglia监控的实现类。该类的实现比较复杂,主要思想也就是启动一个线程定时从JMX中获取监控数据并传输到Ganglia服务器。

既然无论HTTPMetricsServer还是GangliaServer都会通过JMXPollUtil.getAllMBeans()获取监控数据,那我们就进入这个方法。mbeanServer.queryMBeans(null,null)会取得所有的MBeans。那么我们就希望找到Flume里的MBean实例:ChannelCounterMBeanSinkCounterMBeanSourceCounterMBean

同时这些MBean的实现都会继承MonitoredCounterGroup类。阅读该类得出这个类的作用是初始化JMX平台并注册MBean。他还提供基本的多线程的统计数字累加方法。

所以现在整个监控系统的思路就很清楚了:系统启动是初始化JMX监控管理器,并注册SourceChannleSink组件。每次组件进行操作,监控对象调用MBean累加值。

同时监控服务器定期(Ganglia)或者每次查询时(Http)调用JMX监控服务获取监控数据并传输到服务器上。

整个过程的图如下图所示:


 

参考资料:http://blog.csdn.net/u014039577/article/details/51536753

 


以上是关于Flume监控系统源码研究的主要内容,如果未能解决你的问题,请参考以下文章

flume-ng 源码分析-整体架构之一启动篇

Flume源码分析flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用? ERROR hdfs.BucketWriter: Hit max consecutive und

Flume 源码解析:组件生命周期

flume+sparkStreaming实例 实时监控文件demo

Flume-NG源码分析-整体结构及配置载入分析

Flume源码分析01-概述及环境搭建