理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理

Posted 尧字节

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 相关的知识,希望对你有一定的参考价值。

基于ThriftSource,MemoryChannel,HdfsSink三个组件,对Flume数据传输的事务进行分析,如果使用的是其他组件,Flume事务具体的处理方式将会不同。

Flume的事务处理原理: 

Flume在对Channel进行Put和Take操作的时候,必须要用事物包住,比如:

  1. Channel ch = new MemoryChannel();
  2. Transaction txn = ch.getTransaction();
  3. //事物开始
  4. txn.begin();
  5. try {
  6.    Event eventToStage = EventBuilder.withBody(\"Hello Flume!\",
  7.    Charset.forName(\"UTF-8\"));
  8.    //往临时缓冲区Put数据
  9.    ch.put(eventToStage);
  10.    //或者ch.take()
  11.    //将这些数据提交到channel中
  12.     txn.commit();
  13. } catch (Throwable t) {
  14.   txn.rollback();
  15.   if (t instanceof Error) {
  16.     throw (Error)t;
  17.   }
  18. } finally {
  19.   txn.close();
  20. }
Put事务流程

Put事务可以分为以下阶段:

  • doPut:将批数据先写入临时缓冲区putList
  • doCommit:检查channel内存队列是否足够合并。
  • doRollback:channel内存队列空间不足,抛弃数据   (这个地方个人理解可能会存在数据丢失)

我们从Source数据接收到写入Channel这个过程对Put事物进行分析。

技术分享

ThriftSource会spawn多个Worker线程(ThriftSourceHandler)去处理数据,Worker处理数据的接口,我们只看batch批量处理这个接口:

  1. @Override
  2. public Status appendBatch(List<ThriftFlumeEvent> events) throws TException { 
  3.       List<Event> flumeEvents = Lists.newArrayList(); 
  4.       for(ThriftFlumeEvent event : events) { 
  5.         flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); 
  6.       } 
  7.       //ChannelProcessor,在Source初始化的时候传进来.将数据写入对应的Channel
  8.       getChannelProcessor().processEventBatch(flumeEvents); 
  9.         ... 
  10.       return Status.OK; 
  11.     }


事务逻辑都在processEventBatch这个方法里:

  1. public void processEventBatch(List<Event> events) {
  2.     ...
  3.     //预处理每行数据,有人用来做ETL嘛
  4.     events = interceptorChain.intercept(events);
  5.     ...
  6.     //分类数据,划分不同的channel集合对应的数据
  7.     // Process required channels
  8.     Transaction tx = reqChannel.getTransaction();
  9.     ...
  10.         //事务开始,tx即MemoryTransaction类实例
  11.         tx.begin();
  12.         List<Event> batch = reqChannelQueue.get(reqChannel);
  13.         for (Event event : batch) {
  14.           // 这个put操作实际调用的是transaction.doPut
  15.           reqChannel.put(event);
  16.         }
  17.         //提交,将数据写入Channel的队列中
  18.         tx.commit();
  19.       } catch (Throwable t) {
  20.         //回滚
  21.         tx.rollback();
  22.         ...
  23.       }
  24.     }
  25.     ...
  26.   }

 

每个Worker线程都拥有一个Transaction实例,保存在Channel(BasicChannelSemantics)里的ThreadLocal变量currentTransaction.

那么,事务到底做了什么?
技术分享

实际上,Transaction实例包含两个双向阻塞队列LinkedBlockingDeque(感觉没必要用双向队列,每个线程写自己的putList,又不是多个线程?),分别为:

  • putList
  • takeList

对于Put事物操作,当然是只用到putList了。putList就是一个临时的缓冲区,数据会先put到putList,最后由commit方法会检查channel是否有足够的缓冲区,有则合并到channel的队列。
channel.put -> transaction.doPut:

  1. protected void doPut(Event event) throws InterruptedException {
  2.       //计算数据字节大小
  3.       int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
  4.       //写入临时缓冲区putList
  5.       if (!putList.offer(event)) {
  6.         throw new ChannelException(
  7.           \"Put queue for MemoryTransaction of capacity \" +
  8.             putList.size() + \" full, consider committing more frequently, \" +
  9.             \"increasing capacity or increasing thread count\");
  10.       }
  11.       putByteCounter += eventByteSize;
  12.     }

transaction.commit:

  1.     @Override
  2.     protected void doCommit() throws InterruptedException {
  3.       //检查channel的队列剩余大小是否足够
  4.       ...
  5.       int puts = putList.size();
  6.       ...
  7.       synchronized(queueLock) {
  8.         if(puts > 0 ) {
  9.           while(!putList.isEmpty()) {
  10.             //写入到channel的队列
  11.             if(!queue.offer(putList.removeFirst())) {
  12.               throw new RuntimeException(\"Queue add failed, this shouldn\‘t be able to happen\");
  13.             }
  14.           }
  15.         }
  16.         //清除临时队列
  17.         putList.clear();
  18.         ...
  19.       }
  20.       ...
  21.     }

如果在事务期间出现异常,比如channel剩余空间不足,则rollback:

  1. @Override
  2.     protected void doRollback() {
  3.     ...
  4.         //抛弃数据,没合并到channel的内存队列
  5.         putList.clear();
  6.       ...
  7.     }




Take事务

 

Take事务分为以下阶段:

  • doTake:先将数据取到临时缓冲区takeList
  • 将数据发送到下一个节点
  • doCommit:如果数据全部发送成功,则清除临时缓冲区takeList
  • doRollback:数据发送过程中如果出现异常,rollback将临时缓冲区takeList中的数据归还给channel内存队列。

技术分享

Sink其实是由SinkRunner线程调用Sink.process方法来了处理数据的。我们从HdfsEventSink的process方法说起,Sink类都有个process方法,用来处理传输数据的逻辑。:

  1. public Status process() throws EventDeliveryException {
  2.     ...
  3.     Transaction transaction = channel.getTransaction();
  4.     ...
  5.     //事务开始
  6.     transaction.begin();
  7.     ...
  8.       for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
  9.         //take数据到临时缓冲区,实际调用的是transaction.doTake
  10.         Event event = channel.take();
  11.         if (event == null) {
  12.           break;
  13.         }
  14.         ...
  15.       //写数据到HDFS
  16.       bucketWriter.append(event);
  17.       ...
  18.       // flush all pending buckets before committing the transaction
  19.       for (BucketWriter bucketWriter : writers) {
  20.         bucketWriter.flush();
  21.       }
  22.       //commit
  23.       transaction.commit();
  24.       ...
  25.     } catch (IOException eIO) {
  26.       transaction.rollback();
  27.       ...
  28.     } finally {
  29.       transaction.close();
  30.     }
  31.   }

 

大致流程图:
技术分享

接着看看channel.take,作用是将数据放到临时缓冲区,实际调用的是transaction.doTake:

  1. protected Event doTake() throws InterruptedException {
  2.       ...
  3.       //从channel内存队列取数据
  4.       synchronized(queueLock) {
  5.         event = queue.poll();
  6.       }
  7.       ...
  8.       //将数据放到临时缓冲区
  9.       takeList.put(event);
  10.       ...
  11.       return event;
  12.     }

接着,HDFS写线程bucketWriter将take到的数据写到HDFS,如果批数据都写完了,则要commit了:

  1. protected void doCommit() throws InterruptedException {
  2.     ...
  3.     takeList.clear();
  4.     ...
  5. }

 

很简单,其实就是清空takeList而已。如果bucketWriter在写数据到HDFS的时候出现异常,则要rollback:

  1. protected void doRollback() {
  2.       int takes = takeList.size();
  3.       //检查内存队列空间大小,是否足够takeList写回去
  4.       synchronized(queueLock) {
  5.         Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), \"Not enough space in memory channel \" +
  6.             \"queue to rollback takes. This should never happen, please report\");
  7.         while(!takeList.isEmpty()) {
  8.           queue.addFirst(takeList.removeLast());
  9.         }
  10.         ...
  11.       }
  12.       ...
  13.     }

 

读完代码可见 

batchSize是针对Source和Sink提出的一个概念,它用来限制source和sink对event批量处理的。

即一次性你可以处理batchSize个event,这个一次性就是指在一个事务中。

这个参数值越大,每个事务提交的范围就越大,taskList的清空等操作次数会减少,因此性能肯定会提升,但是可能在出错时,回滚的返回也会变大。

接下来看一下
 

内存通道中的内部类MemoryTransaction:

 private class MemoryTransaction extends BasicTransactionSemantics {
    private LinkedBlockingDeque takeList;
    private LinkedBlockingDeque putList;
    private final ChannelCounter channelCounter;
    private int putByteCounter = 0;
    private int takeByteCounter = 0;

    public MemoryTransaction(int transCapacity, ChannelCounter counter) {
      putList = new LinkedBlockingDeque(transCapacity);
      takeList = new LinkedBlockingDeque(transCapacity);

      channelCounter = counter;
    }

可见transactionCapacity参数其实 

就是putList和takeList的容量大小。在flume1.5版本中SpillableMemoryChannel的putList和takeList的长度为largestTakeTxSize和largestPutTxSize参数,该参数值为5000












以上是关于理解FlumeNG的batchSize和transactionCapacity参数和传输事务的原理 的主要内容,如果未能解决你的问题,请参考以下文章

epoch iteration和batchsize区别

高可用Hadoop平台-Flume NG实战图解篇

深度学习基本功1:网络训练小技巧之理解Batch SizeIterations和Epochs

Flume-ng的原理和使用

OracleBulkCopy 给 BatchSize 和不给 BatchSize 有啥区别

小烨收藏Flume NG概述