扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题

Posted Instanceztt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题相关的知识,希望对你有一定的参考价值。

一 引言

最近在项目中发现,消息的延迟消费是通过监听 Redis 的键过期事件实现的,这种做法存在很大的可靠性隐患:一旦服务宕机或出现其他故障,就会导致消息丢失。本想采用延迟队列(TTL)加死信队列来完成消息的延迟消费,但这种方案也有局限:当队列头部的消息尚未过期时,排在它后面、已经过期的消息也不会被投递到死信队列,从而造成消息阻塞。对时间精度要求不高的场景可以采用这种方案,但对时间精度要求较高时就不适用了。
RabbitMQ 官方为我们提供了 rabbitmq_delayed_message_exchange 插件来解决这种消息阻塞问题。本文重点介绍如何基于自定义 Spring Boot 组件扩展 MQ,实现延迟消息的消费。自定义 MQ 组件的扩展可以参考笔者的这篇文章:《自定义springboot组件–基于模板模式对原生springboot的rabbitmq组件进行扩展》。

二 引入mq延迟插件

2.1 下载插件

登录mq管理平台查看对应的版本号

RabbitMQ 从 3.5.7 版本开始提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列,同时需保证 Erlang/OTP 版本在 18.0 及以上。
我这里 MQ 的版本是 3.9.11,现在去 GitHub 上根据版本号下载插件
点击下载插件
根据自己的版本号自行下载即可

2.2 制作包含延迟插件的镜像

将下载好的插件安装到 MQ 中。这里我们采用 Dockerfile 制作包含该插件的镜像,避免每次启动容器时都要手动拷贝并启用插件。

# Pin the base image to the broker release the plugin archive targets (3.9.x).
# The floating "management" tag can move to a release the 3.9.0 plugin does not support.
FROM rabbitmq:3.9.11-management
COPY ./rabbitmq_delayed_message_exchange-3.9.0.ez /plugins
# Enable the delayed-message exchange plugin
RUN rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# Enable the management web UI
RUN rabbitmq-plugins enable rabbitmq_management

发布到自己的镜像仓库

三 扩展mq组件,集成延迟插件

申明自定义元数据

package com.cncloud.cncloud.common.mq.metadata;

import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;

/**
 * @author likun
 * @date 2022年11月21日 17:21
 */
public abstract class CustomMessageMetadata implements MessageMetadata

	@Override
	public ExchangeTypeEnum getExchangeType() 
		return ExchangeTypeEnum.CUSTOM;
	

	/**
	 * 获取交换机名
	 * @return
	 */
	public abstract String getExchange();

申明解析器

package com.cncloud.cncloud.common.mq.metadata.resolver;

import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

import java.util.HashMap;
import java.util.Map;

/**
 * @author likun
 * @date 2022年11月21日 17:23
 */
public class CustomMessageMetadataResolver extends AbstractMessageMetadataResolver
	public CustomMessageMetadataResolver(RabbitAdmin rabbitAdmin) 
		super(rabbitAdmin);
	

	@Override
	public boolean isSupport(MessageMetadata messageMetadata) 
		return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
	

	@Override
	public boolean doResolve(MessageMetadata messageMetadata) 
		CustomMessageMetadata customMessageMetadata = (CustomMessageMetadata) messageMetadata;
		Map<String, Object> args = messageMetadata.getQueueArgs();
		if (args==null)
			args=new HashMap();
			args.put("x-delayed-type", "direct");
		

		CustomExchange exchange = new CustomExchange(customMessageMetadata.getExchange(), "x-delayed-message", true, false, args);
		Queue queue = new Queue(customMessageMetadata.getQueue(), true, false, false, customMessageMetadata.getQueueArgs());

		Binding binding = BindingBuilder.bind(queue).to(exchange).with(messageMetadata.getQueue()).noargs();
		declareQueue(queue);
		declareExchange(exchange);
		declareBinding(binding);
		return true;
	

申明消息发射器

package com.cncloud.cncloud.common.mq.producer;

import com.alibaba.fastjson.JSON;
import com.cncloud.cncloud.admin.api.entity.MqSendLog;
import com.cncloud.cncloud.admin.api.feign.RemoteUserService;
import com.cncloud.cncloud.common.core.constant.SecurityConstants;
import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import com.cncloud.cncloud.common.core.util.SpringContextHolder;
import com.cncloud.cncloud.common.core.util.UniqueIdUtil;
import com.cncloud.cncloud.common.mq.common.Constant;
import com.cncloud.cncloud.common.mq.common.ExchangeTypeEnum;
import com.cncloud.cncloud.common.mq.metadata.CustomMessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.MessageMetadata;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.lang.NonNull;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

/**
 * @author likun
 * @date 2021/6/23 15:22
 */
public class CustomMessageDelivery extends AbstractMessageDelivery<CustomMessageMetadata> 

    public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate) 
        super(messageMetadataResolver, rabbitTemplate);
    

    public CustomMessageDelivery(@NonNull MessageMetadataResolver messageMetadataResolver, @NonNull RabbitTemplate rabbitTemplate, @NonNull MessageNotDeliveredCallback messageNotDeliveredCallback, boolean autoDeclareMQ) 
        super(messageMetadataResolver, rabbitTemplate, messageNotDeliveredCallback, autoDeclareMQ);
    

    @Override
    public boolean isSupport(MessageMetadata messageMetadata) 
        return ExchangeTypeEnum.CUSTOM.equals(messageMetadata.getExchangeType());
    

    @Override
    protected <T> boolean doDeliver(CustomMessageMetadata messageMetadata, T data)
    	// 保存消息投递记录
		Message message = createMessage(data);
		String msgClsName = data.getClass().getName();
		String msgContent = JSON.toJSONString(data);
		MqSendLog mqSendLog = new MqSendLog();
		mqSendLog.setId(UniqueIdUtil.genId());
		mqSendLog.setExchange(messageMetadata.getExchange());
		mqSendLog.setMsgContent(msgContent);
		mqSendLog.setMsgClsName(msgClsName);
		mqSendLog.setMsgid(message.getMessageProperties().getCorrelationId());
		mqSendLog.setQueue(messageMetadata.getQueue());

		Date date = new Date(System.currentTimeMillis() + 1000 * 60 * Constant.MSG_TIMEOUT);
		Instant instant = date.toInstant();
		ZoneId zoneId = ZoneId.systemDefault();
		LocalDateTime localDateTime = instant.atZone(zoneId).toLocalDateTime();
		mqSendLog.setTrytime(localDateTime);
		RemoteUserService remoteUserService = SpringContextHolder.getBean(RemoteUserService.class);
		remoteUserService.saveMqSendLog(mqSendLog, SecurityConstants.FROM_IN);
		rabbitTemplate.convertAndSend(messageMetadata.getExchange(), messageMetadata.getQueue(),message,new CorrelationData(message.getMessageProperties().getCorrelationId()));

		return true;
    

    @Override
	public Message createMessage(Object data)
		String msgId = String.valueOf(UniqueIdUtil.genId());
		SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.setCorrelationId(msgId);
		// 自定义MessageProperties扩展
		if (data instanceof DelayMessage)
			DelayMessage delayMessage = (DelayMessage) data;
			Message message = simpleMessageConverter.toMessage(data, messageProperties);
			message.getMessageProperties().setHeader("x-delay", delayMessage==null?0L:delayMessage.getDelayTime());
			return message;
		
		Message message = simpleMessageConverter.toMessage(data, messageProperties);
		return message;
	

完善相应的静态代理

四 客户端实现消息的延迟消费

声明自定义 message,必须实现 DelayMessage 和 Serializable 这两个接口

DelayMessage 用于组件中获得延迟时间,Serializable接口便于消息投递时消息的序列化

package com.cncloud.cncloud.common.core.message.base;

/**
 * Contract for message payloads that should be delivered after a delay.
 *
 * @author likun
 * @date 2022-11-21 17:43
 */
public interface DelayMessage {

	/**
	 * Returns how long delivery of this message should be delayed.
	 *
	 * @return the delay, presumably in milliseconds (the value is sent as the
	 *         {@code x-delay} header); may be {@code null} when no delay is set
	 */
	Long getDelayTime();
}

package com.cncloud.cncloud.common.core.message.dto;

import com.cncloud.cncloud.common.core.message.base.DelayMessage;
import lombok.Data;

import java.io.Serializable;

/**
 * @author likun
 * @date 2022年11月21日 17:47
 */
@Data
public class TestDelayMessage implements DelayMessage, Serializable 

	private static final long serialVersionUID = -516414641899843714L;

	/**
	 * 消息
	 */
	private String msg;

	/**
	 * 延迟时间
	 */
	private Long millisecond;

	@Override
	public Long getDelayTime() 
		return this.millisecond;
	

声明队列的元数据及相应的配置

public class DelayMessageMetadata extends CustomMessageMetadata 
	@Override
	public String getExchange() 
		return MessageMetadataConstant.DelayMessageMetadataConstant.EXCHANGE;
	

	@Override
	public String getQueue() 
		return MessageMetadataConstant.DelayMessageMetadataConstant.QUEUE;
	

package com.cncloud.cncloud.admin.message.config;


import com.cncloud.cncloud.admin.producer.matadata.CallDeadMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.CallTtlMessageMetadata;
import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author likun
 * @date 2021/6/23 16:14
 */
@Configuration
public class MessageConfig 
	/**
	 * 通话延迟队列元数据
	 * @return
	 */
	@Bean
	public CallTtlMessageMetadata callTtlMessageMetadata()
		return new CallTtlMessageMetadata();
	

	/**
	 * 通话死信队列元数据
	 * @return
	 */
	@Bean
	public CallDeadMessageMetadata callDeadMessageMetadata()
		return new CallDeadMessageMetadata();
	

	@Bean
	public DelayMessageMetadata delayMessageMetadata()
		return new DelayMessageMetadata();
	

申明消息监听器

package com.cncloud.cncloud.admin.message.consum;

import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.mq.consumer.SimpleDynamicMessageListener;
import com.cncloud.cncloud.common.mq.metadata.resolver.MessageMetadataResolver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @author likun
 * @date 2022年11月21日 17:54
 */
@Component
@Slf4j
public class DelayMessageListener extends SimpleDynamicMessageListener<TestDelayMessage> 
	public DelayMessageListener(MessageMetadataResolver messageMetadataResolver) 
		super(new DelayMessageMetadata(), messageMetadataResolver);
	

	@Override
	public void onMessage(TestDelayMessage testDelayMessage) 
		log.info("监听到客户端消息:,延迟时间为:",testDelayMessage.getMsg(),testDelayMessage.getDelayTime());
	


五 延迟消息测试

package com.cncloud.cncloud.admin.controller;


import com.cncloud.cncloud.admin.producer.matadata.DelayMessageMetadata;
import com.cncloud.cncloud.common.core.message.dto.TestDelayMessage;
import com.cncloud.cncloud.common.core.util.R;
import com.cncloud.cncloud.common.mq.producer.MessageDelivery;
import com.cncloud.cncloud.common.security.annotation.Inner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind…(原文代码在此处被截断)

以上是关于扩展自定义mq组件,使用rabbitmq_delayed_message_exchange延迟组件,完善消息延迟消息精度问题的主要内容,如果未能解决你的问题,请参考以下文章:

element-ui组件扩展 - 自定义confirm弹窗

vue自定义指令clickoutside使用以及扩展用法

编辑器扩展基础3-自定义PropertyDrawer

Unity3D编辑器扩展——扩展自己的组件

使用 Swagger 的扩展组件Plugin 机制自定义API文档的生成

flex 自定义组件