spring cloud aws 多个 sqs 监听器

Posted

技术标签:

【中文标题】spring cloud aws 多个 sqs 监听器【英文标题】:spring cloud aws multiple sqs listener 【发布时间】:2019-08-05 09:30:37 【问题描述】:

我的项目中有 2 个 sqs 监听器。我希望其中一个具有相同的设置,而其中一个具有不同的设置。我要更改的唯一值是 maxNumberOfMessages。

最实用的方法是什么?我想为其中一个侦听器设置不同的 maxNumberOfMessages 值。

这是我的配置;

@Bean
public AWSCredentialsProvider awsCredentialsProvider(@Value("$cloud.aws.profile") String profile,
                                                     @Value("$cloud.aws.region.static") String region,
                                                     @Value("$cloud.aws.roleArn") String role,
                                                     @Value("$cloud.aws.user") String user) 
    ...

    return new AWSStaticCredentialsProvider(sessionCredentials);


@Bean
@Primary
@Qualifier("amazonSQSAsync")
public AmazonSQSAsync amazonSQSAsync(@Value("$cloud.aws.region.static") String region, AWSCredentialsProvider awsCredentialsProvider) 
    return AmazonSQSAsyncClientBuilder.standard()
            .withCredentials(awsCredentialsProvider)
            .withRegion(region)
            .build();


@Bean
@Primary
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) 
    SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
    factory.setAmazonSqs(amazonSqs);
    factory.setMaxNumberOfMessages(1);
    factory.setWaitTimeOut(10);
    factory.setQueueMessageHandler(new SqsQueueMessageHandler());
    return factory;

这是监听器;

@SqsListener(value = "$messaging.queue.blabla.source", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(Message message, Acknowledgment acknowledgment, @Header("MessageId") String messageId) 
    log.info("Message Received");

    try 
        ....
        acknowledgment.acknowledge().get();
     catch (InterruptedException e) 
        e.printStackTrace();
     catch (ExecutionException e) 
        e.printStackTrace();
     catch (Exception ex) 
        throw new RuntimeException(ex.getMessage());
    

【问题讨论】:

【参考方案1】:

不幸的是,Sushant 的解决方案没有在 Kotlin 中为我编译(因为 QueueAttributes 是静态保护类),但我用它编写了以下内容:

    @Bean
fun simpleMessageListenerContainerFactory(sqs: AmazonSQSAsync): SimpleMessageListenerContainerFactory =
    object : SimpleMessageListenerContainerFactory() 
        override fun createSimpleMessageListenerContainer(): SimpleMessageListenerContainer 
            val container = object : SimpleMessageListenerContainer() 
                override fun afterPropertiesSet() 
                    super.afterPropertiesSet()
                    registeredQueues.forEach  (queue, attributes) ->
                        if (queue.contains(QUEUE_NAME)) 
                            FieldUtils.writeField(
                                attributes,
                                "maxNumberOfMessages",
                                NEW_MAX_NUMBER_OF_MESSAGES,
                                true
                            )
                        
                    
                
            

            container.setWaitTimeOut(waitTimeOut)
            container.setMaxNumberOfMessages(maxNumberOfMessages)
            container.setAmazonSqs(sqs)
            return container
        
    

【讨论】:

【参考方案2】:

以下 hack 对我有用(如果每个听众都听不同的队列)

@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) 

    return new SimpleMessageListenerContainerFactory() 
        @Override
        public SimpleMessageListenerContainer createSimpleMessageListenerContainer() 
            SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer() 
                @Override
                protected void startQueue(String queueName, QueueAttributes queueAttributes) 
                    
                    // A place to configure queue based maxNumberOfMessages
                    
                    try 
                        if (queueName.endsWith(".fifo")) 
                            FieldUtils.writeField(queueAttributes, "maxNumberOfMessages", 1, true);
                        
                     catch (IllegalAccessException e) 
                        throw new RuntimeException(e);
                    
                    super.startQueue(queueName, queueAttributes);
                
            ;
            simpleMessageListenerContainer.setAmazonSqs(amazonSqs);
            return simpleMessageListenerContainer;
        
    ;

【讨论】:

这对我来说无法编译,因为 QueueAttributes 是受保护的静态类。 这对我来说很好【参考方案3】:

我找到了解决方案并在 github 上的示例 repo 上分享。 github link

如果我在侦听器类上添加@EnableAsync 注释并将@Async 注释添加到处理程序方法,我的问题正在解决:)

【讨论】:

您能否进一步解释一下您是如何解决它的,从 github 链接中不清楚您如何在侦听器上配置不同的最大消息。 这不是为两个不同队列提供不同配置的解决方案

以上是关于spring cloud aws 多个 sqs 监听器的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot、Spring Cloud AWS 和 AWS SQS 未从队列中读取

如何修改 Spring Cloud AWS 用来反序列化 SQS 消息的对象映射器?

Spring Cloud - SQS - 此wsdl版本的指定队列不存在

防止 spring-cloud-aws-messaging 尝试停止队列

问题测试spring cloud SQS Listener

AWS 实例配置文件不适用于 Spring Cloud AWS