我们如何使用@RabbitListener 连接到消息处理之前/之后
Posted
技术标签:
【中文标题】我们如何使用@RabbitListener 连接到消息处理之前/之后【英文标题】:How do we hook into before/After message processing using @RabbitListener 【发布时间】:2016-04-03 11:48:04 【问题描述】:问题:我正在从 MessageListener 接口 impl 迁移到 @RabbitListener。我有这样的逻辑,我在由多个类继承的 MessageListener 上进行“pre”和“post”消息处理
示例:
public AbstractMessageListener implements MessageListener
@Override
public void onMessage(Message message)
//do some pre message processing
process(Message message);
// do some post message processing
protected abstract void process(Message message);
问题: 有没有一种方法可以使用@RabbitListener 注释实现类似的东西,我可以继承前/后消息处理逻辑,而无需重新实现或调用内部的前/后消息处理每个子 @RabbitListener 注释并一直为子 @RabbitListener 维护可自定义的方法签名?还是这太贪心了?
期望结果示例:
public class SomeRabbitListenerClass
@RabbitListener( id = "listener.mypojo",queues = "$rabbitmq.some.queue")
public void listen(@Valid MyPojo myPojo)
//...
public class SomeOtherRabbitListenerClass
@RabbitListener(id = "listener.orders",queues ="$rabbitmq.some.other.queue")
public void listen(Order order, @Header("order_type") String orderType)
//...
这两个@RabbitListener(s) 使用相同的继承前/后消息处理
我看到@RabbitListener 注释中有一个“containerFactory”参数,但我已经在配置中声明了一个参数......而且我真的很确定如何使用自定义 containerFactory 实现我想要的继承。
更新答案:这就是我最终要做的。
建议定义:
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
/**
* AOP Around advice wrapper. Every time a message comes in we can do
* pre/post processing by using this advice by implementing the before/after methods.
* @author sjacobs
*
*/
public class RabbitListenerAroundAdvice implements MethodInterceptor
/**
* place the "AroundAdvice" around each new message being processed.
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable
Message message = (Message) invocation.getArguments()[1];
before(message)
Object result = invocation.proceed();
after(message);
return result;
声明 bean: 在您的 rabbitmq 配置中将通知声明为 Spring bean 并将其传递给 rabbitListenerContainerFactory#setAdviceChain(...)
//...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory()
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory( cachingConnectionFactory() );
factory.setTaskExecutor(threadPoolTaskExecutor());
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setAdviceChain(rabbitListenerAroundAdvice());
return factory;
@Bean
public RabbitListenerAroundAdvice rabbitListenerAroundAdvice()
return new RabbitListenerAroundAdvice();
// ...
【问题讨论】:
【参考方案1】:更正
您可以使用SimpleRabbitListenerContainerFactory 中的建议链将环绕建议应用于为@RabbitListener
创建的侦听器;这两个参数是Channel
和Message
。
如果只需要在调用监听器之前采取行动,可以将MessagePostProcessor
(s)添加到容器afterReceivePostProcessors
中。
【讨论】:
我更正了我的答案;我忘了我们在容器工厂上暴露了adviceChain
属性。
我明白了,我们可以通过扩展AbstractRabbitListenerContainerFactoryConfigurer
来配置容器,如果是这样,是不是最好把Simple/DirectRabbitListenerContainerFactoryConfigurer
设置成非final,这样我们就可以扩展了?
不要在 cmets 中针对旧答案提出新问题;开始一个新问题。为什么需要扩展配置器?
正确,完全不需要。我们可以简单地重新定义容器监听 bean 或者使用brave tracing
方式。 gist.github.com/mhewedy/7de5d3aad0b7a06b1353b9ccbc1a54f5【参考方案2】:
这里不可能继承,因为 POJO 方法上的注释处理和 MessageListener
实现是完全不同的故事。
使用MessageListener
,您可以完全控制目标行为和container
。
使用注解,您只需处理 POJO、无框架代码。特定的MessageListener
是在后台创建的。而那个完全基于带注释的方法。
我想说我们可以使用 Spring AOP 框架实现您的要求。
查看最近的问题及其对此事的回答:How to write an integration test for @RabbitListener annotation?
【讨论】:
以上是关于我们如何使用@RabbitListener 连接到消息处理之前/之后的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQ笔记十三:使用@RabbitListener注解消费消息
我们如何使用 Azure AD 使用 DB Visualizer 连接到雪花