如何为@RabbitListener 注解编写集成测试?

Posted

技术标签:

【中文标题】如何为@RabbitListener 注解编写集成测试?【英文标题】:How to write an integration test for @RabbitListener annotation? 【发布时间】:2016-03-30 23:30:19 【问题描述】:

我的问题实际上是对

的后续问题

RabbitMQ Integration Test and Threading

它声明包装“你的听众”并传入一个 CountDownLatch ,最终所有线程都将合并。如果我们手动创建和注入消息侦听器但对于 @RabbitListener 注释,则此答案有效...我不确定如何传入 CountDownLatch。该框架在幕后自动神奇地创建消息侦听器。

还有其他方法吗?

【问题讨论】:

【参考方案1】:

在@Gary Russell 的帮助下,我得到了答案并使用了以下解决方案。

结论:我必须承认我对这个解决方案漠不关心(感觉就像是一个 hack)但这是我唯一可以开始工作的事情,一旦你完成了最初的一次性设置和其实理解'工作流程'并没有那么痛苦。基本上归结为定义 (2) @Beans 并将它们添加到您的集成测试配置中。

下面发布的示例解决方案并附有说明。请随时提出对此解决方案的改进建议。

1. 定义一个 ProxyListenerBPP,它在 spring 初始化期间将监听指定的 clazz(即我们的包含 @RabbitListener 的测试类)和 注入我们在下一步中定义的自定义 CountDownLatchListenerInterceptor 建议。

import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactoryBean;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.Ordered;
import org.springframework.core.PriorityOrdered;

/**
 * Implements BeanPostProcessor bean... during spring initialization we will
 * listen for a specified clazz 
 * (i.e our @RabbitListener annotated class) and 
 * inject our custom CountDownLatchListenerInterceptor advice
 * @author sjacobs
 *
 */
public class ProxyListenerBPP implements BeanPostProcessor, BeanFactoryAware, Ordered, PriorityOrdered

    private BeanFactory beanFactory;
    private Class<?> clazz;
    public static final String ADVICE_BEAN_NAME = "wasCalled";

    public ProxyListenerBPP(Class<?> clazz) 
        this.clazz = clazz;
    


    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException 
        this.beanFactory = beanFactory;
    

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException 
        return bean;
    

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException 

        if (clazz.isAssignableFrom(bean.getClass())) 
            ProxyFactoryBean pfb = new ProxyFactoryBean();
            pfb.setProxyTargetClass(true); // CGLIB, false for JDK proxy (interface needed)
            pfb.setTarget(bean);
            pfb.addAdvice(this.beanFactory.getBean(ADVICE_BEAN_NAME, Advice.class));
            return pfb.getObject();
        
        else 
            return bean;
        
    

    @Override
    public int getOrder() 
        return Ordered.LOWEST_PRECEDENCE - 1000; // Just before @RabbitListener post processor
    

2. 创建 MethodInterceptor 通知 impl,它将保存对 CountDownLatch 的引用。 CountDownLatch 需要在集成测试线程和@RabbitListener 中的异步工作线程中引用。因此,我们可以稍后在@RabbitListener 异步线程完成执行后立即释放回集成测试线程。无需轮询。

import java.util.concurrent.CountDownLatch;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;

/**
 * AOP MethodInterceptor that maps a <b>Single</b> CountDownLatch to one method and invokes 
 * CountDownLatch.countDown() after the method has completed execution. The motivation behind this 
 * is for integration testing purposes of Spring RabbitMq Async Worker threads to be able to merge
 * the Integration Test thread after an Async 'worker' thread completed its task. 
 * @author sjacobs
 *
 */
public class CountDownLatchListenerInterceptor implements MethodInterceptor 

    private CountDownLatch  countDownLatch =  new CountDownLatch(1);

    private final String methodNameToInvokeCDL ;

    public CountDownLatchListenerInterceptor(String methodName) 
        this.methodNameToInvokeCDL = methodName;
    

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable 
        String methodName = invocation.getMethod().getName();

        if (this.methodNameToInvokeCDL.equals(methodName) ) 

            //invoke async work 
            Object result = invocation.proceed();

            //returns us back to the 'awaiting' thread inside the integration test
            this.countDownLatch.countDown();

            //"reset" CountDownLatch for next @Test (if testing for more async worker)
            this.countDownLatch = new CountDownLatch(1);

            return result;
         else
            return invocation.proceed();
    


    public CountDownLatch getCountDownLatch() 
        return countDownLatch;
    

3.接下来将以下@Bean(s)添加到您的集成测试配置中

public class SomeClassThatHasRabbitListenerAnnotationsITConfig extends BaseIntegrationTestConfig 

    // pass into the constructor the test Clazz that contains the @RabbitListener annotation into the constructor
    @Bean
    public static ProxyListenerBPP listenerProxier()  // note static
        return new ProxyListenerBPP(SomeClassThatHasRabbitListenerAnnotations.class);
    

     // pass the method name that will be invoked by the async thread in SomeClassThatHasRabbitListenerAnnotations.Class
    // I.E the method name annotated with @RabbitListener or @RabbitHandler
    // in our example 'listen' is the method name inside SomeClassThatHasRabbitListenerAnnotations.Class
    @Bean(name=ProxyListenerBPP.ADVICE_BEAN_NAME)
    public static Advice wasCalled() 
        String methodName = "listen";  
        return new CountDownLatchListenerInterceptor( methodName );
    

    // this is the @RabbitListener bean we are testing
    @Bean
    public SomeClassThatHasRabbitListenerAnnotations rabbitListener() 
         return new SomeClassThatHasRabbitListenerAnnotations();
    


4. 最后在集成@Test调用...通过rabbitTemplate发送消息触发异步线程...现在调用CountDownLatch#await(...)方法得到来自拦截器并确保传入 TimeUnit 参数,以便在长时间运行的进程或出现问题的情况下超时。一旦异步集成测试线程被通知(唤醒),现在我们终于可以开始实际测试/验证/验证异步工作的结果。

@ContextConfiguration(classes= SomeClassThatHasRabbitListenerAnnotationsITConfig.class  )
public class SomeClassThatHasRabbitListenerAnnotationsIT extends BaseIntegrationTest

    @Inject 
    private CountDownLatchListenerInterceptor interceptor;

    @Inject
    private RabbitTemplate rabbitTemplate;

    @Test
    public void shouldReturnBackAfterAsyncThreadIsFinished() throws Exception 

     MyObject payload = new MyObject();
     rabbitTemplate.convertAndSend("some.defined.work.queue", payload);
        CountDownLatch cdl = interceptor.getCountDownLatch();      

        // wait for async thread to finish
        cdl.await(10, TimeUnit.SECONDS);    // IMPORTANT: set timeout args. 

        //Begin the actual testing of the results of the async work
        // check the database? 
        // download a msg from another queue? 
        // verify email was sent...
        // etc... 

【讨论】:

随时打开'new feature' JIRA Issue,以便我们在路线图上添加一些用于测试支持的钩子。【参考方案2】:

@RabbitListener 有点棘手,但最简单的方法是建议听众。

使用custom listener container factory 只需让您的测试用例将建议添加到工厂。

建议是MethodInterceptor;调用将有 2 个参数;频道和(未转换的)Message。必须在创建容器之前注入建议。

或者,使用registry 获取对容器的引用并稍后添加建议(但您必须调用initialize() 以强制应用新建议)。

另一种方法是使用简单的BeanPostProcessor 在将侦听器类注入容器之前代理它。这样,您将在 任何转换后看到方法参数;您还可以验证侦听器返回的任何结果(用于请求/回复场景)。

如果您不熟悉这些技术,我可以尝试找时间为您创建一个快速示例。

编辑

我发布了pull request 向EnableRabbitIntegrationTests 添加示例。这将添加一个具有 2 个带注释的侦听器方法的侦听器 bean,一个 BeanPostProcessor 在将侦听器 bean 注入侦听器容器之前代理它。一个Advice 被添加到代理中,当收到预期的消息时,该代理对锁存器进行计数。

【讨论】:

感谢您的快速响应。一个例子将不胜感激 Here you go. 感谢您的回答。就我个人而言,我不太喜欢将 AOP、反射 api 等引入业务逻辑,尤其是测试和集成测试。我希望测试的可解读性尽可能直观(KISS 原则)。是否有可能创建一个新注释“EnableRabbitCountDownLatch”的增强,它接受参数 int countDown 并创建一个 countDownLatch bean,以后可以注入到我们的测试中?我猜注释可以放在配置中,或者作为“EnableRabbit”的一部分,我不确定最好的位置。 所以在用 RabbitListener 注解的方法完成执行后会触发 countDown()。或者这个请求是否过于具体的用例?我真的很喜欢新的“RabbitListener”抽象,它使 messageListeners 的创建变得简单,但看起来它在集成测试期间需要付出代价 &gt;introducing AOP, reflection api, etc... into business logic。它不会触及您的业务逻辑,它是侦听器容器和侦听器之间的极轻量级垫片。第一次测试在通话前倒计时;之后的第二个,但你可以做任何你想做的事。我们可以考虑为框架添加测试支持,例如@RabbitIntegrationTest 但我们很可能会使用这种技术来实现它;我们不想用测试工件污染主线代码。当然,完整的集成测试将验证您的侦听器下游所做的一切。

以上是关于如何为@RabbitListener 注解编写集成测试?的主要内容,如果未能解决你的问题,请参考以下文章

如何为 xunit 集成测试编写 GraphQL 变异查询

RabbitMQ笔记十三:使用@RabbitListener注解消费消息

如何为具有spring存储库代码的@Component类编写集成测试?

Spring Boot中@RabbitListener注解使用RabbitMQ

如何为班级编写@Bean? [复制]

如何为 VS Express 版本编写 SourceControl 插件 [关闭]