Flume NG源代码分析支持执行时动态改动配置的配置模块

Posted cxchanpin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume NG源代码分析支持执行时动态改动配置的配置模块相关的知识,希望对你有一定的参考价值。

在上一篇中讲了Flume NG配置模块主要的接口的类,PropertiesConfigurationProvider提供了基于properties配置文件的静态配置的能力,这篇细说一下PollingPropertiesFileConfigurationProvider提供的执行时动态改动配置并生效的能力。


要实现动态改动配置文件并生效,主要有两个待实现的功能

1. 观察配置文件是否改动

2. 假设改动,将改动的内容通知给观察者


对于第一点,监控配置文件是否改动,Flume NG定义了一个FileWatcherRunnable对象来监控配置文件,启动了一个单独的线程採用定时轮询的方式来监控,轮询频率是30毫秒一次。比較file.lastModified属性与lastChange时间戳,当file.lastModified > lastChange时表示文件被改动

public class FileWatcherRunnable implements Runnable {

    private final File file;
    private final CounterGroup counterGroup;

    private long lastChange;

    public FileWatcherRunnable(File file, CounterGroup counterGroup) {
      super();
      this.file = file;
      this.counterGroup = counterGroup;
      this.lastChange = 0L;
    }

    @Override
    public void run() {
      LOGGER.debug("Checking file:{} for changes", file);

      counterGroup.incrementAndGet("file.checks");

      long lastModified = file.lastModified();

      if (lastModified > lastChange) {
        LOGGER.info("Reloading configuration file:{}", file);

        counterGroup.incrementAndGet("file.loads");

        lastChange = lastModified;

        try {
          eventBus.post(getConfiguration());
        } catch (Exception e) {
          LOGGER.error("Failed to load configuration data. Exception follows.",
              e);
        } catch (NoClassDefFoundError e) {
          LOGGER.error("Failed to start agent because dependencies were not " +
              "found in classpath. Error follows.", e);
        } catch (Throwable t) {
          // caught because the caller does not handle or log Throwables
          LOGGER.error("Unhandled error", t);
        }
      }
    }
  }

// PollingPropertiesFileConfigurationProvider.start()启动一个单独的线程来监控properties配置文件
 public void start() {
    LOGGER.info("Configuration provider starting");

    Preconditions.checkState(file != null,
        "The parameter file must not be null");

    executorService = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                .build());

    FileWatcherRunnable fileWatcherRunnable =
        new FileWatcherRunnable(file, counterGroup);

    executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
        TimeUnit.SECONDS);

    lifecycleState = LifecycleState.START;

    LOGGER.debug("Configuration provider started");
  }

对于第二点,利用Guava EventBus提供的公布订阅模式机制,将配置改动封装成事件传递给Application。来又一次载入配置

// FileWatcherRunnable.run方法 公布配置改动的事件
   eventBus.post(getConfiguration());

// Application.main方法来注冊事件订阅
      Application application;
      if(reload) {
        EventBus eventBus = new EventBus(agentName + "-event-bus");
        PollingPropertiesFileConfigurationProvider configurationProvider =
            new PollingPropertiesFileConfigurationProvider(agentName,
                configurationFile, eventBus, 30);
        components.add(configurationProvider);
        application = new Application(components);
        eventBus.register(application);
      }


// Application类採用@Subscribe标注来定义订阅方法,即配置改动后会运行handleConfigurationEvent方法,这种方法是线程安全的

@Subscribe
  public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
    stopAllComponents();
    startAllComponents(conf);
  }



以上是关于Flume NG源代码分析支持执行时动态改动配置的配置模块的主要内容,如果未能解决你的问题,请参考以下文章

Flume-NG源码分析-整体结构及配置载入分析

Flume案例——日志分析采集系统

基础组件1Flume入门Agent

Flume-NG源码分析-整体结构及配置载入分析

Flume NG 学习笔记Sinks和Channel配置

flume-ng 源码分析-整体架构之一启动篇