Azure 排队服务总线超过百万条记录的功能没有到达终点
Posted
技术标签:
【中文标题】Azure 排队服务总线超过百万条记录的功能没有到达终点【英文标题】:Azure function of queueing to service bus over million record does not reach to the end 【发布时间】:2021-10-01 13:30:24 【问题描述】:我正在尝试实现一个定期运行(每周一次)并调用具有 1,500,000 项元数据domain/items
的外部 API 的解决方案,然后尝试确定每个项是否需要更新或者根据任意逻辑插入到数据库中。
在几次attempts 之后,我最终实现了一个具有两个 azure 功能(一个用于入队,另一个用于出队)的服务总线解决方案。
第一个 azure 函数定期触发并为 150 万个项目的元数据调用外部 api(高级计划)-每个项目约为 1.9 KB:
[FunctionName("EnqueueFooMetadata")]
public async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[ServiceBus("foosmetadata", Connection = "ServiceBusConnection")] IAsyncCollector<FooMetadata> foosMetadataQueue)
IEnumerable<FooMetadata> foosMetadata = await _service.GetFoosMetadata();
this._logger.LogTrace($"Start Enqueue foosMetadata.Count()");
await Task.Run(() =>
Parallel.ForEach(foosMetadata, new ParallelOptions() , async (FooMetadata fooMetadata) =>
await foosMetadataQueue.AddAsync(fooMetadata);
);
);
this._logger.LogTrace($"Done Enqueue foosMetadata.Count()");
在服务总线的另一端有一个绑定到它的函数:
[FunctionName("DequeueGiataProperties")]
public async Task Run([ServiceBusTrigger("foosmetadata", Connection = "ServiceBusConnection")] FooMetadata foo)
var getGiataProperiesResult = await _service.Dequeue(foo);
this._logger.LogTrace($"dequeing item: foo.Id, was done successfully.");
它对少量项目按预期工作(当 IEnumerable<FooMetadata> foosMetadata = await _service.GetFoosMetadata();
中的 foosMetadata
的计数约为 15,000 时),我可以看到 Done Enqueue...
的踪迹,但对于大量项目,它总是停止在中间的某个地方,我看不到痕迹。
我不想转移建议的答案,但它看起来像 azure 函数的超时问题。 处理大数据问题有什么建议?
【问题讨论】:
当它不起作用时,如何它不起作用? 嘿@mjwills 看到更新的问题:我可以看到 Done Enqueue 的踪迹...,但对于大量的项目,它总是停在中间的某个地方,我看不到踪迹。 Azure 函数执行时间的限制是多少? docs.microsoft.com/en-us/azure/azure-functions/functions-scale github.com/Azure/Azure-Functions/blob/main/… 当它停止工作需要多长时间(即“太长”大概需要多长时间)? 一种选择可能是异步收集器正在积极地进行批处理,而且太多了。另一种方法是尝试一次发送一条消息,看看 150 万条消息中有多少通过非异步收集器通过。我也会摆脱 task.run 和并行。试一试。_service.GetFoosMetadata()
总是返回 150 万个项目?
【参考方案1】:
我认为您遇到了许多问题,其中大部分都在代码的发布者部分。
-
Parallel.Foreeach 不是异步的,编译器允许您
写asyc代码,但是Parrallel.ForEach其实是同步的
特征。您在 Parallel.ForEach 中使用异步羔羊,这将
有意外的行为。
第二个问题可能是 Azure 函数的超时问题。
取决于您的计划,最多 5 分钟(一次消耗
计划)和(付费计划 20 分钟)供您完成。
以 150 万次调用 API,您期望它
将在那个时间范围内完成,很可能是
甚至 1/10 秒调用 API 的开销,正在打破
时间限制。
有很多方法可以打破 Parallel.ForeEach,主要是切换到使用基于 Task 的并行机制以及 DataFlow ActionBlock 之类的东西。
考虑到您正在进行的 API 调用数量,时间问题可能更难解决,但是
-
添加消息时服务总线支持批处理,您可以
一次将多条消息添加到队列中,您提到您在
高级计划,允许多条最大 1M 的消息
立即发布到服务总线。这个简单的改变可能会给你
足以让您发布所有消息。
如果没有完整的代码示例和消息大小示例,就很难对您提出的问题给出明确的答案。 因此,我建议您提供一个完整的工作示例,以帮助其他人尝试解决您面临的问题。
【讨论】:
【参考方案2】:在单个函数调用中将 150 万个项目转换为消息听起来像是这里的罪魁祸首。前面提到的并行foreach
和Task.Run
也无济于事。结合批处理IAsyncCollector
,难怪它会停滞不前。这里的问题可能还在于尝试发送的消息的总体大小以及 Functions SDK 中的底层实现。每个项目 60 字节,让我们平均再计算 40 字节的开销(标头、系统属性、AMQP 额外内容),即 150,000,000 字节或 143 MB。
我建议以下几个选项:
-
如果可能,减少调用返回的项目数。
否则,将批处理拆分为更小的块并将这些块作为一些消息发送。这也将提高可靠性,因为您的 HTTP 请求最终会被转换为一系列可靠处理的消息。
另一种选择是调查刷新IAsyncCollector
以强制它发送较小的批次。如果不可能,请使用您自己的消息发件人。最后,当您使用进程内 SDK 时,您可以利用服务总线功能扩展 (Microsoft.Azure.WebJobs.Extensions.ServiceBus
) 的预览版,该预览版几乎已超出预览版,目前为 5.0.0-beta.5。使用此版本,您将能够使用 Azure 服务总线的最新 SDK,内置安全批处理 (ServiceBusMessageBatch
)。
【讨论】:
以上是关于Azure 排队服务总线超过百万条记录的功能没有到达终点的主要内容,如果未能解决你的问题,请参考以下文章
限制服务总线消息接收的 Azure Functions 速率