问题测试spring cloud SQS Listener

Posted

技术标签:

【中文标题】问题测试spring cloud SQS Listener【英文标题】:Issue testing spring cloud SQS Listener 【发布时间】:2018-11-11 00:10:23 【问题描述】:

环境

Spring Boot:1.5.13.RELEASE 云:Edgware.SR3 云 AWS:1.2.2.RELEASE Java 8 OSX 10.13.4

问题

我正在尝试为 SQS 编写集成测试。

我有一个本地运行的localstack docker 容器,SQS 在TCP/4576 上运行

在我的测试代码中,我定义了一个 SQS 客户端,其端点设置为本地 4576,并且可以成功连接并创建队列、发送消息和删除队列。我还可以使用 SQS 客户端接收消息并提取我发送的消息。

我的问题是,如果我删除手动接收消息的代码以允许另一个组件获取消息,似乎什么都没有发生。我有一个 spring 组件注释如下:

监听器

@Component
public class MyListener 
@SqsListener(value = "my_queue", deletionPolicy = ON_SUCCESS)
    public void receive(final MyMsg msg) 
        System.out.println("GOT THE MESSAGE: "+ msg.toString());
    

测试

@RunWith(SpringRunner.class)
@SpringBootTest(properties = "spring.profiles.active=test")
public class MyTest 

    @Autowired
    private AmazonSQSAsync amazonSQS;

    @Autowired
    private SimpleMessageListenerContainer container;

    private String queueUrl;

    @Before
    public void setUp() 
        queueUrl = amazonSQS.createQueue("my_queue").getQueueUrl();
    

    @After
    public void tearDown() 
        amazonSQS.deleteQueue(queueUrl);
    

    @Test
    public void name() throws InterruptedException 
        amazonSQS.sendMessage(new SendMessageRequest(queueUrl, "hello"));
        System.out.println("isRunning:" + container.isRunning());
        System.out.println("isActive:" + container.isActive());
        System.out.println("isRunningOnQueue:" + container.isRunning("my_queue"));
        Thread.sleep(30_000);
        System.out.println("GOT MESSAGE: " + amazonSQS.receiveMessage(queueUrl).getMessages().size());
    

    @TestConfiguration
    @EnableSqs
    public static class SQSConfiguration 

        @Primary
        @Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() 
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://127.0.0.1:4576", "eu-west-1");
            return new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("key", "secret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
        
    

在我看到的测试日志中:

o.s.c.a.m.listener.QueueMessageHandler :在 MyListener 类上找到 1 个消息处理程序方法:public void MyListener.receive(MyMsg)=org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a 2018-05-31 22:50:39.582 信息 16329 ---

o.s.c.a.m.listener.QueueMessageHandler :将“org.springframework.cloud.aws.messaging.listener.QueueMessageHandler$MappingInformation@1cd4082a”映射到公共 void MyListener.receive(MyMsg)

接着是:

isRunning:true

isActive:true

isRunningOnQueue:false

收到消息:1

这表明,在发送消息之间的 30 秒暂停中,容器没有接收到它,而当我手动轮询消息时它在队列中并且我可以使用它。

我的问题是,为什么不调用侦听器,为什么 isRunningOnQueue:false 行暗示它不是为该队列自动启动的?

请注意,我还尝试设置我自己的 SimpleMessageListenerContainer bean,并将 autostart 显式设置为 true(无论如何都是默认值)并且观察到行为没有变化。我认为由@EnableSqs 设置的org.springframework.cloud.aws.messaging.config.annotation.SqsConfiguration#simpleMessageListenerContainer 应该配置一个自动启动的SimpleMessageListenerContainer,它应该为我轮询消息。

我也设置了

logging.level.org.apache.http=DEBUG
logging.level.org.springframework.cloud=DEBUG

在我的测试属性中,可以看到 HTTP 调用创建队列、发送消息和删除等,但没有接收 HTTP 调用(除了我在测试结束时的手动调用)。

【问题讨论】:

【参考方案1】:

经过一番修改,我发现了这一点。

即使简单的消息容器工厂设置为不自动启动,它似乎无论如何都会进行初始化,这涉及到确定队列是否存在。

在这种情况下,队列是在我的测试中的 setup 方法中创建的 - 但遗憾的是这是在设置 spring 上下文之后,这意味着发生了异常。

我通过简单地将队列创建移动到 SQS 客户端的上下文创建来解决此问题(这发生在创建消息容器之前)。即:

@Bean(destroyMethod = "shutdown")
        public AmazonSQSAsync amazonSQS() 
            final AwsClientBuilder.EndpointConfiguration endpoint = new AwsClientBuilder.EndpointConfiguration("http://localhost:4576", "eu-west-1");
            final AmazonSQSBufferedAsyncClient client = new AmazonSQSBufferedAsyncClient(AmazonSQSAsyncClientBuilder
                    .standard()
                    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("dummyKey", "dummySecret")))
                    .withEndpointConfiguration(endpoint)
                    .build());
            client.createQueue("test-queue");
            return client;
        

【讨论】:

以上是关于问题测试spring cloud SQS Listener的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud - SQS

Spring Cloud - SQS - 此wsdl版本的指定队列不存在

Spring cloud SQS - 轮询间隔

spring cloud aws 多个 sqs 监听器

如何修改 Spring Cloud AWS 用来反序列化 SQS 消息的对象映射器?

Spring Boot、Spring Cloud AWS 和 AWS SQS 未从队列中读取