合并多个事件流
Posted
技术标签:
【中文标题】合并多个事件流【英文标题】:Combine multiple streams of events 【发布时间】:2012-11-16 11:34:45 【问题描述】:假设我有 N 个数据事件流,我想将它们组合成一个,使用一些 for of ordering(例如 timestamp)。假设EventStream
被定义为:
class EventStream
Event peek();
Event next();
现在我想采用 N 个事件流,将它们包装在一个流中,这将强制排序。但是,我不想简单地遍历所有流并将它们添加到priorityQueue
- 我不希望内存中的所有事件,因为我会很快耗尽堆空间。我想要一种动态方法,其中每个next()
之后的组合流计算出下一个事件应该是什么。我可以每次扫描 N 个流并找出下一个值是什么,但有更好的方法吗?
【问题讨论】:
听起来你想要一个排序堆而不排序。 【参考方案1】:您可以避免缓存所有内容并在流中进行过多的查找,只需查看它们的头部,并且仅在需要时这样做。我建议你写一个类似这个的MergedEventStream
:
public class MergedEventStream implements EventStream
private ArrayList<EventStream> merged = new ArrayList<EventStream>();
private int nextIndex = -1;
public MergedEventStream(Collection<EventStream> toMerge)
merged.addAll(toMerge);
findNext();
public Event peek()
if (nextIndex == -1 && findNext() == false)
throw new NoSuchElementException();
else
Event e = merged.get(nextIndex).peek();
return e;
public Event peek()
if (nextIndex == -1 && findNext() == false)
throw new NoSuchElementException();
else
Event e = merged.get(nextIndex).next();
findNext();
return e;
/**
* iterates over merged, and for each stream with an available event,
* adds it to a sorted TreeMap<Event, Integer> (sorting by any event field; integer
* is stream index in arrayList)
* if set is not empty, returns 'true', and sets nextIndex to the stream index
* otherwise, returns 'false', and sets nextIndex to -1
*/
private boolean findNext()
// ...
您可以通过将 TreeMap 保留为实例属性并仅刷新您从中提取的那些流来提高效率。
【讨论】:
【参考方案2】:使用 MinHeap 存储每个事件流中的一个事件。
在next()
上,从堆中弹出顶部事件(时间最早的值)。
然后从检索事件的同一 EventStream 中推送一个事件。
所以MinHeap中的每个EventStream只会有一个Event。
您需要将 EventStream 的引用与事件一起存储在 MinHeap 中。
这个next()
实现将使用 O(log n),其中 'n' 是 EventStream 的数量。
注意:预计 EventStream 已经对事件进行了排序。 Next() 总是返回最旧的事件。
【讨论】:
这不正是我在回答中建议的吗?【参考方案3】:你的方法很好。除非 N 很大,否则应该没问题。
如果 N 真的很大,你可以将每个流的第一个事件存储在一个排序集合中,与它来自的流相关联,每次你从这个排序集合中删除一个项目时,你会添加下一个它来自的流。
【讨论】:
#2 和我的建议是一样的——你打败了我以上是关于合并多个事件流的主要内容,如果未能解决你的问题,请参考以下文章