合并多个事件流

Posted

技术标签:

【中文标题】合并多个事件流【英文标题】:Combine multiple streams of events 【发布时间】:2012-11-16 11:34:45 【问题描述】:

假设我有 N 个数据事件流,我想将它们组合成一个,使用一些 for of ordering(例如 timestamp)。假设EventStream 被定义为:

class EventStream

    Event peek();

    Event next();

现在我想采用 N 个事件流,将它们包装在一个流中,这将强制排序。但是,我不想简单地遍历所有流并将它们添加到priorityQueue - 我不希望内存中的所有事件,因为我会很快耗尽堆空间。我想要一种动态方法,其中每个next() 之后的组合流计算出下一个事件应该是什么。我可以每次扫描 N 个流并找出下一个值是什么,但有更好的方法吗?

【问题讨论】:

听起来你想要一个排序堆而不排序。 【参考方案1】:

您可以避免缓存所有内容并在流中进行过多的查找,只需查看它们的头部,并且仅在需要时这样做。我建议你写一个类似这个的MergedEventStream

public class MergedEventStream implements EventStream 

    private ArrayList<EventStream> merged = new ArrayList<EventStream>();
    private int nextIndex = -1;

    public MergedEventStream(Collection<EventStream> toMerge) 
        merged.addAll(toMerge);
        findNext();
    

    public Event peek() 
        if (nextIndex == -1 && findNext() == false) 
           throw new NoSuchElementException();
         else 
           Event e = merged.get(nextIndex).peek();
           return e;
        
    

    public Event peek() 
        if (nextIndex == -1 && findNext() == false) 
           throw new NoSuchElementException();
         else 
           Event e = merged.get(nextIndex).next();
           findNext();
           return e;
        
    

    /**
     * iterates over merged, and for each stream with an available event,
     * adds it to a sorted TreeMap<Event, Integer> (sorting by any event field; integer
     * is stream index in arrayList)
     * if set is not empty, returns 'true', and sets nextIndex to the stream index
     * otherwise, returns 'false', and sets nextIndex to -1
     */
    private boolean findNext() 
        // ...
    

您可以通过将 TreeMap 保留为实例属性并仅刷新您从中提取的那些流来提高效率。

【讨论】:

【参考方案2】:

使用 MinHeap 存储每个事件流中的一个事件。

next() 上,从堆中弹出顶部事件(时间最早的值)。

然后从检索事件的同一 EventStream 中推送一个事件。

所以MinHeap中的每个EventStream只会有一个Event。

您需要将 EventStream 的引用与事件一起存储在 MinHeap 中。

这个next() 实现将使用 O(log n),其中 'n' 是 EventStream 的数量。

注意:预计 EventStream 已经对事件进行了排序。 Next() 总是返回最旧的事件。

【讨论】:

这不正是我在回答中建议的吗?【参考方案3】:

你的方法很好。除非 N 很大,否则应该没问题。

如果 N 真的很大,你可以将每个流的第一个事件存储在一个排序集合中,与它来自的流相关联,每次你从这个排序集合中删除一个项目时,你会添加下一个它来自的流。

【讨论】:

#2 和我的建议是一样的——你打败了我

以上是关于合并多个事件流的主要内容,如果未能解决你的问题,请参考以下文章

2018-06-25Js事件流+JQuary基本语法

Fink| CEP

大数据计算引擎之Flink Flink CEP复杂事件编程

如何在流分析组中将多条记录与字符串和空值合并

flume案例支持

JavaScript的事件