数据流过程中一定大小窗口的topK问题
Posted MachineChen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了数据流过程中一定大小窗口的topK问题相关的知识,希望对你有一定的参考价值。
我们在大数据问题中经常遇到topK问题;这里讨论的是数据流场景下的topK,即在由最近一定数量数据构成的滑动窗口中求topK,下面给出一种设计方式。
示意图如下:
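数据流流入处理模块。模块中初始化了一个最小堆和一个最大堆:最小堆保存窗口中最大的K个数据(K约为窗口容量×loadFactor),其堆顶即窗口中第K大的数据;最大堆保存窗口中其余的数据,其堆顶是这些数据中最大的一个。同时用一个环形数组记录最近一定数量(窗口容量)的数据。新数据到来时,先从所在的堆中剔除最早进入窗口的过期数据,再插入新数据,并维护两个堆之间的大小关系。这样随时读取最小堆的堆顶,即可得到窗口中的topK。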
代码实现如下:
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Tracks the K-th largest element (according to a caller-supplied comparator) over a sliding
 * window made of the most recent {@code initialCapacity} pushed elements.
 *
 * <p>The window is split across two heaps. {@code minHeap} holds the largest {@code minHeapNum}
 * elements (K = {@code minHeapNum}), so its head is the K-th largest. {@code maxHeap} holds the
 * remaining elements, so its head is the largest of the rest. A circular buffer records arrival
 * order so that the element leaving the window can be removed from whichever heap holds it.
 *
 * <p>{@link #push}, {@link #getTopK} and {@link #init} are serialized by a fair
 * {@link ReentrantLock}. Removing an arbitrary element from a {@link PriorityQueue} requires a
 * linear search, so each push costs O(capacity) in the worst case.
 *
 * @param <T> element type. Elements must be non-null, and {@code equals} is used to find the
 *     expiring element in the heaps.
 */
public class TopKUtils<T> {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /** Fair lock guarding the heaps, the window buffer and the write index. */
    private final ReentrantLock lock = new ReentrantLock(true);
    /** Window size: the number of most recent elements considered. */
    private final int initialCapacity;
    /** Target size of minHeap, i.e. K; always within [1, initialCapacity]. */
    private final int minHeapNum;
    /** Size of maxHeap once the window is full (initialCapacity - minHeapNum); may be 0. */
    private final int maxHeapNum;
    /** Next slot of dataList to overwrite. */
    private int curIndex;
    private final Comparator<T> comparator;
    private Queue<T> minHeap;
    private Queue<T> maxHeap;
    /** Circular buffer of the window in arrival order; a null slot has never been filled. */
    private Object[] dataList;

    /**
     * Creates a sliding-window top-K tracker.
     *
     * @param initialCapacity window size; values above {@link #MAXIMUM_CAPACITY} are clamped
     * @param loadFactor fraction of the window kept in the top group. K is
     *     {@code round(initialCapacity * loadFactor)}, clamped to [1, initialCapacity].
     * @param comparator ordering that defines "larger"
     * @throws IllegalArgumentException if {@code initialCapacity < 1}, {@code loadFactor} is not
     *     strictly between 0 and 1 (or is NaN), or {@code comparator} is null
     */
    public TopKUtils(int initialCapacity, float loadFactor, Comparator<T> comparator) {
        // A zero-sized window can hold nothing (and would divide by zero when advancing the index).
        if (initialCapacity < 1) {
            throw new IllegalArgumentException("Illegal initial capacity: " +
                    initialCapacity);
        }
        if (initialCapacity > MAXIMUM_CAPACITY) {
            initialCapacity = MAXIMUM_CAPACITY;
        }
        // NaN fails every comparison, so it needs its own check.
        if (loadFactor <= 0 || loadFactor >= 1 || Float.isNaN(loadFactor)) {
            throw new IllegalArgumentException("Illegal load factor: " +
                    loadFactor);
        }
        if (comparator == null) {
            throw new IllegalArgumentException("Illegal comparator");
        }
        this.initialCapacity = initialCapacity;
        // Rounding can yield 0 (nothing to report) or the whole window. Clamp K into
        // [1, initialCapacity] so every valid (capacity, loadFactor) pair produces a usable tracker.
        int k = Math.round(initialCapacity * loadFactor);
        this.minHeapNum = Math.min(initialCapacity, Math.max(1, k));
        this.maxHeapNum = initialCapacity - this.minHeapNum;
        this.comparator = comparator;
        init();
    }

    /**
     * (Re)initializes the heaps and the window buffer, discarding every element pushed so far.
     * Called by the constructor.
     */
    public void init() {
        lock.lock();
        try {
            final Comparator<T> ascending = this.comparator;
            // Reverse order for the max-heap. Swapping the arguments (instead of negating the
            // result) stays correct even when compare() returns Integer.MIN_VALUE.
            Comparator<T> descending = new Comparator<T>() {
                @Override
                public int compare(T c1, T c2) {
                    return ascending.compare(c2, c1);
                }
            };
            // PriorityQueue rejects an initial capacity below 1, and maxHeapNum may be 0.
            this.minHeap = new PriorityQueue<>(this.minHeapNum, ascending);
            this.maxHeap = new PriorityQueue<>(Math.max(1, this.maxHeapNum), descending);
            this.dataList = new Object[this.initialCapacity];
            this.curIndex = 0;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds an element to the window. Once the window is full, the oldest element is evicted.
     *
     * <p>This method is best-effort by design. If the lock cannot be acquired within one second,
     * the element is dropped. Failures (including a null element) are printed rather than thrown,
     * so a bad element cannot stop the calling stream. A rejected element leaves the state
     * untouched.
     *
     * @param t element to add; must be non-null
     */
    public void push(T t) {
        try {
            // Bounded wait so the producer is never blocked indefinitely.
            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                return;
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return;
        }
        try {
            if (t == null) {
                // Reject before mutating anything. PriorityQueue refuses null, and a null slot
                // means "empty", so failing halfway would desynchronize the heaps and the buffer.
                throw new NullPointerException("t");
            }
            @SuppressWarnings("unchecked") // dataList only ever stores elements of type T
            T expired = (T) this.dataList[this.curIndex];
            if (expired != null && !this.minHeap.remove(expired)) {
                this.maxHeap.remove(expired);
            }
            // Restore minHeap to its target size by promoting the largest remaining elements.
            while (this.minHeap.size() < this.minHeapNum && !this.maxHeap.isEmpty()) {
                this.minHeap.add(this.maxHeap.poll());
            }
            if (this.minHeap.size() < this.minHeapNum) {
                // maxHeap is empty here, so t necessarily belongs to the top group.
                this.minHeap.add(t);
            } else {
                addInOrder(t);
            }
            this.dataList[this.curIndex] = t;
            this.curIndex = (this.curIndex + 1) % this.initialCapacity;
        } catch (RuntimeException e) {
            // Exceptions must not terminate the caller's stream-processing loop.
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Inserts {@code t} into the heap it belongs to while keeping minHeap's size unchanged. If
     * minHeap is empty, {@code t} simply becomes its only element.
     *
     * <p>This is an internal helper of {@link #push}. It does not take the lock and does not
     * record {@code t} in the window, so elements added directly through it never expire.
     *
     * @param t element to insert; must be non-null
     */
    public void addInOrder(T t) {
        T minTop = this.minHeap.peek();
        if (minTop == null) {
            this.minHeap.add(t);
        } else if (this.comparator.compare(minTop, t) > 0) {
            // t is below the current K-th largest, so it goes into the lower group.
            this.maxHeap.add(t);
        } else {
            // t displaces the current K-th largest, which moves down to the lower group.
            this.minHeap.poll();
            this.minHeap.add(t);
            this.maxHeap.add(minTop);
        }
    }

    /**
     * Returns the K-th largest element of the current window.
     *
     * @return the K-th largest element; the smallest element if fewer than K have been pushed;
     *     or null if the window is empty
     */
    public T getTopK() {
        lock.lock();
        try {
            T top = this.minHeap.peek();
            return top != null ? top : this.maxHeap.peek();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Micro-benchmark. Pushes 100,000 random floats through a 1000-element window
     * (loadFactor 0.1), querying the top-K after every push.
     */
    public static void main(String[] args) {
        Comparator<Float> defaultComparator = new Comparator<Float>() {
            @Override
            public int compare(Float c1, Float c2) {
                return Float.compare(c1, c2);
            }
        };
        TopKUtils<Float> topKUtils = new TopKUtils<>(1000, 0.1f, defaultComparator);
        // nanoTime is monotonic and intended for measuring elapsed time.
        long start = System.nanoTime();
        for (int i = 0; i < 100000; i++) {
            topKUtils.push((float) Math.random());
            topKUtils.getTopK();
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("代码运行时间:" + elapsedMs + "ms");
    }
}
因为优先级队列(PriorityQueue)删除特定值(remove(Object))需要线性查找,时间复杂度为O(n);查看堆顶为O(1),插入或删除堆顶(需要重新调整堆)为O(logn),故每次操作的时间复杂度为O(n)。
这里还有改进的空间:可以维护一个大小为预设容量的hashmap,键为数据在最近一定数量数据(环形数组)中的位置,值为其在优先级队列中的下标。这样删除特定值时无需线性查找,时间复杂度可以降到O(logn),整体时间复杂度也为O(logn)。但标准库的PriorityQueue并不暴露元素下标,因此需要自行实现优先级队列,并在每次堆调整时同步更新hashmap中的下标。这里仅提供一个思路。
以上是关于数据流过程中一定大小窗口的topK问题的主要内容,如果未能解决你的问题,请参考以下文章