flume使用MemoryChannel报错:Space for commit to queue couldn‘t be acquired

Posted 请叫我大师兄_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flume使用MemoryChannel报错:Space for commit to queue couldn‘t be acquired相关的知识,希望对你有一定的参考价值。

在数据量大的时候,flume日志中出这么个错误:


Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.executeChannelTransaction(ChannelProcessor.java:245)

A。简单了解一点点flume的原理:

flume中三大件,source、channel、sink,串起来

0,维持一个队列,队列的两端分别是source和sink。
1,source使用doPut方法往putList插入Event
2,sink使用doTake方法从queue中获取event放入takeList,并且提供rollback方法,用于回滚。
3,commit方法作用是把putList中的event一次性写到queue;
下面表示了Event在一个使用了MemoryChannel的agent中数据流向:

source ---> putList ---> queue ---> takeList ---> sink

中间的三个阶段,是在channel里面的。

报这个错的原因:就是source在拿到数据,往channel里面写的时候,没地方了,中间是个queue,比作池子,流入太快,流出慢,然后,慢慢的,池子就满了,再流就溢出了。

现在要解决问题:

1,是流入和流出的速度差;

2,增大池子以延迟池子流满的时间!

先检查一下flume的配置,是不是source的batchSize比sink的batchSize大,改改让流入慢点,给流出争取时间,一般sink都是要把数据写出去,比如写es,写文件,写kafka,写Hadoop等,这个操作比较耗时。

B。MemoryChannel的几个参数

  //定义队列中一次允许的事件总数
  private static final Integer defaultCapacity = 100;

  //定义一个事务中允许的事件总数
  private static final Integer defaultTransCapacity = 100;

  //将物理内存转换成槽(slot)数,默认是100
  private static final double byteCapacitySlotSize = 100;

  //定义队列中事件所使用空间的最大字节数(默认是JVM最大可用内存的0.8)
  private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);

  //定义byteCapacity和预估Event大小之间的缓冲区百分比:
  private static final Integer defaultByteCapacityBufferPercentage = 20;

  //添加或者删除一个event的超时时间,单位秒:
  private static final Integer defaultKeepAlive = 3;

  // maximum items in a transaction queue
  private volatile Integer transCapacity;
  private volatile int keepAlive;
  private volatile int byteCapacity;
  private volatile int lastByteCapacity;
  private volatile int byteCapacityBufferPercentage;
  private ChannelCounter channelCounter;
Flume中Memory Channel参数说明
capacity在 channel 中 最多能保存多少个 event。默认是100
transCapacity在每次从source中获取数据或者将数据sink出去的一次事务操作中,最多处理的 event 数。默认是100
byteCapacity在 channel 中最多能容纳 所有event body的总字节数。默认是 JVM最大可用内存(-Xmx )的80% 。(大师兄不建议改这个参数)
byteCapacityBufferPercentage计算event header跟最大可用内存的字节占比。默认是20(大师兄不建议改这个参数)
keep-alive尝试添加或者删除一个event的超时时间,单位为秒。默认是3

能改的参数就capacity和transCapacity了,增加capacity的大小,就是调整channel缓存数据的能力,在增大这个配置的值的时候,flume的jvm参数,必须得跟着一起加大,不然,稍微积压一下,数据都积压到channel里面,channel full 之前,已经把heap占满了,然后就又会出OOM的异常了。还有就是增加channel sink的对儿,这个也需要同时扩大flume的jvm的内存大小,不然也会有OOM的风险。

以上是关于flume使用MemoryChannel报错:Space for commit to queue couldn‘t be acquired的主要内容,如果未能解决你的问题,请参考以下文章

flume从Kafka消费数据到HDFS

flume从kafka读取数据到hdfs中的配置

消费Kafka数据Flume

消费Kafka数据Flume

Flume整合Kafka完成实时数据采集

使用Flume