SQSListener 不使用队列中的消息

Posted

技术标签:

【中文标题】SQSListener 不使用队列中的消息【英文标题】:SQSListener not consuming messages from queue 【发布时间】:2022-01-19 01:02:37 【问题描述】:

我看不到@SqsListener 正在使用的 SQS 队列中的消息

import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; //others

@Component
public class Consumer
  private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

  @SqsListener(value = "TEST-MY-QUEUE")
  public void receiveMessage(String stringJson)  
    System.out.println("***Consuming message: " + stringJson);
    logger.info("Consuming message: " + stringJson);
  

我的配置(在这里我打印客户端队列,我可以确定我想要使用的队列 - TEST-MY-QUEUE 。它在区域中正确打印 URL。我还能够看到正确加载的区域(与队列相同)在 regionProvider 中

@Configuration

public class AwsConfiguration  

  @Bean
  @Primary
  AmazonSQSAsync sqsClient() 
    AmazonSQSAsync amazonSQSAsync = AmazonSQSAsyncClientBuilder.defaultClient();
    System.out.println("Client queues = " + amazonSQSAsync.listQueues()); //The queue I want to consume is here
    return amazonSQSAsync;
  

  @Bean
  AwsRegionProvider regionProvider() 
    DefaultAwsRegionProviderChain defaultAwsRegionProviderChain = new DefaultAwsRegionProviderChain();
    System.out.println("Region = " + defaultAwsRegionProviderChain.getRegion());
    return defaultAwsRegionProviderChain;
  

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(AmazonSQSAsync amazonSQSAsync, QueueMessageHandler queueMessageHandler) 
    
    SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
    simpleMessageListenerContainer.setAmazonSqs(amazonSQSAsync);
    simpleMessageListenerContainer.setMessageHandler(queueMessageHandler);
    simpleMessageListenerContainer.setMaxNumberOfMessages(10);
    simpleMessageListenerContainer.setTaskExecutor(threadPoolTaskExecutor());
    return simpleMessageListenerContainer;
  

  @Bean
  public QueueMessageHandler queueMessageHandler(AmazonSQSAsync amazonSQSAsync) 
    QueueMessageHandlerFactory queueMessageHandlerFactory = new QueueMessageHandlerFactory();
    queueMessageHandlerFactory.setAmazonSqs(amazonSQSAsync);
    QueueMessageHandler queueMessageHandler = queueMessageHandlerFactory.createQueueMessageHandler();
    return queueMessageHandler;
  

  public ThreadPoolTaskExecutor threadPoolTaskExecutor() 
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(10);
    executor.initialize();
    return executor;
  

和 pom.xml(Java 11、spring boot、spring cloud aws)


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.5.6</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-core</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-aws-autoconfigure</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
            <version>3.0.3</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-aws-messaging</artifactId>
            <version>2.2.6.RELEASE</version>
        </dependency>

我在这里的问题中注意到非常相似的问题,我将 pom.xml 中的依赖项更改为 spring-cloud-starter-aws-messaging 但没有为我解决。我仔细检查了名称(队列,注释)一切似乎都很好 当我运行我的应用程序时,启动正常,但我没有看到任何日志或异常。没有一条消息被消费。 我错过了什么?

谢谢

【问题讨论】:

【参考方案1】:

您正在使用第三方 API。要从 Java 项目调用 Amazon Simple Queue Service (SQS),请使用官方 AWS SDK for Java V2。如果您不知道如何使用此 SDK,请参阅此开发指南:

Developer guide - AWS SDK for Java 2.x

有关 AWS SQS 的特定信息,请参阅:

Working with Amazon Simple Queue Service

这包含指向 AWS Github 的链接,您可以在其中找到 POM 依赖项、代码等。

【讨论】:

嘿,感谢您指向此文档。我可以看到它完全不同的方法(例如使用 ReceiveMessageRequest 类)。这是否意味着这个第三方 API 和 SQSListener 不再工作? 亚马逊推荐使用 AWS SDK for Java V2。 我明白了。只是我需要通过侦听器(当它们来时)使用这些消息。我找不到与指南相关的听众。使用消息的唯一编程方式...【参考方案2】:

最后是配置问题(使用凭据)

在 application.yml 中

credentials:
  useDefaultAwsCredentialsChain: true #Will use credentials in /.aws 

然后在您创建 AmazonSQSAsync 的 AWSConfig 类中,让它使用该配置

public AmazonSQSAsync amazonSQSAsync() 
  DefaultAWSCredentialsProviderChain defaultAWSCredentialsProviderChain = new DefaultAWSCredentialsProviderChain();
  return AmazonSQSAsyncClientBuilder.standard().withRegion(region)
            .withCredentials(defaultAWSCredentialsProviderChain)
            .build();

【讨论】:

以上是关于SQSListener 不使用队列中的消息的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Amazon SQS Spring 云注释 @SqsListener 轮询特定数量的消息

将 SqsListener 与 SNS 和 SQS 一起使用

php+redis简易消息队列

Redis中的Stream数据类型作为消息队列的尝试

Spring Cloud @SqsListener MessageConversionException:无法从 [java.lang.String] 转换为 GenericMessage

场景应用:Redis如何做消息队列?