Spring cloud SQS - 轮询间隔

Posted

技术标签:

【中文标题】Spring cloud SQS - 轮询间隔【英文标题】:Spring cloud SQS - Polling interval 【发布时间】:2016-06-21 23:31:47 【问题描述】:

监听一个AWS SQS队列,使用spring cloud如下:

@SqsListener(value = "$queue.name", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders) 
    // code

弹簧配置:

<aws-messaging:annotation-driven-queue-listener
    max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
    amazon-sqs="awsSqsClient" />

AwsSqsClient:

@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient() 
    ExecutorService executorService = Executors.newFixedThreadPool(10);
    return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);

这很好用。

在 SQS 客户端中配置了 10 个线程来处理这些消息,如您在上面的代码中所见。这也可以正常工作,在任何时间点最多处理 10 条消息。

问题是,我想不出控制轮询间隔的方法。默认情况下,一旦所有线程都空闲,就会进行弹簧轮询。

即考虑下面的例子

    大约 3 条消息被传递到队列 Spring 轮询队列并获得 3 条消息 正在处理 3 条消息,每条消息大约需要 20 分钟

与此同时,大约有 25 条消息被传递到队列中。在之前传递的所有 3 条消息都完成之前,Spring 不会轮询队列。基本上按照上面的示例,Spring 仅在 20 分钟后进行民意调查,尽管仍有 7 个线程空闲!!

知道如何控制这个轮询吗?即,如果有任何线程空闲,则应该开始轮询,并且不应该等到所有线程都空闲

【问题讨论】:

您可以使用@SqsListener 吗?您是从源代码构建的吗? 我也面临同样的情况来控制轮询间隔。你找到解决办法了吗? 我们找不到任何选项,因此退出了春季并开始直接使用 AWS SDK 进行队列轮询。在根据可用线程数使用 AWS 开发工具包进行轮询时,我们会限制消息数 【参考方案1】:

您的侦听器可以将消息加载到您的 Spring 应用程序中,并将它们与 AcknowledgementVisibility 对象一起提交到另一个线程池(如果您想同时控制这两个对象)。

一旦将消息提交到此线程池,您的侦听器就可以加载更多数据。您可以通过调整线程池设置来控制并发。

您的侦听器的方法签名将类似于以下之一:

@SqsListener(value = "$queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
                   @Headers Map<String, Object> headers,
                   Acknowledgment acknowledgment,
                   Visibility visibility) throws Exception 
...... Send pojo to worker thread and return

工作线程会确认处理成功

acknowledgment.acknowledge().get();

确保您的 message visibility 设置的值大于您的最高处理时间(使用一些超时来限制执行时间)。

【讨论】:

以上是关于Spring cloud SQS - 轮询间隔的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Amazon SQS Spring 云注释 @SqsListener 轮询特定数量的消息

SimpleMessageListenerContainer Amazon SQS Pollinterval

SQS 短轮询是不是比长轮询更可取?

使用 Node 轮询 Amazon SQS 队列的最有效方法

是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?

如何使用 celery worker 从 SQS 轮询消息,消息为 JSON 格式,并且 worker 无法解码该格式