一种异步延迟队列的实现方式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种异步延迟队列的实现方式相关的知识,希望对你有一定的参考价值。

作者:京东零售 张路瑶

1.应用场景

目前系统中有很多需要用到延时处理的功能:支付超时取消、排队超时、短信、微信等提醒延迟发送、token刷新、会员卡过期等等。通过延时处理,极大的节省系统的资源,不必轮询数据库处理任务。

目前大部分功能通过定时任务完成,定时任务还分使用quartz及xxljob两种类型轮询时间短,每秒执行一次,对数据库造成一定的压力,并且会有1秒的误差。轮询时间久,如30分钟一次,03:01插入一条数据,正常3:31执行过期,但是3:30执行轮询时,扫描3:00-3:30的数据,是扫描不到3:31的数据的,需要4:00的时候才能扫描到,相当于多延迟了29分钟!

2.延时处理方式调研

1.DelayQueue

1.实现方式:

jvm提供的延迟阻塞队列,通过优先级队列对不同延迟时间任务进行排序,通过condition进行阻塞、睡眠dealy时间 获取延迟任务。

当有新任务加入时,会判断新任务是否是第一个待执行的任务,若是,会解除队列睡眠,防止新加入的元素时需要执行的元素而不能正常被执行线程获取到。

2.存在的问题:

1.单机运行,系统宕机后,无法进行有效的重试

2.没有执行记录和备份

3.没有重试机制

4.系统重启时,会将任务清空!

5.不能分片消费

3.优势:实现简单,无任务时阻塞,节省资源,执行时间准确

2.延迟队列mq

实现方式:依赖mq,通过设置延迟消费时间,达到延迟消费功能。像rabbitMq、jmq都可以设置延迟消费时间。RabbitMq通过将消息设置过期时间,放入死信队列进行消费实现。

存在的问题:

1.时间设置不灵活,每个queue是固定的到期时间,每次新创建延时队列,需要创建新的消息队列

优点:依靠jmq,可以有效的监控、消费记录、重试,具备多机同时消费能力,不惧怕宕机

3.定时任务

通过定时任务轮询符合条件的数据

缺点:

1.必须要读业务数据库,对数据库造成一定的压力,

2.存在延时

3.一次扫描数据量过大时,占用过多的系统资源。

4\\. 无法分片消费

优点:

1.消费失败后,下次还能继续消费,具备重试能力,

2.消费能力稳定

4.redis

任务存储在redis中,使用redis的 zset队列根据score进行排序,程序通过线程不断获取队列数据消费,实现延时队列

优点:

1、查询redis相比较数据库快,set队列长度过大,会根据跳表结构进行查询,效率高

2、redis可根据时间戳进行排序,只需要查询当前时间戳内的分数的任务即可

3、无惧机器重启

4、分布式消费

缺点:

1.受限于redis性能,并发10W

2.多个命令无法保证原子性,使用lua脚本会要求所有数据都在一个redis分片上。

5\\. 时间轮

通过时间轮实现的延迟任务执行,也是基于jvm单机运行,如kafka、netty都有实现时间轮,redisson的看门狗也是通过netty的时间轮实现的。

缺点:不适合分布式服务的使用,宕机后,会丢失任务。

一种异步延迟队列的实现方式_数据库

3.实现目标

兼容目前在使用的异步事件组件,并提供更可靠,可重试、有记录、可监控报警、高性能的延迟组件。

•消息传输可靠性:消息进入到延迟队列后,保证至少被消费一次。

•Client支持丰富:支持多重语言。

•高可用性:支持多实例部署。挂掉一个实例后,还有后备实例继续提供服务。

•实时性:允许存在一定的时间误差。

•支持消息删除:业务使用方,可以随时删除指定消息。

•支持消费查询

•支持手动重试

•对当前异步事件的执行增加监控

4.架构设计

一种异步延迟队列的实现方式_redis_02

5.延迟组件实现方式

1.实现原理

目前选择使用jimdb通过zset实现延时功能,将任务id和对应的执行时间作为score存在在zset队列中,默认会按照score排序,每次取0-当前时间内的score的任务id,

发送延迟任务时,会根据时间戳+机器ip+queueName+sequence 生成唯一的id,构造消息体,加密后放入zset队列中。

通过搬运线程,将达到执行时间的任务移动到发布队列中,等待消费者获取。

监控方通过集成ump

消费记录通过redis备份+数据库持久化完成。

通过缓存实现的方式,只是实现的一种,可以通过参数控制使用哪一种实现方式,并可通过spi自由扩展。

2.消息结构

每个Job必须包含一下几个属性:

•Topic:Job类型,即QueueName

•Id:Job的唯一标识。用来检索和删除指定的Job信息。

•Delay:Job需要延迟的时间。单位:秒。(服务端会将其转换为绝对时间)

•Body:Job的内容,供消费者做具体的业务处理,以json格式存储。

•traceId:发送线程的traceId,待后续pfinder支持设置traceId后,可与发送线程公用同一个traceiD,便于日志追踪

具体结构如下图表示:

一种异步延迟队列的实现方式_数据库_03

TTR的设计目的是为了保证消息传输的可靠性。

3.数据流转及流程图

一种异步延迟队列的实现方式_redis_04

基于redis-disruptor方式进行发布、消费,可以作为消息来进行使用,消费者采用原有异步事件的disruptor无锁队列消费,不同应用、不同queue之间无锁

1.支持应用只发布,不消费,达到消息队列的功能。

2:支持分桶,针对大key问题,若事件多,可以设置延迟队列和任务队列桶的数量,减小因大key造成的redis阻塞问题。

3: 通过ducc配置,进行性能的扩展,目前只支持开启消费和关闭消费。

4: 支持设置超时时间配置,防止消费线程执行过久

瓶颈: 消费速度慢,生产速度过快,会导致ringbuffer队列占满,当前应用既是生产者也是消费者时,生产者会休眠,性能取决于消费速度,可通过水平扩展机器,直接提升性能。监控redis队列的长度,若不断增长,可考虑增加消费者,直接提高性能。

可能出现的情况: 因一个应用公用一个disruptor,拥有64个消费者线程,如果某一个事件消费过慢,导致64个线程都在消费这个事件,会导致其他事件无消费线程消费,生产者线程也被阻塞,导致所有事件的消费都被阻塞。

后期观察是否有这个性能瓶颈,可给每一个queue一个消费者线程池。

6.demo示例

增加配置文件

判断是否开启jd.event.enable:true

<dependency> <groupId>com.jd.car</groupId>
<artifactId>senna-event</artifactId>
<version>1.0-SNAPSHOT</version> </dependency>

配置

jd:
senna:
event:
enable: true
queue:
retryEventQueue:
bucketNum: 1
handleBean: retryHandle

消费代码:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@Component("retryHandle")
public class RetryQueueEvent extends EventHandler

@Override
protected void onHandle(String key, String eventType)
log.info("Handler开始消费:", key);


@Override
protected void onDelayHandle(String key, String eventType)
log.info("delayHandler开始消费:", key);

注解形式:

package com.jd.car.senna.admin.event;

import com.jd.car.senna.event.EventHandler;
import com.jd.car.senna.event.annotation.SennaEvent;
import lombok.extern.slf4j.Slf4j;

/**
* @author zhangluyao
* @description
* @create 2022-02-21-9:54 下午
*/
@Slf4j
@SennaEvent(queueName = "testQueue", bucketNum = 5,delayBucketNum = 5,delayEnable = true)
public class TestQueueEvent extends EventHandler

@Override
protected void onHandle(String key, String eventType)
log.info("Handler开始消费:", key);


@Override
protected void onDelayHandle(String key, String eventType)
log.info("delayHandler开始消费:", key);

发送代码

package com.jd.car.senna.admin.controller;

import com.jd.car.senna.event.queue.IEventQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.CompletableFuture;


/**
* @author zly
*/
@RestController
@Slf4j
public class DemoController

@Lazy
@Resource(name = "testQueue")
private IEventQueue eventQueue;

@ResponseBody
@GetMapping("/api/v1/demo")
public String demo()
log.info("发送无延迟消息");
eventQueue.push("no delay 5000 millseconds message 3");
return "ok";


@ResponseBody
@GetMapping("/api/v1/demo1")
public String demo1()
log.info("发送延迟5秒消息");
eventQueue.push(" delay 5000 millseconds message,name",1000*5L);
return "ok";


@ResponseBody
@GetMapping("/api/v1/demo2")
public String demo2()
log.info("发送延迟到2022-04-02 00:00:00执行的消息");
eventQueue.push(" delay message,name to 2022-04-02 00:00:00", new Date(1648828800000));
return "ok";


参考有赞设计:​​https://tech.youzan.com/queuing_delay/​

7.目前应用:

1.云修到店排队24小时后自动取消

2..美团请求token定时刷新。

3.质保卡延期24小时生成

5\\. 结算单延期生成

6.短信延迟发送

rabbitmq 实现延迟队列的两种方式

转载请注明出处


ps: 文章里面延迟队列=延时队列

什么是延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到智能设备。


RabbitMQ如何实现迟队列

方法一

AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

RabbitMQ针对队列中的消息过期时间有两种方法可以设置。

  • A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
  • B: 对消息进行单独设置,每条消息TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

队列出现dead letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

综合上述两个特性,设置了TTL规则之后当消息在一个队列中变成死信时,利用DLX特性它能被重新转发到另一个Exchange或者Routing Key,这时候消息就可以重新被消费了。

设置方法:

第一步:设置TTL产生死信,有两种方式Per-Message TTL和 Queue TTL,第一种可以针对每一条消息设置一个过期时间使用于大多数场景,第二种针对队列设置过期时间、适用于一次性延时任务的场景

还有其他产生死信的方式比如消费者拒绝消费 basic.reject 或者 basic.nack ( 前提要设置消费者的属性requeue=false)

  • Per-Message TTL (对每一条消息设置一个过期时间)(官方文档

java client发送一条只能驻留60秒的消息到队列:

byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");//设置消息的过期时间为60秒
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
//这条消息发送到相应的队列之后,如果60秒内没有被消费,则变为死信
  • Queue TTL (对整个队列设置一个过期时间)

创建一个队列,队列的消息过期时间为30分钟(这个队列30分钟内没有消费者消费消息则删除,删除后队列内的消息变为死信)

java client方式:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

rabbitmqctl命令方式(.* 为所有队列, 可以替换为指定队列):
rabbitmqctl set_policy expiry ".*" '"expires":1800000' --apply-to queues

rabbitmqctl (Windows):
rabbitmqctl set_policy expiry ".*" """expires"":1800000" --apply-to queues

第二步:设置死信的转发规则(如果没有任何规则,则直接丢弃死信)

Java Client方式:
//声明一个直连模式的exchange
channel.exchangeDeclare("some.exchange.name", "direct");
//声明一个队列,当myqueue队列中有死信产生时,会转发到交换器some.exchange.name
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");

//如果设置死信会以路由键some-routing-key转发到some.exchange.name,如果没设默认为消息发送到本队列时用的routing key
//args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);

命令行方式(.* 为所有队列, 可以替换为指定队列):
设置 "dead-letter-exchange"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" '"dead-letter-exchange":"my-dlx"' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" """dead-letter-exchange"":""my-dlx""" --apply-to queues
设置 "dead-letter-routing-key"
rabbitmqctl:
rabbitmqctl set_policy DLX ".*" ' "dead-letter-routing-key":"my-routing-key"' --apply-to queues
rabbitmqctl (Windows):
rabbitmqctl set_policy DLX ".*" """dead-letter-routing-key"":""my-routing-key""" --apply-to queues

方法二

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

安装:

进入插件安装目录
rabbitmq-server/plugins/(可以查看一下当前已存在的插件)
下载插件
rabbitmq_delayed_message_exchange

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

(如果下载的文件名称不规则就手动重命名一下如:
rabbitmq_delayed_message_exchange-0.0.1.ez)

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(关闭插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

发送消息的时候通过在header添加"x-delay"参数来控制消息的延时时间

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...

使用示例:

消息发送端:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send 
	// 队列名称
	private final static String EXCHANGE_NAME="delay_exchange";
	private final static String ROUTING_KEY="key_delay";

	@SuppressWarnings("deprecation")
	public static void main(String[] argv) throws Exception 
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.12.190");
		factory.setUsername("admin");
		factory.setPassword("admin");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

		// 声明x-delayed-type类型的exchange
		Map<String, Object> args = new HashMap<String, Object>();
		args.put("x-delayed-type", "direct");
		channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
				false, args);


		Map<String, Object> headers = new HashMap<String, Object>();
		//设置在2016/11/04,16:45:12向消费端推送本条消息
		Date now = new Date();
		Date timeToPublish = new Date("2016/11/04,16:45:12");

		String readyToPushContent = "publish at " + sf.format(now)
				+ " \\t deliver at " + sf.format(timeToPublish);

		headers.put("x-delay", timeToPublish.getTime() - now.getTime());

		AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
				.headers(headers);
		channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
				readyToPushContent.getBytes());

		// 关闭频道和连接
		channel.close();
		connection.close();
	

消息接收端:

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv 

	// 队列名称
	private final static String QUEUE_NAME = "delay_queue";
	private final static String EXCHANGE_NAME="delay_exchange";

	public static void main(String[] argv) throws Exception,
			java.lang.InterruptedException 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("192.168.12.190");
		factory.setUsername("admin");
		factory.setPassword("admin");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

		channel.queueDeclare(QUEUE_NAME, true,false,false,null);
		channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
		channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
		SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
		try 
			System.out.println("****************WAIT***************");
			while(true)
				QueueingConsumer.Delivery delivery = queueingConsumer
						.nextDelivery(); //

				String message = (new String(delivery.getBody()));
				System.out.println("message:"+message);
				System.out.println("now:\\t"+sf.format(new Date()));
			

		 catch (Exception exception) 
			exception.printStackTrace();

		

	

启动接收端,启动发送端
运行结果:

****************WAIT***************
message:publish at 2016-11-04 16:44:16.887 	 deliver at 2016-11-04 16:45:12.000
now:	2016-11-04 16:45:12.023

结果显示在我们2016-11-04 16:45:12.023接收到了消息,距离我们设定的时间2016-11-04 16:45:12.023有23毫秒的延迟

Note:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量在web管理界面可能不可见,不影响正常功能使用

Note :使用过程中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会无法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据造成启动超时,并建议不要使用Ram节点

插件开发者:
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.

以上是关于一种异步延迟队列的实现方式的主要内容,如果未能解决你的问题,请参考以下文章

使用RabbitMQ的死信队列实现延迟消息

使用RabbitMQ的死信队列实现延迟消息

使用RabbitMQ的死信队列实现延迟消息

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

SpringBoot整合RabbitMQ实现死信队列

rabbitmq死信队列及延迟队列