flume监控分析

Posted ty_laurel

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume监控分析相关的知识,希望对你有一定的参考价值。

flume 监控分析

由启动flume时可以发现,flume 入口函数位于flume-ng-node目录中的Application.java文件中:

 
  1. Info: Sourcing environment configuration script /home/bjtianye1/apache-flume-1.7.0-bin/conf/flume-env.sh
  2. Info: Including Hadoop libraries found via (/home/hadoop/hadoop//bin/hadoop) for HDFS access
  3. Info: Including HBASE libraries found via (/home/hbase/hbase/bin/hbase) for HBASE access
  4. Info: Including Hive libraries found via () for Hive access
  5. + exec /home/jdk//bin/java -Xmx20m -Dflume.root.logger=DEBUG,console,LOGFILE -Xms100m -Xmx2000m -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=10.172.152.66:8666 -DisGanglia3=true -cp '/home/bjtianye1/apache-flume-1.7.0-bin/conf:/home/hadoop/hadoop-2.5.2/lib/native org.apache.flume.node.Application -f conf/flume.conf -n agent1

根据命令中含有no-reload-conf(默认为动态加载)参数,决定采用那种加载配置文件方式:一、没有此参数,会动态加载配置文件,默认每30秒加载一次配置文件,因此可以动态修改配置文件;二、有此参数,则只在启动时加载一次配置文件。实现动态加载功能采用了发布订阅模式,使用guava中的EventBus实现。

main方法首先解析各个传入的参数,然后调用handleConfigurationEvent方法,在该方法中调用startAllComponents方法,在该方法中调用loadMonitoring方法加载监控的一些信息,并开始监控服务monitorServer.start()(monitorServer实现了接口MonitorService,该接口中只有两个方法start和stop)

 
  1. private void loadMonitoring()
  2. ...
  3. monitorServer.configure(context);来加载监控服务的配置信息;
  4. monitorServer.start();启动监控服务
  5. ...

查看该start 方法的具体实现: 

monitorServer有两种,GangliaServer和HTTPMetricsServer,都实现了MonitorService接口。

 
  1. /**
  2. * Start this server, causing it to poll JMX at the configured frequency.
  3. */
  4. @Override
  5. public void start()
  6. try
  7. socket = new DatagramSocket();
  8. hostname = InetAddress.getLocalHost().getHostName();
  9. catch (SocketException ex)
  10. logger.error("Could not create socket for metrics collection.");
  11. throw new FlumeException(
  12. "Could not create socket for metrics collection.", ex);
  13. catch (Exception ex2)
  14. logger.warn("Unknown error occured", ex2);
  15. for (HostInfo host : hosts)
  16. addresses.add(new InetSocketAddress(
  17. host.getHostName(), host.getPortNumber()));
  18. collectorRunnable.server = this;
  19. if (service.isShutdown() || service.isTerminated())
  20. /*
  21. * 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
  22. * (注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,
  23. * 一个新线程会代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定
  24. * 的时间不会有多个线程是活动的。与其他等效的 newScheduledThreadPool(1) 不同,
  25. * 可保证无需重新配置此方法所返回的执行程序即可使用其他的线程
  26. *
  27. */
  28. service = Executors.newSingleThreadScheduledExecutor();
  29. /* scheduleWithFixedDelay(Runnable command, 要执行的任务
  30. * long initialDelay, 首次执行的延迟时间
  31. * long delay, 一次执行终止和下一次执行开始之间的延迟
  32. * TimeUnit unit) initialDelay 和 delay 参数的时间单位
  33. * 创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次
  34. * 执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,
  35. * 只能通过执行程序的取消或终止方法来终止该任务。
  36. */
  37. service.scheduleWithFixedDelay(collectorRunnable, 0,
  38. pollFrequency, TimeUnit.SECONDS);

start()方法中会调用collectorRunnable(一个实现了Runnable的类GangliaCollector )

 
  1. /**GangliaServer.java
  2. * Worker which polls JMX for all mbeans with
  3. * @link javax.management.ObjectName within the flume namespace:
  4. * org.apache.flume. All attributes of such beans are sent to the all hosts
  5. * specified by the server that owns it's instance.
  6. * 这些bean的所有属性都发送到拥有它的实例的服务器指定的所有主机
  7. */
  8. protected class GangliaCollector implements Runnable
  9. private GangliaServer server;
  10. @Override
  11. public void run()
  12. try
  13. Map<String, Map<String, String>> metricsMap =
  14. JMXPollUtil.getAllMBeans(); //具体的监控数据从这获得,通过JMX方式得到
  15. for (String component : metricsMap.keySet())
  16. Map<String, String> attributeMap = metricsMap.get(component);
  17. for (String attribute : attributeMap.keySet())
  18. if (isGanglia3)
  19. server.createGangliaMessage(GANGLIA_CONTEXT + component + "."
  20. + attribute,
  21. attributeMap.get(attribute));
  22. else
  23. server.createGangliaMessage31(GANGLIA_CONTEXT + component + "."
  24. + attribute,
  25. attributeMap.get(attribute));
  26. server.sendToGangliaNodes(); //发送metrics数据到ganglia节点
  27. catch (Throwable t)
  28. logger.error("Unexpected error", t);
  29. flume 监控目录操作

    flume-ng源码分析-核心组件分析

    Flume日志收集系统架构详解

    如何监控flume的重复读取数据的问题

    Flume基础:自定义 Interceptor

    Flume日志收集系统架构详解--转