flume之hdfsSink分析
Posted ty_laurel
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume之hdfsSink分析相关的知识,希望对你有一定的参考价值。
概述
前边分析了flume的 Source 和 MemoryChannel 两个组件,接下来分析下第三个大组件 Sink。Sink组件主要用于从Channel 中拉取数据至下一个flume agent 或者目的存储对象(如HDFS)。
要分析Sink,就来先看下Sink接口的定义:
public interface Sink extends LifecycleAware, NamedComponent
/**
* 设置Channel
*/
public void setChannel(Channel channel);
/**
* 返回具体sink的channel
*/
public Channel getChannel();
/**
* 请求的Sink尝试从连接的Channel消费数据data。这个方法应该在一个事务范围内从Channel消费。
* 成功分发事务应该被提交。失败应该回退。
* 如果有 1个或多个Event被成功分发,则为READY;
* 如果没有数据从Channel取回放至sink,则为BACKOFF
* 在任何类型的故障传递数据到下一跳目的地的情况下,抛出异常EventDeliveryException
*/
public Status process() throws EventDeliveryException;
public static enum Status
READY, BACKOFF
从入口Application类出发
从前边的分析可知,flume系统的入口为 Application 类,在该类中会依次启动 Channel、Sink、Source三个组件。从启动代码分析可以发现Sink组件启动调用对象 Sink运行器SinkRunner的start方法,该操作发生在MonitorRunnable线程中调用lifecycleAware.start()方法启动一个Sink组件,eclipse查看该方法具体Sink相关实现如下:
@Override
public void start()
SinkProcessor policy = getPolicy(); //获取
policy.start();
runner = new PollingRunner();
runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean(); //以原子方式创建 Boolean值,默认为false
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
runnerThread.start(); //启动线程
lifecycleState = LifecycleState.START;
在该start方法中首先会获取一个SinkProcessor,指定线程的属性(policy、counterGroup 、shouldStop )并且启动它。然后会创建一个线程PollingRunner,调用线程的run方法:
@Override
public void run()
logger.debug("Polling sink runner starting");
while (!shouldStop.get())
try
if (policy.process().equals(Sink.Status.BACKOFF))
counterGroup.incrementAndGet("runner.backoffs");
Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* backoffSleepIncrement, maxBackoffSleep));
else
counterGroup.set("runner.backoffs.consecutive", 0L);
catch (InterruptedException e)
......
logger.debug("Polling runner exiting. Metrics:", counterGroup);
在run方法中可以发现使用while循环(直到设置shouldStop为true结束循环)调用SinkProcessor中的process方法进行下一步的处理。
Sink处理器
SinkProcessor就是Sink处理器,那么SinkRunner运行器和SinlkProcessor处理器有什么不同呢?其实SinkRunner实际上主要就是运行Sink的(Sink启动入口首先就是调用该对象,相比于Source也有其SourceRunner),而 SinkProcessor 决定究竟哪个 Sink 应该从自己对应的 Channel 中拉取事件。
为什么需要SinkProcessor呢?
Flume可以聚合线程到Sink组,每个Sink组可以包含一个或多个Sink,如果一个Sink没有定义Sink组,那么该Sink可以被认为是在一个组内,且该Sink是组内的唯一成员。Flume会为每一个Sink组实例化一个SinkRunner运行器,来运行该 Sink 组。如下Sink组件框架图:
了解了Sink处理器,接下来查看下都有哪些SinkProcessor,如下图有两种实现,
-
- 基于抽象类AbstractSinkProcessor实现:
实现的子类有FailoverSinkProcessor和LoadBalancingSinkProcessor,适用于配置有Sink组的情况;FailoverSinkProcessor是故障转移处理器,从Sink组中以优先级的顺序选择Sink,直至失败再选择组中第二优先级高的Sink处理;LoadBalancingSinkProcessor是负载均衡处理器,Sink选择顺序支持Random(随机)或者Round-robin(轮询)。
- 基于抽象类AbstractSinkProcessor实现:
-
2.是flume系统默认的Sink处理器类DefaultSinkProcessor,只接受一个单一的Sink,没有任何额外的处理(相比于第一种)传递process的处理结果。
若没有配置Sink组,采用的默认就是DefaultSinkProcessor类中的process,该方法中因为不需要做任何的额外处理,代码也是十分的简单,直接调用Sink 的process方法(也就是配置中具体定义的sink,比如写入HDFS中,那就是调用hdfsSink):
@Override
public Status process() throws EventDeliveryException
return sink.process();
HDFSEventSink process方法
HDFSEventSink 的process方法是Sink组件的核心代码,其中实现了Sink的event事务处理。每一种具体的sink都必须实现process方法,目前1.7版本自带如下:
在HDFSEventSink.java中
/**
* 从channel拉数据发送到HDFS。每个事务可以取出batchSize个events.
* 找到event对应的存储桶bucket。确保文件打开。序列化数据写入到HDFS上的文件中。
* 这个方法不是线程安全的。
*/
public Status process() throws EventDeliveryException
// 获取管道channel
Channel channel = getChannel();
Transaction transaction = channel.getTransaction(); //getTransaction获取或创建事务Transaction
List<BucketWriter> writers = Lists.newArrayList();
transaction.begin(); //事务开始
try
int txnEventCount = 0;
//从channel中取出batchSize个event
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++)
Event event = channel.take();
if (event == null) flume1.8 使用指南学习感悟