时间轮存储设计与实现
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了时间轮存储设计与实现相关的知识,希望对你有一定的参考价值。
目录
时间轮是一个环状的存储结构,它像一个时钟,我们可以给环设置指定数量的刻度,比如一个环60个刻度或者60*60刻度,存任务时根据index取到数组里,那么在取任务时可以根据时间戳来计算Index, 从而实现轮询时间轮里的任务。
如下用数组作为存储任务的时间轮,也可以使用链表替代。
Java中的列表底层结构是数组,可用Set或List, 下面以数组为存储单元详解实现思路。
一、实现思路
1. 存任务
以ID做为计算时间轮中的index位置,假如时间轮的槽位有60个,可设置index=ID%60, 如果能拿到值,并不为空,那么存任务时,将当前任务添加到列表后。
为了避免空指针异常,可以在new的时候给所有槽位初始化空数组。
2. 取任务
由于我们是以时间的刻度作为存储的index, 那么可以直接使用当前时间戳计算出对应的环上的id。
index= timestamp/1000 %60
即每次pull前,需要计算一下index,相当于是秒表的转动,假如pull线程每隔1s休眠一次,那么在可以实现每秒从时间轮中取出一组任务,60s 一圈,每s都会去时间轮里拿任务。
二、代码实现
1. 抽象类TimeWheel
package com.bing.sh.timewheel;
import java.util.*;
public abstract class TimeWheel
// 每个槽位对应一个时间刻度
protected int bufferSize = 60;
public TimeWheel()
public TimeWheel(int size)
this.bufferSize = size;
protected abstract void pushRingData(int ringId, Object obj);
protected abstract Collection<?> removeRingData(int ringId);
public void pusData(int ringId, Object obj)
int index = countIndex(ringId);
this.pushRingData(index, obj);
public Collection<?> removeData(int ringId)
int index = countIndex(ringId);
return this.removeRingData(index);
public int countIndex(int ringId)
if (ringId < 0)
ringId = -ringId;
return ringId % bufferSize;
使用ringId作为key来计算index, 存取前都要计算一遍 ,pushData用来存任务,removeData用来从时间轮里取任务。
2. 用数组实现
创建EntryWheel实现类,继承TimeWheel, 可使用initBuffer()方法给时间轮初始化, 可用ReentrantReadWriteLock 加读写锁,也可不加,因为轮询的时候是单线程的取。
package com.bing.sh.timewheel;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class EntryWheel extends TimeWheel
/**
* 采用数组的形式, 通过index来作为存储的key, value为一个List或者Set列表
*/
private Object[] ringBuffer;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public EntryWheel()
super();
initBuffer();
public void initBuffer()
ringBuffer = new Object[bufferSize];
for (int i = 0; i < bufferSize; i++)
ringBuffer[i] = new HashSet<>();
public EntryWheel(int size)
super(size);
initBuffer();
@Override
protected void pushRingData(int index, Object obj)
// 1. 根据ringId计算index
// 2. 根据index存放obj
try
lock.writeLock().lock();
Set<Object> objects = (Set<Object>) ringBuffer[index];
objects.add(obj);
ringBuffer[index] = objects;
finally
lock.writeLock().unlock();
@Override
protected Collection<?> removeRingData(int index)
try
lock.readLock().lock();
Set<Object> objects = (Set<Object>) ringBuffer[index];
ringBuffer[index] = new HashSet<>();
return objects;
finally
lock.readLock().unlock();
3. 用Map实现
用Map替换为数组。
package com.bing.sh.timewheel;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MapWheel extends TimeWheel
private Map<Integer/** id**/, Set<Object>> ringDataMap;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public MapWheel()
ringDataMap = new ConcurrentHashMap<>(bufferSize);
initMap();
private void initMap()
for (int i = 0; i < bufferSize; i++)
Set<Object> objectSet = new HashSet<>();
ringDataMap.put(i, objectSet);
public MapWheel(int size)
super(size);
ringDataMap = new ConcurrentHashMap<>(bufferSize);
initMap();
@Override
protected void pushRingData(int ringId, Object obj)
try
lock.writeLock().lock();
Set<Object> objects = ringDataMap.get(ringId);
objects.add(obj);
finally
lock.writeLock().unlock();
@Override
protected Collection<?> removeRingData(int ringId)
try
lock.readLock().lock();
Set<Object> objects = ringDataMap.get(ringId);
ringDataMap.put(ringId, new HashSet<>());
return objects;
finally
lock.readLock().unlock();
4. 测试数组
package com.bing.sh.timewheel;
import org.junit.Before;
import org.junit.Test;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class EntryWheelTests
Random random;
@Before
public void setup()
random = new Random();
@Test
public void testEntryWheel()
EntryWheel entryWheel = new EntryWheel();
new Thread(() ->
while (true)
entryWheel.pusData(random.nextInt(60), random.nextInt(1000000));
try
Thread.sleep(10);
catch (InterruptedException e)
e.printStackTrace();
).start();
new Thread(() ->
while (true)
int nowTime = (int) (System.currentTimeMillis() / 1000 % 60);
Set<Object> objectSet = (Set<Object>) entryWheel.removeData(nowTime);
System.out.println(nowTime + ">>>" + objectSet);
try
Thread.sleep(1000);
catch (InterruptedException e)
e.printStackTrace();
).start();
try
TimeUnit.SECONDS.sleep(10);
catch (InterruptedException e)
e.printStackTrace();
打印结果:
每秒读任务, 直到打印10次结束
5. 测试Map
用同样的方式测试Map实现的时间轮。
可以发现,也能够实现按s读取, 每次读取到数组的大小是随机的。
6. 小结
1) 时间轮的size可根据实际开发需求设置,这里为了好理解,设置成60。
2) 每次存取前可根据size和ringId 取模,或者采用其他算法也可以,只要散布在轮子上的数据均匀即可。
以上是关于时间轮存储设计与实现的主要内容,如果未能解决你的问题,请参考以下文章