在使用 JMS 消息之前保持 spring 上下文处于活动状态

Posted

技术标签:

【中文标题】在使用 JMS 消息之前保持 spring 上下文处于活动状态【英文标题】:Keep spring context alive until JMS messages are consumed 【发布时间】:2018-08-23 07:25:36 【问题描述】:

我有一个与 JMS - Spring BootActiveMQ 相关的非常标准的设置。它工作正常,直到我尝试做一个简单的集成测试。经过一番调查,我发现在消费第一条 JMS 消息后,Spring 上下文和嵌入式代理都关闭了,无论在消费期间,都会触发另一个事件。我能够通过在测试设置中添加useShutdownHook=false 连接选项来解决代理问题,即

spring.activemq.broker-url = vm://broker?async=false&broker.persistent=false&broker.useShutdownHook=false

我正在寻找的基本上是一种强制测试“保持活动状态”直到所有 JMS 消息都被消耗的方法(在这种情况下它们只有两条)。我了解整个设置的异步性质,但在测试期间,获取这些消息的所有生成和使用结果会很有帮助。

下面是我的设置,虽然很简单。

@EnableJms
public class ActiveMqConfig 

    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) 
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setMessageConverter(messageConverter);
        return jmsTemplate;
    

    @Bean
    public MessageConverter messageConverter() 
        MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
        messageConverter.setTargetType(MessageType.TEXT);
        messageConverter.setTypeIdPropertyName("_type");
        return messageConverter;
    

然后我有一个消息驱动的 POJO 来监听给定的事件:

@JmsListener(destination = "events")
public void applicationSubmitted(MyType event) 
    // do some work with the event here

    jmsTemplate.convertAndSend("commands", mymessage);

还有一个:

@JmsListener(destination = "commands")
public void onCommand(TextMessage textMessage) 


我尝试过并且有效的一件事是在发送消息后添加延迟,即sleep(200)。但是,这是非常不可靠的,并且还会减慢测试速度,因为执行时间可能少于 50 毫秒。下面是测试本身。除非等待未注释,否则我永远不会到达第二个事件侦听器,因为应用程序上下文关闭,测试结束并且消息被“遗忘”。

@SpringBootTest
class MyEventIntegrationTest extends Specification 

    @Autowired
    JmsTemplate jmsTemplate

    def "My event is successfully handled"() 

        given:
        def event = new MyEvent()

        when:
        jmsTemplate.convertAndSend("events", event)
        // sleep(200)

        then:
        1 == 1
    

【问题讨论】:

你到底想用你的测试来测试什么?您是否正在尝试检查消息是否已收到? 间接地,即我想测试出现的消息序列是否会导致我期望的结果(保存在数据库中的东西)。 【参考方案1】:

嗯,这是测试基于异步消息交换的系统时的标准问题。通常,它会在您跳过的测试部分解决 - then 部分。

问题是,在您的测试中,您通常希望系统做一些有用的事情,例如在数据库中进行更改,向另一个系统发送一个休息调用,在另一个队列中发送一条消息等。我们可以通过不断检查结果等待一段时间直到它发生 - 如果结果在我们的时间窗口内实现已设置 - 那么我们可以假设测试已通过。

这种方法的伪代码如下:

for (i to MAX_RETRIES; i++) 
   checkThatTheChangesInDBHasBeenMade();
   checkThatTheRestCallHasBeenMade();
   checkThatTheMessageIsPostedInAnotherQueue();

   Thread.sleep(50ms);

这样,在最好的情况下,您的测试将在 50 毫秒内通过。在最坏的情况下,它会失败,这将需要 MAX_RETRIES * 50ms 时间来执行测试。

另外,我应该提一下,有一个很好的工具叫做awaitility,它提供了很好的 API(顺便说一句,它支持 groovy DSL)来处理异步世界中的此类问题:

await().atMost(5, SECONDS).until(customerStatusIsUpdated());

【讨论】:

【参考方案2】:

我认为您的问题的根源是异步事件处理。发送事件后,您的测试就结束了。这当然会导致 Spring 上下文和代理关闭。 JMS 侦听器正在另一个线程中运行。你必须想办法等待他们。否则,您的线程(即您的测试用例)就完成了。

我们在上一个项目中遇到了类似的问题,并编写了一个对我们帮助很大的小实用程序。 JMS 提供“浏览”队列并查看它是否为空的能力:

public final class JmsUtil 

    private static final int MAX_TRIES = 5000;
    private final JmsTemplate jmsTemplate;

    public JmsUtil(JmsTemplate jmsTemplate) 
        this.jmsTemplate = jmsTemplate;
    

    private int getMessageCount(String queueName) 
        return jmsTemplate.browseSelected(queueName, "true = true", (s, qb) -> Collections.list(qb.getEnumeration()).size());
    

    public void waitForAll(String queueName) 
        int i = 0;
        while (i <= MAX_TRIES) 
            if (getMessageCount(queueName) == 0) 
                return;
            
            i++;
        

使用此实用程序,您可以执行以下操作:

def "My event is successfully handled"() 

        given:
        def event = new MyEvent()

        when:
        jmsTemplate.convertAndSend("events", event)
        jmsUtility.waitForAll("events"); // wait until the event has been consumed
        jmsUtility.waitForAll("commands"); // wait until the command has been consumed

        then:
        1 == 1
    

注意:此实用程序假定您将 JMS 消息发送到队列。通过浏览队列,我们​​可以检查它是否为空。如果是某个主题,您可能需要进行另一次检查。所以要注意这一点!

【讨论】:

以上是关于在使用 JMS 消息之前保持 spring 上下文处于活动状态的主要内容,如果未能解决你的问题,请参考以下文章

Spring整合JMS

如何在spring集成消息中设置JMS Header

Spring整合Weblogic jms实战

Spring使用Spring和AMQP发送接收消息(上)

如何在春季集成中为每个出站 jms 消息设置优先级?

Spring JMS 监听器中的事务管理