Spring cloud SQS - 轮询间隔
Posted
技术标签:
【中文标题】Spring cloud SQS - 轮询间隔【英文标题】:Spring cloud SQS - Polling interval 【发布时间】:2016-06-21 23:31:47 【问题描述】:监听一个AWS SQS队列,使用spring cloud如下:
@SqsListener(value = "$queue.name", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
public void queueListener(String message, @Headers Map<String, Object> sqsHeaders)
// code
弹簧配置:
<aws-messaging:annotation-driven-queue-listener
max-number-of-messages="10" wait-time-out="20" visibility-timeout="3600"
amazon-sqs="awsSqsClient" />
AwsSqsClient:
@Bean
public com.amazonaws.services.sqs.AmazonSQSAsyncClient awsSqsClient()
ExecutorService executorService = Executors.newFixedThreadPool(10);
return new AmazonSQSAsyncClient(new DefaultAWSCredentialsProviderChain(), executorService);
这很好用。
在 SQS 客户端中配置了 10 个线程来处理这些消息,如您在上面的代码中所见。这也可以正常工作,在任何时间点最多处理 10 条消息。
问题是,我想不出控制轮询间隔的方法。默认情况下,一旦所有线程都空闲,就会进行弹簧轮询。
即考虑下面的例子
-
大约 3 条消息被传递到队列
Spring 轮询队列并获得 3 条消息
正在处理 3 条消息,每条消息大约需要 20 分钟
与此同时,大约有 25 条消息被传递到队列中。在之前传递的所有 3 条消息都完成之前,Spring 不会轮询队列。基本上按照上面的示例,Spring 仅在 20 分钟后进行民意调查,尽管仍有 7 个线程空闲!!
知道如何控制这个轮询吗?即,如果有任何线程空闲,则应该开始轮询,并且不应该等到所有线程都空闲
【问题讨论】:
您可以使用@SqsListener 吗?您是从源代码构建的吗? 我也面临同样的情况来控制轮询间隔。你找到解决办法了吗? 我们找不到任何选项,因此退出了春季并开始直接使用 AWS SDK 进行队列轮询。在根据可用线程数使用 AWS 开发工具包进行轮询时,我们会限制消息数 【参考方案1】:您的侦听器可以将消息加载到您的 Spring 应用程序中,并将它们与 Acknowledgement
和 Visibility
对象一起提交到另一个线程池(如果您想同时控制这两个对象)。
一旦将消息提交到此线程池,您的侦听器就可以加载更多数据。您可以通过调整线程池设置来控制并发。
您的侦听器的方法签名将类似于以下之一:
@SqsListener(value = "$queueName", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(YourCustomPOJO pojo,
@Headers Map<String, Object> headers,
Acknowledgment acknowledgment,
Visibility visibility) throws Exception
...... Send pojo to worker thread and return
工作线程会确认处理成功
acknowledgment.acknowledge().get();
确保您的 message visibility 设置的值大于您的最高处理时间(使用一些超时来限制执行时间)。
【讨论】:
以上是关于Spring cloud SQS - 轮询间隔的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Amazon SQS Spring 云注释 @SqsListener 轮询特定数量的消息
SimpleMessageListenerContainer Amazon SQS Pollinterval
使用 Node 轮询 Amazon SQS 队列的最有效方法