jstorm在使用kafka作为spout的时候多线程问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jstorm在使用kafka作为spout的时候多线程问题相关的知识,希望对你有一定的参考价值。

  jstorm在使用kafka作为spout的时候,高并发情况下会出现多线程报错问题
需要对这两个类进行适当的修改来避免上述问题:
  storm.kafka.PartitionManager
  storm.kafka.ExponentialBackoffMsgRetryManager
1.storm.kafka.PartitionManager的修改
//将变量
private SortedMap<Long, Long> _pending = new TreeMap();
//改为:
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());

/**----------------------------------------------------------------------------------------------------**/

//将方法
public long lastCompletedOffset() {
    return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
//改为:
public long lastCompletedOffset() {
    synchronized (_pending) {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.firstKey();
        }
    }
}

2.storm.kafka.ExponentialBackoffMsgRetryManager的修改
//
private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator()); 
private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap();
//改为:
private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator()); 
private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();

 







以上是关于jstorm在使用kafka作为spout的时候多线程问题的主要内容,如果未能解决你的问题,请参考以下文章

JStorm与Storm源码分析--SpoutOutputCollector与代理模式

Storm编程之wordcount(kafka--》Jstorm--》redis)

Storm编程之wordcount(kafka--》Jstorm--》redis)

Kafka和Storm环境下如何实现多租户?

JStorm:任务调度

JStorm中的并行( parallelismction )介绍