谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)
Posted 飘渺Jam
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)相关的知识,希望对你有一定的参考价值。
前言
在开发中经常会遇到延时任务的需求,例如在12306购买车票,若生成订单30分钟未支付则自动取消;还有在线商城完成订单后48小时不评价 ,自动5星好评。像这类在某事件触发后一段时间内执行的需求任务我们称之为 延时任务。
那么如何实现延迟任务呢?
第一反应是利用cron方案来实现:
启动一个cron定时任务,每隔一段时间执行一次,比如30分钟,找到那些超时的数据,直接更新状态,或者拿出来执行一些操作。如果数据量比较大,需要分页查询,分页update,这将是一个for循环更新操作。
cron方案是很常见的一种方案,但是常见的不一定是最好的,主要有以下几个问题:
当数据量大的时候轮询效率低;
时效性不够好,如果每小时轮询一次,最差的情况时间误差会达到1小时;
如果通过增加cron轮询频率来减少时间误差,则会出现轮询低效和重复计算的问题;
既然cron方案不是很理想,那就请出我们今天的主角,使用RocketMQ的延时消息解决。在创建订单的时候发送一条延时消息到RocketMQ,30分钟后消费者消费消息去检查订单的状态,如果发现订单未支付则取消订单释放库存。
实现
RocketMQ延迟队列的核心思路是:所有的延迟消息由producer发出之后,都会存放到同一个topic(SCHEDULE_TOPIC_XXXX)下,不同的延迟级别会对应不同的队列序号,当延迟时间到之后,由定时线程读取转换为普通的消息存的真实指定的topic下,此时对于consumer端此消息才可见,从而被consumer消费。
注意: RocketMQ不支持任意时间的延时,只支持以下几个固定的延时等级
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
下面我们结合SprintBoot利用RocketMQ发送延时消息
引入RocketMQ组件
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
增加RocketMQ的配置
rocketmq:
name-server: 172.31.0.44:9876
producer:
group: delay-group
编写生产者
@Component
@Slf4j
public class DelayProduce
@Autowired
private RocketMQTemplate rocketMQTemplatet;
public void sendDelayMessage(String topic,String message,int delayLevel)
SendResult sendResult = rocketMQTemplatet.syncSend(topic, MessageBuilder.withPayload(message).build(), 2000, delayLevel);
log.info("sendtime is ", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
log.info("sendResult is",sendResult);
编写消费者
@Slf4j
@Component
@RocketMQMessageListener(
topic = "delay-topic",
consumerGroup = "delay-group"
)
public class DelayConsumer implements RocketMQListener<String>
@Override
public void onMessage(String message)
log.info("received message time is ", DateTimeFormatter.ofPattern("yyyy年MM月dd日 HH:mm:ss").format(LocalDateTime.now()));
log.info("received message is ",message);
测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class DelayProduceTest
@Autowired
private DelayProduce delayProduce;
@Test
public void sendDelayMessage()
delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",5);
这里delayLevel设置成5,对应RocketMQ的延时等级就是1分钟后投递消息。
运行结果
修改延时级别
RocketMQ的延迟等级可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持1天的延迟,修改最后一个level的值为1d,这个时候依然是18个level;也可以增加一个1d,这个时候总共就有19个level。
打开RocketMQ的配置文件,修改
messageDelayLevel
属性
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
storePathRootDir = /app/rocketmq/data
messageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
这次将延时等级1修改成了90s,生产者发送消息后需要90s后再进行消息投递。修改完成后重启RocketMQ。
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
使用延时等级1发送消息
public void sendDelayMessage()
delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知录",1);
测试
通过比对发送时间与消费时间证明延时等级修改生效。
以上,希望对你有所帮助。
文末送书
趁此机会,本次联合机械工业出版社再给大家送两本新上市的 《RocketMQ技术内幕:RocketMQ架构设计与实现原理(第2版)》。
老规矩:在本文留言,随机从留言中抽取2本。朋友圈公布中奖名单(微信在最后),给你免费包邮到家!
这是一本指导你如何在实践中让RocketMQ实现高性能、高可用、高吞吐量和低延迟的著作。作者是RocketMQ官方认定的“优秀布道师”和技术专家,持续在RocketMQ领域深耕。本书从源码的角度分析了RocketMQ的技术架构和实现原理,第1版获得了良好的口碑,是RocketMQ领域的标志性作品,第2版做了大幅度更新,在进入源码分析之前,首先通过图文的方式,提炼出RocketMQ的核心工作机制,降低源码阅读的难度,引发思考。
一定要记得添加我好友,以免错过中奖通知
以上是关于谁让你再使用cron发送延时消息,你直接给他一jio!(文末送书)的主要内容,如果未能解决你的问题,请参考以下文章