Flume监控系统源码研究
Posted 泛小船
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume监控系统源码研究相关的知识,希望对你有一定的参考价值。
Flume作为一个数据收集工具,集成了各种通用的Source和Sink,并提供了可自定义扩展的Source和Sink接口,功能可以说是比较全面的。然而每个成功的系统背后都应该有一套系统监控
系统,来监控整个系统的运行状况。Flume也不例外。Flume集成了一个系统监控系统,展示了包括成功收集的日志数量,成功发送的日志数量,flume启动时间,停止时间,以及通道容量等监控数据,来帮助用户了解Flume的运行情况,分析数据收集瓶颈,解决数据丢失等问题。
Flume开发了两种监控方式:Http监控和Ganglia监控。Http的监控可以通过在flume启动命令中增加配置
-Dflume.monitoring.type=http-Dflume.monitoring.port=1234
其中-Dflume.monitoring.type=http表示使用http方式来监控,后面的-Dflume.monitoring.port=1234表示我们需要启动的监控服务的端口号为1234,这个端口号可以自己随意配置。Flume的监控数据是以json格式展示。
Ganglia的监控可以通过在启动命令中增加配置
-Dflume.monitoring.type=ganglia-Dflume.monitoring.hosts=ip:port
配置说明可以参考Http监控。
当然用户可以自定义监控类型,只需要扩展org.apache.flume.instrumentation.MonitorService接口并在conf文件夹的配置文件*.properties中注册flume.monitoring.type为自定义类型即可。
监控数据
Flume分为Source,Channel,Sink组件,监控系统分别监控三个组件并展示每个组件的监控数据。
配置Http监控,打开配置对应的IP,可以看到以JSON为格式的监控数据。其中Source组件的监控数据以SOURCE开头:
"SOURCE.src_example":{"KafkaEventGetTimer":"0","AppendBatchAcceptedCount":"0","EventAcceptedCount":"0","AppendReceivedCount":"0","StartTime": "1502945975203","AppendBatchReceivedCount":"0","KafkaCommitTimer":"0","EventReceivedCount": "0","Type":"SOURCE","KafkaEmptyCount":"0","AppendAcceptedCount": "0","OpenConnectionCount":"0","StopTime": "0"}
从上面可以看出这是一个Kafka的Source,并且刚刚启动并没有接受数据。
从上面JSON能够得到的数据:OpenConnectionCount(打开的连接数)、Type(组件类型)、AppendBatchAcceptedCount(追加到channel中的批数量)、AppendBatchReceivedCount(source端刚刚追加的批数量)、EventAcceptedCount(成功放入channel的event数量)、AppendReceivedCount(source追加目前收到的数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、EventReceivedCount(source端成功收到的event数量)、AppendAcceptedCount(放入channel的event数量)等。
CHANNEL是flume的一个通道组件,对数据有一个缓存的作用。能够得到的数据:
EventPutSuccessCount(成功放入channel的event数量)、ChannelFillPercentage(通道使用比例)、Type(组件类型)、EventPutAttemptCount(尝试放入将event放入channel的次数)、ChannelSize(目前在channel中的event数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、EventTakeSuccessCount(从channel中成功取走的event数量)、ChannelCapacity(通道容量)、EventTakeAttemptCount(尝试从channel中取走event的次数)等。
SINK是数据即将离开flume的最后一个组件,它从channel中取走数据,然后发送到缓存系统或者持久化数据库。能得到数据:
BatchCompleteCount(完成的批数量)、ConnectionFailedCount(连接失败数)、EventDrainAttemptCount(尝试提交的event数量)、ConnectionCreatedCount(创建连接数)、Type(组件类型)、BatchEmptyCount(批量取空的数量)、ConnectionClosedCount(关闭连接数量)、EventDrainSuccessCount(成功发送event的数量)、StartTime(组件开始时间)、StopTime(组件停止时间)、BatchUnderflowCount(正处于批量处理的batch数)等。
监控源码分析
分析启动脚本bin/flume-ng可以发现脚本以org.apache.flume.node.Application为入口。
Application实例初始化时会调用startAllComponents()方法,这个方法又会调用loadMonitoring()方法,在loadMonitoring的最后初始化MonitorService实例。
HTTPMetricsServer是MonitorService的一个实现类,即Http方式的监控类。
分析该类发现它会启动一个jetty容器,实现AbstractHandler的handle方法通过JMX获取监控数据并解析成JSON字符串发送给Web客户端。
GangliaServer是Ganglia监控的实现类。该类的实现比较复杂,主要思想也就是启动一个线程定时从JMX中获取监控数据并传输到Ganglia服务器。
既然无论HTTPMetricsServer还是GangliaServer都会通过JMXPollUtil.getAllMBeans()获取监控数据,那我们就进入这个方法。mbeanServer.queryMBeans(null,null)会取得所有的MBeans。那么我们就希望找到Flume里的MBean实例:ChannelCounterMBean、SinkCounterMBean和SourceCounterMBean。
同时这些MBean的实现都会继承MonitoredCounterGroup类。阅读该类得出这个类的作用是初始化JMX平台并注册MBean。他还提供基本的多线程的统计数字累加方法。
所以现在整个监控系统的思路就很清楚了:系统启动是初始化JMX监控管理器,并注册Source、Channle和Sink组件。每次组件进行操作,监控对象调用MBean累加值。
同时监控服务器定期(Ganglia)或者每次查询时(Http)调用JMX监控服务获取监控数据并传输到服务器上。
整个过程的图如下图所示:
参考资料:http://blog.csdn.net/u014039577/article/details/51536753
以上是关于Flume监控系统源码研究的主要内容,如果未能解决你的问题,请参考以下文章
Flume源码分析flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用? ERROR hdfs.BucketWriter: Hit max consecutive und