如何使用 spring 集成来调整 sqs 队列的消耗

Posted

技术标签:

【中文标题】如何使用 spring 集成来调整 sqs 队列的消耗【英文标题】:How to pace the consumption of a sqs queue using spring integration 【发布时间】:2018-04-01 15:16:32 【问题描述】:

我正在尝试设置一个集成流来使用来自亚马逊 sqs 队列的消息,并且到目前为止它工作正常。但我想调整每分钟或每秒的消息数量。例如每分钟 20 条消息。

这里是我的 sql 监听 bean 的定义

    @Bean
    public MessageProducer mySqsMessageDrivenChannelAdapter() 
        SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, queueName);
        adapter.setMessageDeletionPolicy(SqsMessageDeletionPolicy.ON_SUCCESS);

        adapter.setVisibilityTimeout(TIMEOUT_VISIBILITY);
        adapter.setWaitTimeOut(TIMEOUT_MESSAGE_WAIT);
        adapter.setMaxNumberOfMessages(prefetch);
        adapter.setOutputChannel(processMessageChannel());
        return adapter;
    

如您所见,我设置了每次轮询获取的最大消息数,但是如何设置轮询之间的延迟?

在常规 jms 队列中,我可以使用 JMS.inboundAdapter 使用自定义轮询器,但似乎使用 SqsMessageDrivenChannelAdapter 我无法设置任何轮询计时器值。

也许我可以使用除 SqsMessageDrivenChannelAdapter 之外的 MessageProducer,但使用哪一个?

是否可以使用 sqs 设置 JMS.inboundAdapter?

【问题讨论】:

它看到解决方案可以在这里找到***.com/questions/29667321/…。在这种情况下,这个问题可以被认为是重复的,但这里的重点是我正在使用弹簧集成。如果可行,我将尝试调整那里的解决方案,我将关闭此解决方案。 【参考方案1】:

Spring Integration SqsMessageDrivenChannelAdapter 是一个消息驱动程序活动组件。它基于 Springh Cloud AWS 项目中的 SimpleMessageListenerContainer,该项目具有长期运行的 while() 循环以调用 AmazonSQS.receiveMessage()。该循环中的逻辑并不太复杂:

try 
    ReceiveMessageResult receiveMessageResult = getAmazonSqs().receiveMessage(this.queueAttributes.getReceiveMessageRequest());
    CountDownLatch messageBatchLatch = new CountDownLatch(receiveMessageResult.getMessages().size());
    for (Message message : receiveMessageResult.getMessages()) 
        if (isQueueRunning()) 
            MessageExecutor messageExecutor = new MessageExecutor(this.logicalQueueName, message, this.queueAttributes);
                 getTaskExecutor().execute(new SignalExecutingRunnable(messageBatchLatch, messageExecutor));
         else 
           messageBatchLatch.countDown();
        
    
    try 
         messageBatchLatch.await();
     catch (InterruptedException e) 
         Thread.currentThread().interrupt();
    
 catch (Exception e) 

如您所见,我们在那里创建 messageBatchLatch 并在循环后等待它。 每条消息都由它们自己的SignalExecutingRunnable 处理,其中countDown()s 在MessageExecutor 的末尾。因此,您可以通过目标服务方法中的人工Thread.sleep() 来实现您想要做的事情,以便在 SQS 轮询之间有更多的间隔。

但我听到了您的要求,我们确实必须添加以下内容:

/**
 * The sleep interval in milliseconds used in the main loop between shards polling cycles.
 * Defaults to @code 1000 minimum @code 250.
 * @param idleBetweenPolls the interval to sleep between shards polling cycles.
 */
public void setIdleBetweenPolls(int idleBetweenPolls) 
    this.idleBetweenPolls = Math.max(250, idleBetweenPolls);

我为KinesisMessageDrivenChannelAdapter 执行此操作,但在这里我们必须请求 Spring Cloud AWS 为SimpleMessageListenerContainer 执行此操作。

【讨论】:

以上是关于如何使用 spring 集成来调整 sqs 队列的消耗的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud - SQS - 此wsdl版本的指定队列不存在

如何聚合 AWS SQS ApproximateNumberOfMessages

Spring Cloud - SQS

如何确保与 SQS 集成的 Lambda 调用的下游 API 在消息进入 DLQ 之前至少被调用 2 次?

Spring Boot、Spring Cloud AWS 和 AWS SQS 未从队列中读取

当 DLQ 尚不存在时,如何通过 Terraform 配置 SQS?