flume监控分析
Posted ty_laurel
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume监控分析相关的知识,希望对你有一定的参考价值。
flume 监控分析
由启动flume时可以发现,flume 入口函数位于flume-ng-node目录中的Application.java文件中:
Info: Sourcing environment configuration script /home/bjtianye1/apache-flume-1.7.0-bin/conf/flume-env.sh
Info: Including Hadoop libraries found via (/home/hadoop/hadoop//bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/hbase/hbase/bin/hbase) for HBASE access
Info: Including Hive libraries found via () for Hive access
+ exec /home/jdk//bin/java -Xmx20m -Dflume.root.logger=DEBUG,console,LOGFILE -Xms100m -Xmx2000m -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=10.172.152.66:8666 -DisGanglia3=true -cp '/home/bjtianye1/apache-flume-1.7.0-bin/conf:/home/hadoop/hadoop-2.5.2/lib/native org.apache.flume.node.Application -f conf/flume.conf -n agent1
根据命令中含有no-reload-conf(默认为动态加载)参数,决定采用那种加载配置文件方式:一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。
main方法首先解析各个传入的参数,然后调用handleConfigurationEvent方法,在该方法中调用startAllComponents方法,在该方法中调用loadMonitoring方法加载监控的一些信息,并开始监控服务monitorServer.start()(monitorServer实现了接口MonitorService,该接口中只有两个方法start和stop)
private void loadMonitoring()
...
monitorServer.configure(context);来加载监控服务的配置信息;
monitorServer.start();启动监控服务
...
查看该start 方法的具体实现:
monitorServer有两种,GangliaServer和HTTPMetricsServer,都实现了MonitorService接口。
/**
* Start this server, causing it to poll JMX at the configured frequency.
*/
@Override
public void start()
try
socket = new DatagramSocket();
hostname = InetAddress.getLocalHost().getHostName();
catch (SocketException ex)
logger.error("Could not create socket for metrics collection.");
throw new FlumeException(
"Could not create socket for metrics collection.", ex);
catch (Exception ex2)
logger.warn("Unknown error occured", ex2);
for (HostInfo host : hosts)
addresses.add(new InetSocketAddress(
host.getHostName(), host.getPortNumber()));
collectorRunnable.server = this;
if (service.isShutdown() || service.isTerminated())
/*
* 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
* (注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,
* 一个新线程会代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定
* 的时间不会有多个线程是活动的。与其他等效的 newScheduledThreadPool(1) 不同,
* 可保证无需重新配置此方法所返回的执行程序即可使用其他的线程
*
*/
service = Executors.newSingleThreadScheduledExecutor();
/* scheduleWithFixedDelay(Runnable command, 要执行的任务
* long initialDelay, 首次执行的延迟时间
* long delay, 一次执行终止和下一次执行开始之间的延迟
* TimeUnit unit) initialDelay 和 delay 参数的时间单位
* 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次
* 执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,
* 只能通过执行程序的取消或终止方法来终止该任务。
*/
service.scheduleWithFixedDelay(collectorRunnable, 0,
pollFrequency, TimeUnit.SECONDS);
start()方法中会调用collectorRunnable(一个实现了Runnable的类GangliaCollector )
/**GangliaServer.java
* Worker which polls JMX for all mbeans with
* @link javax.management.ObjectName within the flume namespace:
* org.apache.flume. All attributes of such beans are sent to the all hosts
* specified by the server that owns it's instance.
* 这些bean的所有属性都发送到拥有它的实例的服务器指定的所有主机
*/
protected class GangliaCollector implements Runnable
private GangliaServer server;
@Override
public void run()
try
Map<String, Map<String, String>> metricsMap =
JMXPollUtil.getAllMBeans(); //具体的监控数据从这获得,通过JMX方式得到
for (String component : metricsMap.keySet())
Map<String, String> attributeMap = metricsMap.get(component);
for (String attribute : attributeMap.keySet())
if (isGanglia3)
server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
+ attribute,
attributeMap.get(attribute));
else
server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
+ attribute,
attributeMap.get(attribute));
server.sendToGangliaNodes(); //发送metrics数据到ganglia节点
catch (Throwable t)
logger.error("Unexpected error", t);
flume 监控目录操作