flume之hdfsSink分析

Posted ty_laurel

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume之hdfsSink分析相关的知识,希望对你有一定的参考价值。

概述

前边分析了flume的 Source 和 MemoryChannel 两个组件,接下来分析下第三个大组件 Sink。Sink组件主要用于从Channel 中拉取数据至下一个flume agent 或者目的存储对象(如HDFS)。

要分析Sink,就来先看下Sink接口的定义:

 
  1. public interface Sink extends LifecycleAware, NamedComponent
  2. /**
  3. * 设置Channel
  4. */
  5. public void setChannel(Channel channel);
  6. /**
  7. * 返回具体sink的channel
  8. */
  9. public Channel getChannel();
  10. /**
  11. * 请求的Sink尝试从连接的Channel消费数据data。这个方法应该在一个事务范围内从Channel消费。
  12. * 成功分发事务应该被提交。失败应该回退。
  13. * 如果有 1个或多个Event被成功分发,则为READY;
  14. * 如果没有数据从Channel取回放至sink,则为BACKOFF
  15. * 在任何类型的故障传递数据到下一跳目的地的情况下,抛出异常EventDeliveryException
  16. */
  17. public Status process() throws EventDeliveryException;
  18. public static enum Status
  19. READY, BACKOFF

从入口Application类出发

从前边的分析可知,flume系统的入口为 Application 类,在该类中会依次启动 Channel、Sink、Source三个组件。从启动代码分析可以发现Sink组件启动调用对象 Sink运行器SinkRunner的start方法,该操作发生在MonitorRunnable线程中调用lifecycleAware.start()方法启动一个Sink组件,eclipse查看该方法具体Sink相关实现如下: 

  进入SinkRunner类中的 start 方法如下:

 
  1. @Override
  2. public void start()
  3. SinkProcessor policy = getPolicy(); //获取
  4. policy.start();
  5. runner = new PollingRunner();
  6. runner.policy = policy;
  7. runner.counterGroup = counterGroup;
  8. runner.shouldStop = new AtomicBoolean(); //以原子方式创建 Boolean值,默认为false
  9. runnerThread = new Thread(runner);
  10. runnerThread.setName("SinkRunner-PollingRunner-" +
  11. policy.getClass().getSimpleName());
  12. runnerThread.start(); //启动线程
  13. lifecycleState = LifecycleState.START;

在该start方法中首先会获取一个SinkProcessor,指定线程的属性(policy、counterGroup 、shouldStop )并且启动它。然后会创建一个线程PollingRunner,调用线程的run方法:

 
  1. @Override
  2. public void run()
  3. logger.debug("Polling sink runner starting");
  4. while (!shouldStop.get())
  5. try
  6. if (policy.process().equals(Sink.Status.BACKOFF))
  7. counterGroup.incrementAndGet("runner.backoffs");
  8. Thread.sleep(Math.min(
  9. counterGroup.incrementAndGet("runner.backoffs.consecutive")
  10. * backoffSleepIncrement, maxBackoffSleep));
  11. else
  12. counterGroup.set("runner.backoffs.consecutive", 0L);
  13. catch (InterruptedException e)
  14. ......
  15. logger.debug("Polling runner exiting. Metrics:", counterGroup);

在run方法中可以发现使用while循环(直到设置shouldStop为true结束循环)调用SinkProcessor中的process方法进行下一步的处理。

Sink处理器

SinkProcessor就是Sink处理器,那么SinkRunner运行器和SinlkProcessor处理器有什么不同呢?其实SinkRunner实际上主要就是运行Sink的(Sink启动入口首先就是调用该对象,相比于Source也有其SourceRunner),而 SinkProcessor 决定究竟哪个 Sink 应该从自己对应的 Channel 中拉取事件。 
为什么需要SinkProcessor呢? 
Flume可以聚合线程到Sink组,每个Sink组可以包含一个或多个Sink,如果一个Sink没有定义Sink组,那么该Sink可以被认为是在一个组内,且该Sink是组内的唯一成员。Flume会为每一个Sink组实例化一个SinkRunner运行器,来运行该 Sink 组。如下Sink组件框架图: 

  SinkRunner运行器运行一个Sink Group(如图组内有Sink1、Sink2、Sink3),Sink运行器仅仅是一个询问 Sink组来处理下一批事件的线程。每个Sink组有一个SinkProcessor处理器,该处理器将调用组中某一个Sink的process方法处理事件。 
了解了Sink处理器,接下来查看下都有哪些SinkProcessor,如下图有两种实现, 

    1. 基于抽象类AbstractSinkProcessor实现: 
      实现的子类有FailoverSinkProcessor和LoadBalancingSinkProcessor,适用于配置有Sink组的情况;FailoverSinkProcessor是故障转移处理器,从Sink组中以优先级的顺序选择Sink,直至失败再选择组中第二优先级高的Sink处理;LoadBalancingSinkProcessor是负载均衡处理器,Sink选择顺序支持Random(随机)或者Round-robin(轮询)。
  • 2.是flume系统默认的Sink处理器类DefaultSinkProcessor,只接受一个单一的Sink,没有任何额外的处理(相比于第一种)传递process的处理结果。

若没有配置Sink组,采用的默认就是DefaultSinkProcessor类中的process,该方法中因为不需要做任何的额外处理,代码也是十分的简单,直接调用Sink 的process方法(也就是配置中具体定义的sink,比如写入HDFS中,那就是调用hdfsSink):

 
  1. @Override
  2. public Status process() throws EventDeliveryException
  3. return sink.process();

HDFSEventSink process方法

HDFSEventSink 的process方法是Sink组件的核心代码,其中实现了Sink的event事务处理。每一种具体的sink都必须实现process方法,目前1.7版本自带如下: 

在HDFSEventSink.java中

 
  1. /**
  2. * 从channel拉数据发送到HDFS。每个事务可以取出batchSize个events.
  3. * 找到event对应的存储桶bucket。确保文件打开。序列化数据写入到HDFS上的文件中。
  4. * 这个方法不是线程安全的。
  5. */
  6. public Status process() throws EventDeliveryException
  7. // 获取管道channel
  8. Channel channel = getChannel();
  9. Transaction transaction = channel.getTransaction(); //getTransaction获取或创建事务Transaction
  10. List<BucketWriter> writers = Lists.newArrayList();
  11. transaction.begin(); //事务开始
  12. try
  13. int txnEventCount = 0;
  14. //从channel中取出batchSize个event
  15. for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++)
  16. Event event = channel.take();
  17. if (event == null) flume1.8 使用指南学习感悟

    每日一题Flume HDFS Sink小文件处理(顺丰)

    理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理

    flume 监控hive日志文件

    日志收集系统之Apache Flume

    重磅:Flume1-7结合kafka讲解