我们如何使用@RabbitListener 连接到消息处理之前/之后

Posted

技术标签:

【中文标题】我们如何使用@RabbitListener 连接到消息处理之前/之后【英文标题】:How do we hook into before/After message processing using @RabbitListener 【发布时间】:2016-04-03 11:48:04 【问题描述】:

问题:我正在从 MessageListener 接口 impl 迁移到 @RabbitListener。我有这样的逻辑,我在由多个类继承的 MessageListener 上进行“pre”和“post”消息处理

示例:

public AbstractMessageListener implements MessageListener 

     @Override
     public void onMessage(Message message) 

          //do some pre message processing

          process(Message message);

          // do some post message processing
     

     protected abstract void process(Message message);


问题: 有没有一种方法可以使用@RabbitListener 注释实现类似的东西,我可以继承前/后消息处理逻辑,而无需重新实现或调用内部的前/后消息处理每个子 @RabbitListener 注释并一直为子 @RabbitListener 维护可自定义的方法签名?还是这太贪心了?

期望结果示例:

public class SomeRabbitListenerClass 

    @RabbitListener( id = "listener.mypojo",queues = "$rabbitmq.some.queue")
   public void listen(@Valid MyPojo myPojo) 
      //...
   


public class SomeOtherRabbitListenerClass 

    @RabbitListener(id = "listener.orders",queues ="$rabbitmq.some.other.queue")
   public void listen(Order order, @Header("order_type") String orderType) 
      //...
   

这两个@RabbitListener(s) 使用相同的继承前/后消息处理

我看到@RabbitListener 注释中有一个“containerFactory”参数,但我已经在配置中声明了一个参数......而且我真的很确定如何使用自定义 containerFactory 实现我想要的继承。


更新答案:这就是我最终要做的。

建议定义:

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;

/**
 * AOP Around advice wrapper. Every time a message comes in we can do 
 * pre/post processing by using this advice by implementing the before/after methods.
 * @author sjacobs
 *
 */
public class RabbitListenerAroundAdvice implements MethodInterceptor 

    /**
     * place the "AroundAdvice" around each new message being processed.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable 

        Message message = (Message) invocation.getArguments()[1];

        before(message)
        Object result = invocation.proceed();
        after(message);

        return  result;
    

声明 bean: 在您的 rabbitmq 配置中将通知声明为 Spring bean 并将其传递给 rabbitListenerContainerFactory#setAdviceChain(...)

//...

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() 
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory( cachingConnectionFactory() );
        factory.setTaskExecutor(threadPoolTaskExecutor());
        factory.setMessageConverter(jackson2JsonMessageConverter());   

        factory.setAdviceChain(rabbitListenerAroundAdvice());

        return factory;
    

    @Bean
    public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() 
        return new RabbitListenerAroundAdvice();
    

// ...

【问题讨论】:

【参考方案1】:

更正

您可以使用SimpleRabbitListenerContainerFactory 中的建议链将环绕建议应用于为@RabbitListener 创建的侦听器;这两个参数是ChannelMessage

如果只需要在调用监听器之前采取行动,可以将MessagePostProcessor(s)添加到容器afterReceivePostProcessors中。

【讨论】:

我更正了我的答案;我忘了我们在容器工厂上暴露了adviceChain 属性。 我明白了,我们可以通过扩展AbstractRabbitListenerContainerFactoryConfigurer来配置容器,如果是这样,是不是最好把Simple/DirectRabbitListenerContainerFactoryConfigurer设置成非final,这样我们就可以扩展了? 不要在 cmets 中针对旧答案提出新问题;开始一个新问题。为什么需要扩展配置器? 正确,完全不需要。我们可以简单地重新定义容器监听 bean 或者使用brave tracing 方式。 gist.github.com/mhewedy/7de5d3aad0b7a06b1353b9ccbc1a54f5【参考方案2】:

这里不可能继承,因为 POJO 方法上的注释处理和 MessageListener 实现是完全不同的故事。

使用MessageListener,您可以完全控制目标行为和container

使用注解,您只需处理 POJO、无框架代码。特定的MessageListener 是在后台创建的。而那个完全基于带注释的方法。

我想说我们可以使用 Spring AOP 框架实现您的要求。

查看最近的问题及其对此事的回答:How to write an integration test for @RabbitListener annotation?

【讨论】:

以上是关于我们如何使用@RabbitListener 连接到消息处理之前/之后的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ笔记十三:使用@RabbitListener注解消费消息

我们如何使用 Azure AD 使用 DB Visualizer 连接到雪花

如何使用 sapnwrfc 库连接到 SAP?

如何使用 ngrok 连接到 Visual FoxPro 数据库

如何统一使用光子网络连接到我们自己的专用服务器?(自托管)

如何使用 Java 8 连接到 Sybase Advantage Local DB