rocketmq延时消息自定义配置;topic下tag使用
Posted 好大的月亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq延时消息自定义配置;topic下tag使用相关的知识,希望对你有一定的参考价值。
概述
使用的是开源版本的rocketmq4.9.4
rocketmq
也是支持延时消息的。
rocketmq
一般是4个部分:
nameserver
:保存路由信息broker
:保存消息- 生产者:生产消息
- 消费者:消费消息
延时消息的处理是在其中的broker
中。
但是rocketmq
不支持自定义延时消息,rabbitmq倒是可以,但也有延时时间上限.
rocketmq支持18个等级的延时时间
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker
在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX
,根据延迟level
的个数,创建对应数量的队列,也就是说18
个level
对应了18
个队列。注意,这并不是说这个内部主题只会有18
个队列,因为Broker
通常是集群模式部署的,因此每个节点都有18
个队列。
延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level
。例如:你想支持2
天的延迟,修改最后一个level
的值为2d
,这个时候依然是18
个level
;也可以增加一个2d
,这个时候总共就有19
个level
。
在检查某一个延时队列中的消息过期时,只会检查第一个队列元素,第一个没过期后面的元素就不会再去检测.
延时消息的流转过程
这边捞一张网图
增加一个延时队列等级
按照原理,broker中根据18个延时等级创建了18个队列来监控,那么只需要再增加延时等级个数,那么broker自然就会再新增一个队列来监控。
比如在broker的配置文件中增加一个延时等级为19的延时15秒的配置
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = localhost:9876
brokerIP1 = 192.168.0.89
brokerIP2 = 192.168.0.89
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 15s
测试结果成功
topic下tag使用
有时候同一个topic
下还想继续分组,那么此时可以使用tag
来进一步的区分。
坑点:同一个
consumeGroup
并且同一个topic
的订阅者,如果2
个实例订阅的不同的tag
,那么可能会发生消息丢失。
因为往topic
队列中存数据时是时按照全部队列去分配的,但是队列1
和队列5
分属不同的tag
,那么实例a
只订阅了tag_a
,因此被实例a
订阅的队列中只有tag_a
被消费了,tag_b
就还在队列中没有被消费。造成消息丢失的假象。
一般情况下都是同一个消费者启动多个实例,所以tag_a
和tag_b
都是有订阅的。
捞一张网图
tag的使用demo
使用的时候要注意,springboot
下默认的消费者监听了所有的tag
,所以如果没有具体的tag
消费者,那么就会被默认监听所有tag
的当前topic
所消费。同理,如果同时存在监听所有tag
和具体tag
的消费者,那么就会产生广播的效果。
举例说明
实例下名为Q
的topic
下有2个监听着,第一个监听tag=“*”
,第二个监听tag=“666”
。那么给Q topic
发送tag
为666
的消息时,这两者都会收到消息。
发送demo
就是在topic
后面拼接“:”
即可,发送的时候只能指定一个tag
,但是监听可以监听多个tag
监听多个tag
的则用“||”
分隔
发送消息
public void sendMsgTag(@RequestBody Map<String,Object> map)
String topic = "efg";
String topicTag = topic.concat(":").concat(((String) map.get("tag")));
//异步发送
org.apache.rocketmq.spring.core.RocketMQTemplate.asyncSend(topicTag, map, new SendCallback()
@Override
public void onSuccess(SendResult sendResult)
log.info("发送:,成功", topicTag);
@Override
public void onException(Throwable e)
log.info("发送:,失败", topicTag);
);
监听
mq:
consumerGroup: mq
tag: 666||777
package com.fchan.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
@RocketMQMessageListener(
consumerGroup = "$mq.consumerGroup",
topic = "efg",
//selectorType 默认就是tag
selectorType = SelectorType.TAG,
selectorExpression = "$mq.tag"
)
@Slf4j
public class MqListenerTag implements RocketMQListener<String>
@Override
public void onMessage(String s)
log.info("时间收到了mq消息:", LocalDateTime.now(), s);
以上是关于rocketmq延时消息自定义配置;topic下tag使用的主要内容,如果未能解决你的问题,请参考以下文章