spring cloud aws 多个 sqs 监听器
Posted
技术标签:
【中文标题】spring cloud aws 多个 sqs 监听器【英文标题】:spring cloud aws multiple sqs listener 【发布时间】:2019-08-05 09:30:37 【问题描述】:我的项目中有 2 个 sqs 监听器。我希望其中一个具有相同的设置,而其中一个具有不同的设置。我要更改的唯一值是 maxNumberOfMessages。
最实用的方法是什么?我想为其中一个侦听器设置不同的 maxNumberOfMessages 值。
这是我的配置;
@Bean
public AWSCredentialsProvider awsCredentialsProvider(@Value("$cloud.aws.profile") String profile,
@Value("$cloud.aws.region.static") String region,
@Value("$cloud.aws.roleArn") String role,
@Value("$cloud.aws.user") String user)
...
return new AWSStaticCredentialsProvider(sessionCredentials);
@Bean
@Primary
@Qualifier("amazonSQSAsync")
public AmazonSQSAsync amazonSQSAsync(@Value("$cloud.aws.region.static") String region, AWSCredentialsProvider awsCredentialsProvider)
return AmazonSQSAsyncClientBuilder.standard()
.withCredentials(awsCredentialsProvider)
.withRegion(region)
.build();
@Bean
@Primary
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs)
SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
factory.setAmazonSqs(amazonSqs);
factory.setMaxNumberOfMessages(1);
factory.setWaitTimeOut(10);
factory.setQueueMessageHandler(new SqsQueueMessageHandler());
return factory;
这是监听器;
@SqsListener(value = "$messaging.queue.blabla.source", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(Message message, Acknowledgment acknowledgment, @Header("MessageId") String messageId)
log.info("Message Received");
try
....
acknowledgment.acknowledge().get();
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
catch (Exception ex)
throw new RuntimeException(ex.getMessage());
【问题讨论】:
【参考方案1】:不幸的是,Sushant 的解决方案没有在 Kotlin 中为我编译(因为 QueueAttributes 是静态保护类),但我用它编写了以下内容:
@Bean
fun simpleMessageListenerContainerFactory(sqs: AmazonSQSAsync): SimpleMessageListenerContainerFactory =
object : SimpleMessageListenerContainerFactory()
override fun createSimpleMessageListenerContainer(): SimpleMessageListenerContainer
val container = object : SimpleMessageListenerContainer()
override fun afterPropertiesSet()
super.afterPropertiesSet()
registeredQueues.forEach (queue, attributes) ->
if (queue.contains(QUEUE_NAME))
FieldUtils.writeField(
attributes,
"maxNumberOfMessages",
NEW_MAX_NUMBER_OF_MESSAGES,
true
)
container.setWaitTimeOut(waitTimeOut)
container.setMaxNumberOfMessages(maxNumberOfMessages)
container.setAmazonSqs(sqs)
return container
【讨论】:
【参考方案2】:以下 hack 对我有用(如果每个听众都听不同的队列)
@Bean
public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs)
return new SimpleMessageListenerContainerFactory()
@Override
public SimpleMessageListenerContainer createSimpleMessageListenerContainer()
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer()
@Override
protected void startQueue(String queueName, QueueAttributes queueAttributes)
// A place to configure queue based maxNumberOfMessages
try
if (queueName.endsWith(".fifo"))
FieldUtils.writeField(queueAttributes, "maxNumberOfMessages", 1, true);
catch (IllegalAccessException e)
throw new RuntimeException(e);
super.startQueue(queueName, queueAttributes);
;
simpleMessageListenerContainer.setAmazonSqs(amazonSqs);
return simpleMessageListenerContainer;
;
【讨论】:
这对我来说无法编译,因为 QueueAttributes 是受保护的静态类。 这对我来说很好【参考方案3】:我找到了解决方案并在 github 上的示例 repo 上分享。 github link
如果我在侦听器类上添加@EnableAsync 注释并将@Async 注释添加到处理程序方法,我的问题正在解决:)
【讨论】:
您能否进一步解释一下您是如何解决它的,从 github 链接中不清楚您如何在侦听器上配置不同的最大消息。 这不是为两个不同队列提供不同配置的解决方案以上是关于spring cloud aws 多个 sqs 监听器的主要内容,如果未能解决你的问题,请参考以下文章
Spring Boot、Spring Cloud AWS 和 AWS SQS 未从队列中读取
如何修改 Spring Cloud AWS 用来反序列化 SQS 消息的对象映射器?
Spring Cloud - SQS - 此wsdl版本的指定队列不存在