扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题
Posted Instanceztt
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题相关的知识,希望对你有一定的参考价值。
一 引言
最近在项目中发现,消息的延迟消费是通过监听redis的过期事件实现的,存在很大的可靠性问题:服务宕机或其他故障都会导致消息丢失。本想采用延迟队列加死信队列来完成消息的延迟消费,但这种方案存在一定的局限性:当队列中第一个消息未过期时,后面已经过期的消息也不会被投递到死信队列中,从而造成消息阻塞。对时间精度要求不高时可以采用这种方案,但对时间精度要求较高时就不适用了。
rabbitmq官方为我们提供了rabbitmq_delayed_message_exchange插件,用于解决上述消息阻塞问题。本文重点介绍如何基于自定义springboot组件扩展mq,实现消息的延迟消费。自定义mq组件的扩展可以参考笔者的这篇文章:《自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展》。
二 引入mq延迟插件
2.1 下载插件
登录mq管理平台查看对应的版本号
在 RabbitMQ 的 3.5.7 版本之后,提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列,同时需保证 Erlang/OTP 版本为 18.0 之后。
我这里 MQ 的版本是 3.9.11,现在去 GitHub 上根据版本号下载插件
点击下载插件
根据自己的版本号自行下载即可
2.2 制作包含延迟插件的镜像
将下载好的插件安装到mq中。这里我们采用Dockerfile制作相应的镜像,避免每次启动docker容器时都要手动拷贝插件并重新部署。
# Pin the base image to the 3.9 series so the broker matches the 3.9.0 plugin
# build copied below; the floating "management" tag follows the newest release.
FROM rabbitmq:3.9-management
COPY ./rabbitmq_delayed_message_exchange-3.9.0.ez /plugins
# Enable the delayed-message exchange plugin (--offline: no broker node runs during the build).
RUN rabbitmq-plugins enable --offline rabbitmq_delayed_message_exchange
# Enable the management web UI.
RUN rabbitmq-plugins enable --offline rabbitmq_management
发布到自己的镜像仓库
三 扩展mq组件,集成延迟插件
声明自定义元数据
package com.cncloud.cncloud.common.mq.metadata;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
/**
* @author likun
* @date 2022年11月21日 17:21
*/
public abstract class CustomMessageMetadata implements MessageMetadata
@Override
public ExchangeTypeEnum getExchangeType()
return ExchangeTypeEnum.CUSTOM;
/**
* 获取交换机名
* @return
*/
public abstract String getExchange();
声明解析器
package com.cncloud.cncloud.common.mq.metadata.resolver;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import java.util.HashMap;
import java.util.Map;
/**
* @author likun
* @date 2022年11月21日 17:23
*/
public class CustomMessageMetadataResolver extends AbstractMessageMetadataResolver
public CustomMessageMetadataResolver(RabbitAdmin rabbitAdmin)
super(rabbitAdmin);
@Override
public boolean isSupport(MessageMetadata messageMetadata)
return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
@Override
public boolean doResolve(MessageMetadata messageMetadata)
CustomMessageMetadata customMessageMetadata = (CustomMessageMetadata) messageMetadata;
Map<String, Object> args = messageMetadata.getQueueArgs();
if (args==null)
args=new HashMap();
args.put("x-delayed-type", "direct");
CustomExchange exchange = new CustomExchange(customMessageMetadata.getExchange(), "x-delayed-message", true, false, args);
Queue queue = new Queue(customMessageMetadata.getQueue(), true, false, false, customMessageMetadata.getQueueArgs());
Binding binding = BindingBuilder.bind(queue).to(exchange).with(messageMetadata.getQueue()).noargs();
declareQueue(queue);
declareExchange(exchange);
declareBinding(binding);
return true;
声明消息发射器
package com.cncloud.cncloud.common.mq.producer;
import com.alibaba.fastjson.JSON;
import com.cncloud.cncloud.admin.api.entity.MqSendLog;
import com.cncloud.cncloud.admin.api.feign.RemoteUserService;
import com.cncloud.cncloud.common.core.constant.SecurityConstants;
import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import com.cncloud.cncloud.common.core.util.SpringContextHolder;
import com.cncloud.cncloud.common.core.util.UniqueIdUtil;
import com.cncloud.cncloud.common.mq.common.Constant;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.lang.NonNull;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
/**
* @author likun
* @date 2021/6/23 15:22
*/
public class CustomMessageDelivery extends AbstractMessageDelivery<CustomMessageMetadata>
public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate)
super(messageMetadataResolver, rabbitTemplate);
public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate, @NonNull MessageNotDeliveredCallback messageNotDeliveredCallback, boolean autoDeclareMQ)
super(messageMetadataResolver, rabbitTemplate, messageNotDeliveredCallback, autoDeclareMQ);
@Override
public boolean isSupport(MessageMetadata messageMetadata)
return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
@Override
protected <T> boolean doDeliver(CustomMessageMetadata messageMetadata, T data)
// 保存消息投递记录
Message message = createMessage(data);
String msgClsName = data.getClass().getName();
String msgContent = JSON.toJSONString(data);
MqSendLog mqSendLog = new MqSendLog();
mqSendLog.setId(UniqueIdUtil.genId());
mqSendLog.setExchange(messageMetadata.getExchange());
mqSendLog.setMsgContent(msgContent);
mqSendLog.setMsgClsName(msgClsName);
mqSendLog.setMsgid(message.getMessageProperties().getCorrelationId());
mqSendLog.setQueue(messageMetadata.getQueue());
Date date = new Date(System.currentTimeMillis() + 1000 * 60 * Constant.MSG_TIMEOUT);
Instant instant = date.toInstant();
ZoneId zoneId = ZoneId.systemDefault();
LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();
mqSendLog.setTrytime(localDateTime);
RemoteUserService remoteUserService = SpringContextHolder.getBean(RemoteUserService.class);
remoteUserService.saveMqSendLog(mqSendLog, SecurityConstants.FROM_IN);
rabbitTemplate.convertAndSend(messageMetadata.getExchange(), messageMetadata.getQueue(),message,new CorrelationData(message.getMessageProperties().getCorrelationId()));
return true;
@Override
public Message createMessage(Object data)
String msgId = String.valueOf(UniqueIdUtil.genId());
SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setCorrelationId(msgId);
// 自定义MessageProperties扩展
if (data instanceof DelayMessage)
DelayMessage delayMessage = (DelayMessage) data;
Message message = simpleMessageConverter.toMessage(data, messageProperties);
message.getMessageProperties().setHeader("x-delay", delayMessage==null?0L:delayMessage.getDelayTime());
return message;
Message message = simpleMessageConverter.toMessage(data, messageProperties);
return message;
完善相应的静态代理
四 客户端实现消息的延迟消费
声明自定义message,必须实现DelayMessage和Serializable这两个接口
DelayMessage 用于组件中获得延迟时间,Serializable接口便于消息投递时消息的序列化
package com.cncloud.cncloud.common.core.message.base;
/**
 * Contract for message payloads that carry their own delivery delay.
 *
 * @author likun
 * @date 2022-11-21 17:43
 */
public interface DelayMessage {

    /**
     * Returns the delay before the message should be processed.
     *
     * @return the delay time; implementations may return {@code null}
     */
    Long getDelayTime();
}
package com.cncloud.cncloud.common.core.message.dto;
import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import lombok.Data;
import java.io.Serializable;
/**
* @author likun
* @date 2022年11月21日 17:47
*/
@Data
public class TestDelayMessage implements DelayMessage, Serializable
private static final long serialVersionUID = -516414641899843714L;
/**
* 消息
*/
private String msg;
/**
* 延迟时间
*/
private Long millisecond;
@Override
public Long getDelayTime()
return this.millisecond;
声明队列的元数据及相应的配置
public class DelayMessageMetadata extends CustomMessageMetadata
@Override
public String getExchange()
return MessageMetadataConstant.DelayMessageMetadataConstant.EXCHANGE;
@Override
public String getQueue()
return MessageMetadataConstant.DelayMessageMetadataConstant.QUEUE;
package com.cncloud.cncloud.admin.message.config;
import com.cncloud.cncloud.admin.producer.matadata.CallDeadMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.CallTtlMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author likun
* @date 2021/6/23 16:14
*/
@Configuration
public class MessageConfig
/**
* 通话延迟队列元数据
* @return
*/
@Bean
public CallTtlMessageMetadata callTtlMessageMetadata()
return new CallTtlMessageMetadata();
/**
* 通话死信队列元数据
* @return
*/
@Bean
public CallDeadMessageMetadata callDeadMessageMetadata()
return new CallDeadMessageMetadata();
@Bean
public DelayMessageMetadata delayMessageMetadata()
return new DelayMessageMetadata();
声明消息监听器
package com.cncloud.cncloud.admin.message.consum;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.mq.consumer.SimpleDynamicMessageListener;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @author likun
* @date 2022年11月21日 17:54
*/
@Component
@Slf4j
public class DelayMessageListener extends SimpleDynamicMessageListener<TestDelayMessage>
public DelayMessageListener(MessageMetadataResolver messageMetadataResolver)
super(new DelayMessageMetadata(), messageMetadataResolver);
@Override
public void onMessage(TestDelayMessage testDelayMessage)
log.info("监听到客户端消息:,延迟时间为:",testDelayMessage.getMsg(),testDelayMessage.getDelayTime());
五 延迟消息测试
package com.cncloud.cncloud.admin.controller;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.core.util.R;
import com.cncloud.cncloud.common.mq.producer.MessageDelivery;
import com.cncloud.cncloud.common.security.annotation.Inner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind…(原文代码在此处被截断)
以上是关于扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题的主要内容,如果未能解决你的问题,请参考以下文章