golang实现本地延迟队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了golang实现本地延迟队列相关的知识,希望对你有一定的参考价值。

参考技术A 有个服务会大量使用延迟消息,进行事件处理。随着业务量不断上涨。在晚间、节假日等流量高峰期消息延迟消息队列限流会导致事件丢失,影响业务。与下游沟通后给上调到了最大限流值,问题依然存在,于是决定自己搞一套降级方案。

下游服务触发限流时,能降级部分流量到本地延迟队列,把业务损失降到最低。

本地延迟队列承接部分mq流量

流程如下:

1. 使用zset 存储延迟消息,其中:score为执行时间,value为消息体

2. 启动协程轮询zset,获取score最小的10条数据,协程执行间隔时间xs

        如果最小分值小于等于当前时间戳,则发送消息

        若最小分值大于当前时间戳,sleep等待执行

需要对key进行hash,打散到多个分片中,避免大key和热key问题,官方大key定义

因此,需保证每个key中value数量n<5000,单个value大小不超过 10240/n kb

假设承接10w qps,如何处理?

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息

假如每条消息大小500b,需占用存储6000000kb = 6000Mb = 6GB

为避免大key问题,每个zset存放4000个元素,需要哈希到3000(3000是key的数量,可配置)个zset中。

整个集群假设500台实例,每个处理qps平均在200左右。

单实例消费能力计算:

遍历每个zset,针对每个zset起goroutine处理,此示例中需要 起3000个

但是每秒能处理成功的只有200个,其他都在空跑

综上:

将redis key分片数n和每次处理的消息数m进行动态配置,便于调整

当流量上涨时,调大分片数n和单实例单分片并发数m即可,假如消费间隔200ms,集群处理能力为n*m*5 qps

n = (qps * 120) / 4000

若qps=q,则计算公式如下

zadd = q

zRange = 500 * 5 * n / 500

zRemove = q

setNx = 500 * 5 * n

若10w qps,则

读qps = 15000 + 500*3000*5 =7515000,写 20w

pros

redis 读写性能好,可支持较大并发量,zrange可直接取出到达执行时间的消息

cons

redis 大key问题导致对数据量有一定的限制

分片数量扩缩容会漏消费,会导致事件丢失,业务有损

key分片数量过多时,redis读写压力较大

机器资源浪费,3000个协程,单实例同一秒只有200个针对处理,其他都在空跑

流程如下:

使用带缓冲的channel来实现延迟队列,channel中存放的数据为消息体(包括执行时间),channel能保证先进先出

从channel中取出数据后,判断是否到达执行时间

到达,同步发送mq

未到达,sleep 剩余执行时间,然后再次执行

从channel读出的数据如果未到达执行时间,无法再次放入channel中,需要协程sleep(执行时间-当前时间)

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息,假设每条消息大小500b,需要6G存储空间

channel 大小 = (qps*120)/ c , c=集群实例数,c=500 => channel大小为24000,占用12M内存

要处理10w qps,分摊到每个机器的处理速度为 100000/500 = 200,假设单协程处理10qps,开20个即可。

pros:

本地存储,相比redis,读写速度更快;协程数量少,开销低;资源利用率较方案一高

cons:

稳定性不如redis,实例故障可能导致数据丢失;worker池和channel扩缩容依赖服务重启,成本高速度慢

综上,我们以10w qps为例,对比两种方案在以下指标差异,选择方案二。

附上demo

DelayQueue延迟队列-实现缓存

延迟阻塞队列DelayQueue

DelayQueue 是一个支持延时获取元素的阻塞队列,
内部采用优先队列 PriorityQueue 存储元素,
同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素。

使用场景

  • 缓存系统:当能够从延迟队列DelayQueue中获取到元素时,说明缓存已经过期
  • 定时任务调度:一分钟后发送短信

基于延迟队列,实现一个缓存系统

延迟队列中添加的元素,实现了Delayed接口

public class CacheItem implements Delayed{
    private long expireTime;
    
    private long currentTime;
    
    private String key;
    
    public String getKey() {
        return key;
    }
    
    public CacheItem(String key,long expireTime) {
        this.key = key;
        this.expireTime = expireTime;
        this.currentTime = System.currentTimeMillis();
    }

    /**
     * 比较方法,用于排序
     * 过期时间长的放队尾,时间短的放队首
     */
    @Override
    public int compareTo(Delayed o) {
        if(this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS))
            return 1;
        if(this.getDelay(TimeUnit.MICROSECONDS) > o.getDelay(TimeUnit.MICROSECONDS))
            return -1;
        return 0;
    }

    /**
     * 计算剩余的过期时间
     * 大于0说明没有过期
     */
    @Override
    public long getDelay(TimeUnit unit) {
        
        return expireTime - unit.MILLISECONDS.toSeconds(System.currentTimeMillis()-currentTime);
        
    }

}

缓存实现

public class DelayQueueDemo {
    static class Cache implements Runnable{
        private Map<String,String> itemMap = new HashMap<>();
    
        private DelayQueue<CacheItem> delayQueue = new DelayQueue<>();
        
        private boolean stop = false;
        
        // 初始化后就开始检测
        public Cache() {
            new Thread(this).start();
        }
        
        public void add(String key,String value,long expireTime) {
            CacheItem item = new CacheItem(key,expireTime);
            itemMap.put(key, value);
            delayQueue.add(item);
            
        }
        
        public String get(String key) {
            return itemMap.get(key);
        }
        
        public void shutdown() {
            stop = true;
        }
        
        // 开启多线程,检测缓存是否过期
        @Override
        public void run() {
            while(!stop) {
                CacheItem item = delayQueue.poll();
                if(item != null) {
                    // 缓存过期
                     itemMap.remove(item.getKey());
                     System.out.println("delete expired key:"+item.getKey());
                }
            }
            System.out.println("Cache stop");
        }
    }
    
    public static void main(String[] args) throws Exception{
        Cache cache = new Cache();
        cache.add("a", "1", 1);
        cache.add("b", "2", 2);
        cache.add("c", "3", 2);
        cache.add("d", "4", 4);
        cache.add("e", "5", 6);
        
        while(true) {
            String a = cache.get("a");
            String b = cache.get("b");
            String c = cache.get("c");
            String d = cache.get("d");
            String e = cache.get("e");
            
            if(a == null && b == null && c == null && d == null && e == null) {
                break;
            }
        }
        
        TimeUnit.SECONDS.sleep(1);
        cache.shutdown();
    }
    
}

延迟队列实现原理部分说明

  • 可重入锁 ReentrantLock
  • 优先队列 PriorityQueue

参考连接

以上是关于golang实现本地延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

基于2PC和延迟更新完成分布式消息队列多条事务Golang版本

基于Redisson实现延迟队列

MQ-死信队列实现消息延迟

RabbitMQ实现延迟消费(延迟队列)

DelayQueue延迟队列-实现缓存

消息队列 - 死信、延迟、重试队列