处理 SQS 项目队列的多线程方法
Posted
技术标签:
【中文标题】处理 SQS 项目队列的多线程方法【英文标题】:Multithreaded approach to process SQS item Queue 【发布时间】:2021-10-27 20:12:03 【问题描述】:在这个场景中,我必须从队列中轮询 AWS SQS 消息,每个异步请求最多可以获取 10 个 sqs 项目/消息。一旦我轮询项目,然后我必须在 kubernetes pod 上处理这些项目。项目处理包括从几个 API 调用中获取响应,这可能需要一些时间然后将项目保存到 DB 和 S3。 我做了一些研发并得出以下结论
-
要使用消费者生产者模型,1 个线程将轮询项目,另一个线程将处理项目或使用多线程处理项目
维护一个数据结构,其中包含准备好处理的 sqs 轮询项目,DS 可能是阻塞收集或并发队列
使用任务并行库进行线程池和项目处理。
可以使用频道
我的查询
-
实现最佳性能或提高 TPS 的最佳方法是什么。
我可以/应该使用数据流 TPL
多线程或单线程异步任务
【问题讨论】:
您预计每秒有多少条消息?你可以做各种魔术,但如果速率是每秒 1 条消息,你可以做出其他选择。 我们要达到500 TPS,我说错了,通过API调用处理需要几秒钟,但是API调用需要毫秒才能得到结果 需要按顺序处理吗? 您能否分享一些代码,展示您如何想象单线程处理? @pankysharma 您是否考虑过使用基于 SQS 队列的自动缩放? 【参考方案1】:这在很大程度上取决于您的用例的具体情况以及您希望投入多少精力。
不过,我将解释我在做出这样的决定时会使用的思考过程。
处理 SQS 消息的简单解决方案是按顺序一次处理一个(即没有并发)。这并不意味着您一次只能接收一条消息,因为您可以向集群添加更多 pod。
因此,即使在那个幼稚的解决方案中,您也有一个可以利用的并发点,但它有很多开销。减少开销的方法通常是利用相同的开销,但用它处理更多的消息。这就是为什么,例如,SQS 允许您在一次调用中获得 1-10 条消息,而不仅仅是一条。它将呼叫开销分散到 10 条消息中。在简单的解决方案中,开销是启动整个过程的成本。将进程用于更多消息意味着并发处理。
我发现,为了实现稳定和灵活的并发,您需要多个并发点,但每个点都以可配置的并行度为上限(无论是硬编码还是实际配置)。这样您就可以调整它们中的每一个以实现最佳输出(当您有空闲 CPU 和内存时增加,否则减少)。
那么,在哪里可以引入额外的并发呢?这是一个进步,每一步都可以更好地利用资源,但需要更多的努力。
为每个 SQS API 调用获取 10 条消息而不是一条,并同时处理它们。这样您就可以控制 2 个并发点:Pod 数量、消息数量(最多 10 条)并发。 有几个任务,每个任务获取 1-10 个任务并同时处理它们。这是 3 个并发点:Pod、任务和每个任务的消息。这两种解决方案遭受消息处理时间不同,这意味着单个长时间运行的消息将“阻止”所有其他 1-9 个工作“槽”,从而有效地将并发性降低到低于配置的水平。 设置一个 TPL 数据流块以同时处理消息和一个(或几个)任务连续获取消息并泵入块。请记住,需要显式删除 SQS 消息,因此块也需要接收消息句柄,以便在处理后删除消息。 TPL 数据流“管道”由几个块组成,每个块都有自己的并发度。当您有不同的消息处理步骤且每个步骤都有不同的限制(例如具有不同限制配置的不同 API)时,这很有用。我个人非常喜欢 Dataflow 库,并且对它感到满意,所以我会直接使用它。但是,当性能不是问题时,更简单的解决方案也是有效的。
【讨论】:
谢谢,我实现了大约 100 TPS,数据流有 50 个线程(并行度的混合度),但是当我在我的操作块上添加并行循环时,我得到超过 500 TPS,可以吗使用并行内部数据流 概念上没问题.. 但这取决于实现。Parallel.For
不适合异步使用,所以只要你不这样做就可以了。它可能只是另一个并发点的进展中的另一个步骤。
此外,如果 for 循环只是块正在执行的处理的一部分,那是有道理的。但是,如果它是所有块都在做,那么您可能只想在有意义的情况下将各个“迭代”放入块中并增加其并行性。【参考方案2】:
我不熟悉 Kubernetes,但在最大化吞吐量时需要考虑很多事情。
您提到的所有事情都是 IO 绑定而不是 CPU 绑定。因此,使用 TPL 会使边际收益的设计过于复杂。见:https://docs.microsoft.com/en-us/dotnet/csharp/async#recognize-cpu-bound-and-io-bound-work
您的 Kubernetes pod 可能存在网络限制。例如,使用 Azure Function Apps on Consumption Plans 限制为 1,200 个出站连接。其他服务也会有一些明确的限制。 https://docs.microsoft.com/en-us/azure/azure-functions/manage-connections?tabs=csharp#connection-limit。由于您的工作性质,您可能会在需要处理多个线程上的 IO 工作之前达到这些限制。
您可能还需要考虑您所依赖的服务的限制,并确保它们能够处理吞吐量。
您可能需要考虑使用信号量来限制活动连接的数量,以满足您的基础架构和外部依赖限制https://docs.microsoft.com/en-us/dotnet/api/system.threading.semaphoreslim?view=net-5.0
话虽如此,每秒 500 条消息是一个现实的数量。为了进一步改进它,您可以考虑让具有独立资源限制的多个进程处理队列。
【讨论】:
【参考方案3】:不熟悉您的用例,或者不熟悉您正在使用的技术,但这听起来像是一个非常常见的消息处理场景。
几条准则:
首先,这些是指导方针,您的用例可能与此处评论的人们习惯的不同。 每当您想提高吞吐量时,您都需要确定 您的瓶颈,并朝着 CPU 瓶颈蓬勃发展,确保您 充分利用它。 CPU 负载通常是最昂贵的,并且 通常为自动缩放提供更可靠的指标。显然,根据您的远程 api 调用和您的数据库,您可能会遇到其他瓶颈 - SQS 队列大小也是一个很好的自动缩放指标,但请记住,如果您的瓶颈与数据库或 API 相关,则不能保证自动缩放会增加您的吞吐量. 再次,我不会使用复杂数据结构的奇特解决方案,因为我不熟悉您的用例,所以我可能错了 - 但请保持简单。应该有一个线程负责轮询队列,当它发现新消息时,它应该创建一个处理批处理的任务。每个处理批次通常应该有一个任务 - 让 ThreadPool 处理线程数。 不熟悉 .net SQS 库。但是,我熟悉其他非常相似的解决方案的库。大多数队列库已经为您完成了这一切,您不必担心。当高度优化的库已经找到新消息时,您可能应该只调用一个回调函数。这些库可能已经为每个批次创建了一个新任务 - 您只需注册到它们的回调,并确保您await
任何 I/O 绑定代码。
编辑:我提出的解决方案确实有一个限制,即单个消息可以阻止整个批次,这不一定是坏事 - 如果您的解决方案需要对不同的消息进行不同的处理,而您不想创建这个内部批处理依赖项,TPL DataFlow 绝对是您用例的一个很好的解决方案。
【讨论】:
【参考方案4】:是的,这听起来很像 TPL 数据流 的任务,它是一种用途广泛但功能强大的工具。您的第一个链链接将从队列中获取消息(不一定是单线程的,您只需传递一些委托)。您还可以通过这种方式控制有多少项目在本地“排队”。
然后,您可以以任何您希望的方式“订阅”您的工作人员——您甚至可以对其进行自定义,以便将“错误”处理放回您的队列中——您的处理是否受 IO 限制甚至都没有关系.如果是——好吧,很好,TPL 数据流是异步的,如果不是——好吧,没问题,TPL 数据流也可以是同步的。或者你可以启动一些线程池线程,没什么大不了的。
【讨论】:
以上是关于处理 SQS 项目队列的多线程方法的主要内容,如果未能解决你的问题,请参考以下文章