自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展
Posted Instanceztt
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展相关的知识,希望对你有一定的参考价值。
一 引入
在我们的日常开发中,消息中间件已经成为了java研发工程师的一项必备技能,本文主要是基于对springboot原生组件的扩展开发,基于模板设计模式和静态代理模式,简化了队列路由的绑定,交由公共模板进行统一的绑定,并在公用模板中保证了消息的幂等性和消息的可靠性投递,将这些类似的代码抽离出来,让开发者只专注于业务逻辑的开发.
整体实现思路:
- 开发者声明交换机、路由等基础元数据后,交由元数据解析器完成交换机、路由的声明及相应关系的绑定
- 开发者声明消息监听器后,通过消息侦听容器完成队列与监听器的绑定
- 开发者通过消息发射器发布消息后,系统通过不同的模板完成消息的发送
二 逻辑实现
2.1 元数据解析器的构建
申明基础信息接口
/**
* 用于定义交换机队列等核心参数 便于队列交换机等初始化
* @author likun
* @date 2022/6/17 11:22
*/
public interface MessageMetaData
/**
* 获取队列名称
* @return
*/
String getQueue();
/**
* 交换机类型
* @return
*/
ExchangeTypeEnum getExchangeType();
/**
* 队列配置
* @return
*/
default Map<String,Object> getQueueArgs()
return null;
;
/**
* 交换机配置
* @return
*/
default Map<String,Object> getExchangeArgs()
return null;
;
/**
* 消息扩展属性
* @return
*/
default MessageProperties getMessageProperties()
return null;
;
default void setMessageProperties(MessageProperties messageProperties)
;
定义不同的元数据模板
public abstract class FanoutMessageMetaData implements MessageMetaData
private MessageProperties messageProperties = null;
@Override
public ExchangeTypeEnum getExchangeType()
return ExchangeTypeEnum.FANOUT;
/**
* 交换机名称
* @return
*/
abstract public String getExchange();
@Override
public MessageProperties getMessageProperties()
return this.messageProperties;
@Override
public void setMessageProperties(MessageProperties messageProperties)
this.messageProperties=messageProperties;
public abstract class DirectMessageMetadata implements MessageMetaData
private MessageProperties messageProperties;
@Override
public ExchangeTypeEnum getExchangeType()
return ExchangeTypeEnum.DIRECT;
/**
* 交换机名称
* @return
*/
abstract public String getExchange();
@Override
public MessageProperties getMessageProperties()
return this.messageProperties;
@Override
public void setMessageProperties(MessageProperties messageProperties)
this.messageProperties=messageProperties;
申明元数据解析接口,定义解析规范
/**
* 判断是否支持当前交换机类型
* @author likun
* @date 2022/6/17 11:44
*/
public interface Support
Boolean support(ExchangeTypeEnum exchangeTypeEnum);
/**
* 核心参数解析器 解析核心参数并完成队列交换机的绑定
* @author likun
* @date 2022/6/17 11:42
*/
public interface MessageMetaDataResolver extends Support
/**
* 解析核心参数
* @param messageMetaData
*/
void resolve(MessageMetaData messageMetaData);
定义不同模板的解析器
@RequiredArgsConstructor
@Slf4j
public abstract class AbstractMessageMetadataResolver implements MessageMetaDataResolver
private final RabbitAdmin rabbitAdmin;
@Override
public void resolve(MessageMetaData messageMetaData)
if (support(messageMetaData.getExchangeType()))
doResolve(messageMetaData);
/**
* 下游子类实现
* @param messageMetaData
*/
abstract void doResolve(MessageMetaData messageMetaData);
/**
* 申明队列
*/
public void declareQueue(Queue queue)
rabbitAdmin.declareQueue(queue);
log.info("queue [] declared.",queue);
public void declareExchange(Exchange exchange)
rabbitAdmin.declareExchange(exchange);
log.info("exchange [] declared.",exchange);
public void declareBinding(Binding binding)
rabbitAdmin.declareBinding(binding);
log.info("binding [] declared.",binding);
public class DirectMessageMetadataResolver extends AbstractMessageMetadataResolver
public DirectMessageMetadataResolver(RabbitAdmin rabbitAdmin)
super(rabbitAdmin);
@Override
public Boolean support(ExchangeTypeEnum exchangeTypeEnum)
return ExchangeTypeEnum.DIRECT.equals(exchangeTypeEnum);
@Override
void doResolve(MessageMetaData messageMetaData)
DirectMessageMetadata directMessageMetadata = (DirectMessageMetadata) messageMetaData;
// 申明队列
Queue queue = new Queue(directMessageMetadata.getQueue(), true, false, false, directMessageMetadata.getQueueArgs());
declareQueue(queue);
// 申明交换机
DirectExchange exchange = new DirectExchange(directMessageMetadata.getExchange(), true, false, directMessageMetadata.getExchangeArgs());
declareExchange(exchange);
// 交换机绑定队列
Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
declareBinding(binding);
public class FanoutMessageMetaDateResolver extends AbstractMessageMetadataResolver
public FanoutMessageMetaDateResolver(RabbitAdmin rabbitAdmin)
super(rabbitAdmin);
@Override
public Boolean support(ExchangeTypeEnum exchangeTypeEnum)
return ExchangeTypeEnum.FANOUT.equals(exchangeTypeEnum);
@Override
void doResolve(MessageMetaData messageMetaData)
FanoutMessageMetaData FanoutMessageMetaData = (FanoutMessageMetaData) messageMetaData;
// 申明队列
Queue queue = new Queue(FanoutMessageMetaData.getQueue(), true, false, false, FanoutMessageMetaData.getQueueArgs());
declareQueue(queue);
// 申明交换机
FanoutExchange exchange = new FanoutExchange(FanoutMessageMetaData.getExchange(), true, false, FanoutMessageMetaData.getExchangeArgs());
declareExchange(exchange);
// 队列绑定交换机
Binding binding = BindingBuilder.bind(queue).to(exchange);
declareBinding(binding);
申明静态代理器
/**
* 委派解析器
* @author likun
* @date 2022年06月17日 14:05
*/
@Slf4j
public class DelegatingMessageMetaDataResolver implements MessageMetaDataResolver
private final Map<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap = new ConcurrentHashMap<>();
public DelegatingMessageMetaDataResolver(Map<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap)
this.messageMetaDataResolverMap.putAll(messageMetaDataResolverMap);
@Override
public Boolean support(ExchangeTypeEnum exchangeTypeEnum)
return true;
@Override
public void resolve(MessageMetaData messageMetaData)
if (messageMetaData==null)
log.error("metaDate resolve must have messageMetaData but is null");
return;
MessageMetaDataResolver messageMetaDataResolver = messageMetaDataResolverMap.get(messageMetaData.getExchangeType());
if (messageMetaDataResolver==null)
log.error("messageMetaDataResolver is null");
return;
messageMetaDataResolver.resolve(messageMetaData);
元数据初始化器
/**
* 元数据核心参数初始化器
* @author likun
* @date 2022年06月17日 15:43
*/
@RequiredArgsConstructor
@Slf4j
public class MessageMetaDataInitalizer implements InitializingBean, Ordered
private final XlcpMqProperties xlcpMqProperties;
private final MessageMetaDataResolver messageMetaDataResolver;
@Override
public void afterPropertiesSet() throws Exception
log.info("autoCreateMq configured with []",xlcpMqProperties.getAutoCreateMq());
if (xlcpMqProperties.getAutoCreateMq())
Map<String, MessageMetaData> messageMetaDataMap = SpringContextHolder.getBeansOfType(MessageMetaData.class);
messageMetaDataMap.forEach((key,messageMetaData)->
log.info("Mq auto declared with metaDate []",messageMetaData);
messageMetaDataResolver.resolve(messageMetaData);
);
@Override
public int getOrder()
return Ordered.LOWEST_PRECEDENCE;
将静态代理器和初始化器交由spring容器管理
2.2 动态消息监听器构建
申明抽象消息监听器
@Slf4j
public abstract class DynamicMessageListener<T> extends AbstractAdaptableMessageListener
private final MessageMetaData messageMetaData;
public static final String MQ_MESSAGE_ID_PREFIX = "mq:message:id";
public DynamicMessageListener(MessageMetaData messageMetaData)
this.messageMetaData=messageMetaData;
public void doExcute(Message message, Channel channel, Consumer<Object> consumer) throws IOException
MessageProperties messageProperties=message.getMessageProperties();
String messageId=messageProperties.getMessageId();
// 校验当前消息是否被消费
RedisTemplate redisTemplate=SpringContextHolder.getBean(RedisTemplate.class);
redisTemplate.setKeySerializer(new StringRedisSerializer());
String mqMessageMqKey = String.format("%s%s%s",MQ_MESSAGE_ID_PREFIX, StrPool.COLON,messageId);
if (redisTemplate.hasKey(mqMessageMqKey))
ackOk(message,channel);
log.info("当前消息已经被消费了,消息id为:",messageId);
return;
MessageConverter messageConverter=getMessageConverter();
Object convetMessag= messageConverter.fromMessage(message);
Boolean ackFalg = Boolean.FALSE;
try
consumer.accept(convetMessag);
// 存入redis中确保消息已经被消费了 保存30分钟
redisTemplate.opsForValue().set(mqMessageMqKey,"",30, TimeUnit.MINUTES);
catch (Exception exception)
exception.printStackTrace();
// 签收失败将消息重新投递到队列中
ackFalg=onError(message,channel);
finally
if (!ackFalg)
ackOk(message, channel);
protected Boolean onError(Message message,Channel channel) throws IOException
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
return Boolean.TRUE;
protected void ackOk(Message message,Channel channel) throws IOException
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
public MessageMetaData getMessageMetaData()
return messageMetaData;
申明单消息监听器模板和批量消息监听模板
public abstract class SingleDynamicMessageListener<T> extends DynamicMessageListener<T>
public SingleDynamicMessageListener(MessageMetaData messageMetaData)
super(messageMetaData);
public abstract void onMessage(T t);
@Override
public void onMessage(Message message, Channel channel) throws Exception
doExcute(message,channel,object->
onMessage((T)object);
);
public abstract class BatchDynamicMessageListener<T> extends DynamicMessageListener<T>
public BatchDynamicMessageListener(MessageMetaData messageMetaData)
super(messageMetaData);
@Override
public void onMessage(Message message, Channel channel) throws Exception
MessageConverter messageConverter=getMessageConverter();
doExcute(message,channel,(obj)->
if (message instanceof Collection)
onMessageBatch((Collection<T>) obj);
else
ArrayList<T> list = new ArrayList<>();
list.add((T)message);
onMessageBatch(list);
);
/**
* 批量消息
* @param collection
*/
abstract void onMessageBatch(Collection<T> collection);
申明动态容器监听器
@RequiredArgsConstructor
public class DynamicMessageListenerContainer extends SimpleRabbitListenerContainerFactory
private final int DEFAULT_PREFETCH_COUNT = 1;
private final ConnectionFactory connectionFactory;
/**
* 后缀
*/
public static final String LISTENER_CONTAINER_SUFFIX = "SimpleMessageListenerContainer";
private int prefetchCount = DEFAULT_PREFETCH_COUNT;
public SimpleMessageListenerContainer createListenerContainer(DynamicMessageListener dynamicMessageListener, MessageMetaData messageMetaData)
SimpleMessageListenerContainer simpleMessageListenerContainer=new SimpleMessageListenerContainer();
simpleMessageListenerContainer.setQueueNames(messageMetaData.getQueue());
simpleMessageListenerContainer.setMessageListener(dynamicMessageListener);
simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
simpleMessageListenerContainer.setPrefetchCount(prefetchCount);
//设置当前消息为手动签收
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return simpleMessageListenerContainer;
public void …(原文代码在此处被截断)
以上是关于"自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展"的主要内容,如果未能解决你的问题,请参考以下文章:
自定义springboot组件--基于nacos和spring-cloud-loadbalancer实现灰度发布
uniapp在自定义模板中引入js之后在组件里怎么使用里面的方法
SpringBoot自定义注解+异步+观察者模式实现业务日志保存