时间轮存储设计与实现

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了时间轮存储设计与实现相关的知识,希望对你有一定的参考价值。

目录

一、实现思路

1. 存任务

2. 取任务

二、代码实现

1. 抽象类TimeWheel

2. 用数组实现

3. 用Map实现

4. 测试数组

5. 测试Map 

6. 小结 


      时间轮是一个环状的存储结构,它像一个时钟,我们可以给环设置指定数量的刻度,比如一个环60个刻度或者60*60刻度,存任务时根据index取到数组里,那么在取任务时可以根据时间戳来计算Index, 从而实现轮询时间轮里的任务。

        如下用数组作为存储任务的时间轮,也可以使用链表替代。

        Java中的列表底层结构是数组,可用Set或List,  下面以数组为存储单元详解实现思路。

一、实现思路

1. 存任务

        以ID做为计算时间轮中的index位置,假如时间轮的槽位有60个,可设置index=ID%60, 如果能拿到值,并不为空,那么存任务时,将当前任务添加到列表后。

        为了避免空指针异常,可以在new的时候给所有槽位初始化空数组。

2. 取任务

        由于我们是以时间的刻度作为存储的index, 那么可以直接使用当前时间戳计算出对应的环上的id。

       index= timestamp/1000 %60

        即每次pull前,需要计算一下index,相当于是秒表的转动,假如pull线程每隔1s休眠一次,那么在可以实现每秒从时间轮中取出一组任务,60s 一圈,每s都会去时间轮里拿任务。

二、代码实现

1. 抽象类TimeWheel

package com.bing.sh.timewheel;

import java.util.*;

public abstract class TimeWheel 

    // 每个槽位对应一个时间刻度
    protected int bufferSize = 60;


    public TimeWheel() 
    

    public TimeWheel(int size) 
        this.bufferSize = size;
    


    protected abstract void pushRingData(int ringId, Object obj);

    protected abstract Collection<?> removeRingData(int ringId);


    public void pusData(int ringId, Object obj) 
        int index = countIndex(ringId);
        this.pushRingData(index, obj);
    

    public Collection<?> removeData(int ringId) 
        int index = countIndex(ringId);
        return this.removeRingData(index);
    


    public int countIndex(int ringId) 
        if (ringId < 0) 
            ringId = -ringId;
        
        return ringId % bufferSize;
    


        使用ringId作为key来计算index, 存取前都要计算一遍 ,pushData用来存任务,removeData用来从时间轮里取任务。

2. 用数组实现

        创建EntryWheel实现类,继承TimeWheel, 可使用initBuffer()方法给时间轮初始化, 可用ReentrantReadWriteLock 加读写锁,也可不加,因为轮询的时候是单线程的取。

package com.bing.sh.timewheel;


import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class EntryWheel extends TimeWheel 

    /**
     * 采用数组的形式, 通过index来作为存储的key, value为一个List或者Set列表
     */
    private Object[] ringBuffer;

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();


    public EntryWheel() 
        super();
        initBuffer();
    

    public void initBuffer() 
        ringBuffer = new Object[bufferSize];
        for (int i = 0; i < bufferSize; i++) 
            ringBuffer[i] = new HashSet<>();
        
    


    public EntryWheel(int size) 
        super(size);
        initBuffer();
    

    @Override
    protected void pushRingData(int index, Object obj) 
        // 1. 根据ringId计算index
        // 2. 根据index存放obj
        try 
            lock.writeLock().lock();
            Set<Object> objects = (Set<Object>) ringBuffer[index];
            objects.add(obj);
            ringBuffer[index] = objects;
         finally 
            lock.writeLock().unlock();
        

    

    @Override
    protected Collection<?> removeRingData(int index) 
        try 
            lock.readLock().lock();
            Set<Object> objects = (Set<Object>) ringBuffer[index];
            ringBuffer[index] = new HashSet<>();
            return objects;
         finally 
            lock.readLock().unlock();
        
    


3. 用Map实现

        用Map替换为数组。

package com.bing.sh.timewheel;


import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MapWheel extends TimeWheel 

    private Map<Integer/** id**/, Set<Object>> ringDataMap;

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public MapWheel() 
        ringDataMap = new ConcurrentHashMap<>(bufferSize);
        initMap();
    

    private void initMap() 
        for (int i = 0; i < bufferSize; i++) 
            Set<Object> objectSet = new HashSet<>();
            ringDataMap.put(i, objectSet);
        
    

    public MapWheel(int size) 
        super(size);
        ringDataMap = new ConcurrentHashMap<>(bufferSize);
        initMap();
    

    @Override
    protected void pushRingData(int ringId, Object obj) 
        try 
            lock.writeLock().lock();
            Set<Object> objects = ringDataMap.get(ringId);
            objects.add(obj);
         finally 
            lock.writeLock().unlock();
        
    

    @Override
    protected Collection<?> removeRingData(int ringId) 
        try 
            lock.readLock().lock();
            Set<Object> objects = ringDataMap.get(ringId);
            ringDataMap.put(ringId, new HashSet<>());
            return objects;
         finally 
            lock.readLock().unlock();
        
    

4. 测试数组

package com.bing.sh.timewheel;

import org.junit.Before;
import org.junit.Test;

import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class EntryWheelTests 

    Random random;


    @Before
    public void setup() 
        random = new Random();
    


    @Test
    public void testEntryWheel() 
        EntryWheel entryWheel = new EntryWheel();
        new Thread(() -> 
            while (true) 
                entryWheel.pusData(random.nextInt(60), random.nextInt(1000000));
                try 
                    Thread.sleep(10);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        ).start();

        new Thread(() -> 
            while (true) 
                int nowTime = (int) (System.currentTimeMillis() / 1000 % 60);
                Set<Object> objectSet = (Set<Object>) entryWheel.removeData(nowTime);
                System.out.println(nowTime + ">>>" + objectSet);
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
            
        ).start();

        try 
            TimeUnit.SECONDS.sleep(10);
         catch (InterruptedException e) 
            e.printStackTrace();
        
    



打印结果:

每秒读任务, 直到打印10次结束

5. 测试Map 

        用同样的方式测试Map实现的时间轮。

        可以发现,也能够实现按s读取, 每次读取到数组的大小是随机的。

6. 小结 

        1) 时间轮的size可根据实际开发需求设置,这里为了好理解,设置成60。

        2) 每次存取前可根据size和ringId 取模,或者采用其他算法也可以,只要散布在轮子上的数据均匀即可。

以上是关于时间轮存储设计与实现的主要内容,如果未能解决你的问题,请参考以下文章

时间轮(TimeWheel)的设计与实现

时间轮算法

时间轮在NettyKafka中的应用

时间轮在NettyKafka中的应用

Sencha touch 2、多物品轮播

返回一个整数数组中最大字数组的和(一维数组首尾相连)