数据流过程中一定大小窗口的topK问题

Posted MachineChen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据流过程中一定大小窗口的topK问题相关的知识,希望对你有一定的参考价值。

我们经常在大数据问题中遇到topK,但这里我们讨论的是在数据流的场景下数据中的topK,本人将在下面提出一些设计方式。

示意图如下:

数据流流入处理模块,模块中初始化了最小堆和最大堆,维护两个堆之间的关系和大小,保证模块中为最近一段时间内一定数量的数据,再通过堆的性质,获取这些数据中的topK。

代码实现如下:

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class TopKUtils<T> 

    static final int MAXIMUM_CAPACITY = 1 << 30;
    private ReentrantLock lock = new ReentrantLock(true); //公平锁
    private int initialCapacity;
    private int minHeapNun;
    private int maxHeapNum;
    private int curIndex;
    private Comparator<T> comparator;
    private Queue<T> minHeap;
    private Queue<T> maxHeap;
    private Object[] dataList;

    /**
     * 构造函数
     * @param initialCapacity   容量大小
     * @param loadFactor    topK所在容量大小的百分比位置
     * @param comparator    比较函数
     */
    public TopKUtils(int initialCapacity, float loadFactor, Comparator<T> comparator) 
        //判断参数是否有异常
        if (initialCapacity < 0)
            throw new IllegalArgumentException("Illegal initial capacity: " +
                    initialCapacity);
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        if (loadFactor <= 0 || loadFactor >= 1 || Float.isNaN(loadFactor))
            throw new IllegalArgumentException("Illegal load factor: " +
                    loadFactor);
        if (comparator == null)
            throw new IllegalArgumentException("Illegal comparator");
        this.initialCapacity = initialCapacity;
        this.minHeapNun = Math.round(initialCapacity * loadFactor);
        this.maxHeapNum = initialCapacity - minHeapNun;
        this.comparator = comparator;
        init();
    

    /**
     * 初始化代码
     */
    public void init() 
        final Comparator<T> comparator = this.comparator;
        //最大堆的比较函数,与最小堆相对应
        Comparator<T> AntiComparator = new Comparator<T>() 
            @Override
            public int compare(T c1, T c2) 
                return -comparator.compare(c1, c2);
            
        ;
        //初始化最小堆、最大堆,这里用优先级队列实现效果
        this.minHeap = new PriorityQueue<>(this.minHeapNun, comparator);
        this.maxHeap = new PriorityQueue<>(this.maxHeapNum, AntiComparator);
        //新建一个存储之前容量大小数据的临时空间,用于过期数据的剔除比较
        this.dataList = new Object[this.initialCapacity];
    

    public void push(T t) 
        try 
            //加锁,1s过期时间
            if (lock.tryLock(1, TimeUnit.SECONDS)) 
                try 
                    //获取临时数据中的当前位置数据
                    T curValue = (T) this.dataList[this.curIndex];
                    //curValue为空,则大小堆未满
                    if (curValue == null) 
                        if (this.minHeapNun > this.minHeap.size()) 
                            this.minHeap.add(t);
                         else 
                            addInOrder(t);
                        
                     else 
                        //curValue不为空则从小堆或者大堆中删除这个过期数据
                        boolean isFromMinHeap = this.minHeap.remove(curValue);
                        if (isFromMinHeap) 
                            T peekMaxHeap = this.maxHeap.peek();
                            this.maxHeap.remove();
                            this.minHeap.add(peekMaxHeap);

                            addInOrder(t);
                         else 
                            this.maxHeap.remove(curValue);

                            addInOrder(t);
                        
                    
                    //更新临时数据及当前下标
                    this.dataList[this.curIndex] = t;
                    this.curIndex = (this.curIndex + 1) % this.initialCapacity;
                 finally 
                    lock.unlock();
                
            
         catch (Exception e) 
            //这里的异常不能使程序退出
            e.printStackTrace();
        

    

    public void addInOrder(T t) 
        //判断新增的数据是否比最小堆的堆顶要大
        if (this.comparator.compare(this.minHeap.peek(), t) > 0) 
            this.maxHeap.add(t);
         else 
            T peekMinHeap = this.minHeap.peek();
            this.minHeap.remove();
            this.minHeap.add(t);
            this.maxHeap.add(peekMinHeap);
        
    

    public T getTopK() 
        if (this.minHeap.size() > 0) 
            return this.minHeap.peek();
         else if (this.maxHeap.size() > 0) 
            return this.maxHeap.peek();
        
        return null;
    

    public static void main(String[] args) 
        Comparator<Float> defaultComparator = new Comparator<Float>() 
            @Override
            public int compare(Float c1, Float c2) 
                return Float.compare(c1, c2);
            
        ;
        TopKUtils topKUtils = new TopKUtils<>(1000, 0.1f, defaultComparator);
        long start = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) 
            topKUtils.push((float) Math.random());
            float result = (float) topKUtils.getTopK();
//            System.out.println(result);
        
        long end = System.currentTimeMillis();
        System.out.println("代码运行时间:" + (end - start) + "ms");
    

因为优先级队列删除特定值操作的时间复杂度为O(n),删除堆顶的时间复杂度为O(1),重排的时间复杂度为O(logn),故每次操作的时间复杂度为O(n)。

这里还有改进的空间,可以维护一个大小为预设容量的hashmap,键为数据在最近一定数量的数据中的位置,值为其在优先级队列的下标,这样删除操作的时间复杂度可以下降到O(logn),整体时间复杂度为O(logn),但是需要重写优先级队列,并维护hashmap中对应的位置,这里仅仅提供一个思路。

以上是关于数据流过程中一定大小窗口的topK问题的主要内容,如果未能解决你的问题,请参考以下文章

堆排序应用之topK问题

多情景下的TopK问题

算法与数据结构,你一定要知道的

我可以将 div 高度编程为当前宽度的一定百分比吗? [复制]

用Javascript获取页面元素的位置

MapReduce TopK问题实际应用