Flume 源码解析:组件生命周期

Posted Python数据平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Flume 源码解析:组件生命周期相关的知识,希望对你有一定的参考价值。

Apache Flume 是数据仓库体系中用于做实时 ETL 的工具。它提供了丰富的数据源和写入组件,这些组件在运行时都由 Flume 的生命周期管理机制进行监控和维护。本文将对这部分功能的源码进行解析。

项目结构

Flume 的源码可以从 GitHub 上下载。它是一个 Maven 项目,我们将其导入到 IDE 中以便更好地进行源码阅读。以下是代码仓库的基本结构:

 
   
   
 
  1. /flume-ng-node

  2. /flume-ng-code

  3. /flume-ng-sdk

  4. /flume-ng-sources/flume-kafka-source

  5. /flume-ng-channels/flume-kafka-channel

  6. /flume-ng-sinks/flume-hdfs-sink

程序入口

Flume Agent 的入口 main 函数位于 flume-ng-node 模块的 org.apache.flume.node.Application 类中。下列代码是该函数的摘要:

 
   
   
 
  1. public class Application {

  2.  public static void main(String[] args) {

  3.    CommandLineParser parser = new GnuParser();

  4.    if (isZkConfigured) {

  5.      if (reload) {

  6.        PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider;

  7.        components.add(zookeeperConfigurationProvider);

  8.      } else {

  9.        StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider;

  10.        application.handleConfigurationEvent();

  11.      }

  12.    } else {

  13.      // PropertiesFileConfigurationProvider

  14.    }

  15.    application.start();

  16.    Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {

  17.      @Override

  18.      public void run() {

  19.        appReference.stop();

  20.      }

  21.    });

  22.  }

  23. }

启动过程说明如下:

  1. 使用 commons-cli 对命令行参数进行解析,提取 Agent 名称、配置信息读取方式及其路径信息;

  2. 配置信息可以通过文件或 ZooKeeper 的方式进行读取,两种方式都支持热加载,即我们不需要重启 Agent 就可以更新配置内容:

    • 基于文件的配置热加载是通过一个后台线程对文件进行轮询实现的;

    • 基于 ZooKeeper 的热加载则是使用了 Curator 的 NodeCache 模式,底层是 ZooKeeper 原生的监听(Watch)特性。

  3. 如果配置热更新是开启的(默认开启),配置提供方 ConfigurationProvider 就会将自身注册到 Agent 程序的组件列表中,并在 Application#start 方法调用后,由 LifecycleSupervisor 类进行启动和管理,加载和解析配置文件,从中读取组件列表。

  4. 如果热更新未开启,则配置提供方将在启动时立刻读取配置文件,并由 LifecycleSupervisor 启动和管理所有组件。

  5. 最后, main 会调用 Runtime#addShutdownHook,当 JVM 关闭时(SIGTERM 或者 Ctrl+C), Application#stop 会被用于关闭 Flume Agent,使各组件优雅退出。

配置重载

PollingPropertiesFileConfigurationProvider 类中,当文件内容更新时,它会调用父类的 AbstractConfigurationProvider#getConfiguration 方法,将配置内容解析成 MaterializedConfiguration 实例,这个对象实例中包含了数据源(Source)、目的地(Sink)、以及管道(Channel)组件的所有信息。随后,这个轮询线程会通过 Guava 的 EventBus 机制通知 Application类配置发生了更新,从而触发 Application#handleConfigurationEvent 方法,重新加载所有的组件。

 
   
   
 
  1. // Application 类

  2. @Subscribe

  3. public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {

  4.  stopAllComponents();

  5.  startAllComponents(conf);

  6. }

  7. // PollingPropertiesFileConfigurationProvider$FileWatcherRunnable 内部类

  8. @Override

  9. public void run() {

  10.  eventBus.post(getConfiguration());

  11. }

启动组件

组件启动的流程位于 Application#startAllComponents 方法中。这个方法接收到新的组件信息后,首先将启动所有的 Channel,然后启动 SinkSource

 
   
   
 
  1. private void startAllComponents(MaterializedConfiguration materializedConfiguration) {

  2.  this.materializedConfiguration = materializedConfiguration;

  3.  for (Entry<String, Channel> entry :

  4.      materializedConfiguration.getChannels().entrySet()) {

  5.    supervisor.supervise(entry.getValue(),

  6.        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);

  7.  }

  8.  //  等待所有管道启动完毕

  9.  for (Channel ch : materializedConfiguration.getChannels().values()) {

  10.    while (ch.getLifecycleState() != LifecycleState.START

  11.        && !supervisor.isComponentInErrorState(ch)) {

  12.      Thread.sleep(500);

  13.    }

  14.  }

  15.  // 相继启动目的地和数据源组件

  16. }

LifecycleSupervisor 类(代码中的 supervisor 变量)可用于管理实现了 LifecycleAware 接口的组件。该类会初始化一个 MonitorRunnable,每三秒轮询一次组件状态,通过 LifecycleAware#startstop 方法,保证其始终处于 desiredState 变量所指定的状态。

 
   
   
 
  1. public static class MonitorRunnable implements Runnable {

  2.  @Override

  3.  public void run() {

  4.    if (!lifecycleAware.getLifecycleState().equals(

  5.        supervisoree.status.desiredState)) {

  6.      switch (supervisoree.status.desiredState) {

  7.        case START:

  8.          lifecycleAware.start();

  9.          break;

  10.        case STOP:

  11.          lifecycleAware.stop();

  12.      }

  13.    }

  14.  }

  15. }

停止组件

当 JVM 关闭时,钩子函数会调用 Application#stop 方法,进而调用 LifecycleSupervisor#stop。该方法首先停止所有的 MonitorRunnable 线程,将组件目标状态置为 STOP,并调用 LifecycleAware#stop方法命其优雅终止。

 
   
   
 
  1. public class LifecycleSupervisor implements LifecycleAware {

  2.  @Override

  3.  public synchronized void stop() {

  4.    monitorService.shutdown();

  5.    for (final Entry<LifecycleAware, Supervisoree> entry :

  6.        supervisedProcesses.entrySet()) {

  7.      if (entry.getKey().getLifecycleState().equals(LifecycleState.START)) {

  8.        entry.getValue().status.desiredState = LifecycleState.STOP;

  9.        entry.getKey().stop();

  10.      }

  11.    }

  12.  }

  13. }

Source 与 SourceRunner

对于单个组件的生命周期,我们以 KafkaSource 为例:

 
   
   
 
  1. public class KafkaSource extends AbstractPollableSource {

  2.  @Override

  3.  protected void doStart() throws FlumeException {

  4.    consumer = new KafkaConsumer<String, byte[]>(kafkaProps);

  5.    it = consumer.poll(1000).iterator();

  6.  }

  7.  @Override

  8.  protected void doStop() throws FlumeException {

  9.    consumer.close();

  10.  }

  11. }

KafkaSource 被定义成轮询式的数据源,也就是说我们需要使用一个线程不断对其进行轮询,查看是否有数据可以供处理:

 
   
   
 
  1. public class PollableSourceRunner extends SourceRunner {

  2.  @Override

  3.  public void start() {

  4.    source.start();

  5.    runner = new PollingRunner();

  6.    runnerThread = new Thread(runner);

  7.    runnerThread.start();

  8.    lifecycleState = LifecycleState.START;

  9.  }

  10.  @Override

  11.  public void stop() {

  12.    runnerThread.interrupt();

  13.    runnerThread.join();

  14.    source.stop();

  15.    lifecycleState = LifecycleState.STOP;

  16.  }

  17.  // 轮询线程

  18.  public static class PollingRunner implements Runnable {

  19.    @Override

  20.    public void run() {

  21.      while (!shouldStop.get()) {

  22.        source.process();

  23.      }

  24.    }

  25.  }

  26. }

AbstractPollableSourceSourceRunner 都实现了 LifecycleAware 接口,因此都有 startstop方法。但是,只有 SourceRunner 会由 LifecycleSupervisor 管理, PollableSource 则是附属于 SourceRunner 的一个组件。我们可以在 AbstractConfigurationProvider#loadSources 中看到配置关系:

 
   
   
 
  1. private void loadSources(Map<String, SourceRunner> sourceRunnerMap) {

  2.  Source source = sourceFactory.create();

  3.  Configurables.configure(source, config);

  4.  sourceRunnerMap.put(comp.getComponentName(),

  5.      SourceRunner.forSource(source));

  6. }

参考资料

  • https://github.com/apache/flume

  • https://flume.apache.org/FlumeUserGuide.html

  • https://kafka.apache.org/0100/javadoc/index.html


以上是关于Flume 源码解析:组件生命周期的主要内容,如果未能解决你的问题,请参考以下文章

Android Lifecycle源码解析

3. Jetpack源码解析---用Lifecycles管理生命周期

3. Jetpack源码解析---用Lifecycles管理生命周期

导航上的片段生命周期重叠

CVB生命周期(APIView源码解析)

Flume-ng源码解析之Channel组件