jstorm在使用kafka作为spout的时候多线程问题
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了jstorm在使用kafka作为spout的时候多线程问题相关的知识,希望对你有一定的参考价值。
jstorm在使用kafka作为spout的时候,高并发情况下会出现多线程报错问题
需要对这两个类进行适当的修改来避免上述问题:
storm.kafka.PartitionManager
storm.kafka.ExponentialBackoffMsgRetryManager
1.storm.kafka.PartitionManager的修改
//将变量
private SortedMap<Long, Long> _pending = new TreeMap();
//改为:
private SortedMap<Long, Long> _pending = Collections.synchronizedSortedMap(new TreeMap<Long, Long>());
/**----------------------------------------------------------------------------------------------------**/
//将方法
public long lastCompletedOffset() {
return this._pending.isEmpty()?this._emittedToOffset.longValue():((Long)this._pending.firstKey()).longValue();
}
//改为:
public long lastCompletedOffset() {
synchronized (_pending) {
if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
return _pending.firstKey();
}
}
}
2.storm.kafka.ExponentialBackoffMsgRetryManager的修改
//将 private Queue<ExponentialBackoffMsgRetryManager.MessageRetryRecord> waiting = new PriorityQueue(11, new ExponentialBackoffMsgRetryManager.RetryTimeComparator()); private Map<Long, ExponentialBackoffMsgRetryManager.MessageRetryRecord> records = new ConcurrentHashMap(); //改为: private Queue<MessageRetryRecord> waiting = new PriorityBlockingQueue<MessageRetryRecord>(11, new RetryTimeComparator()); private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
以上是关于jstorm在使用kafka作为spout的时候多线程问题的主要内容,如果未能解决你的问题,请参考以下文章
JStorm与Storm源码分析--SpoutOutputCollector与代理模式
Storm编程之wordcount(kafka--》Jstorm--》redis)