WebJob 一次处理来自队列的多条消息

Posted

技术标签:

【中文标题】WebJob 一次处理来自队列的多条消息【英文标题】:WebJob processing multiple messages at a time from queue 【发布时间】:2016-04-24 11:59:03 【问题描述】:

我的 WebJob 正在处理来自队列的多条消息,我不希望它这样做。我的队列有依赖消息,我想依次处理这些消息。 我尝试将“BatchSize”配置为 1,但没有运气。它仍在一次处理多个消息。

【问题讨论】:

【参考方案1】:

BatchSize 设置 (JobHostConfiguration.Queues.BatchSize) 不适用于 ServiceBus 队列处理,仅适用于 Azure 队列。要配置 ServiceBus 队列处理,请使用 ServiceBusConfiguration。您要配置的旋钮是ServiceBusConfiguration.MessageOptions.MaxConcurrentCalls将此设置为 1 以禁用单个实例上的并发处理(默认为 16):

JobHostConfiguration config = new JobHostConfiguration();
ServiceBusConfiguration serviceBusConfig = new ServiceBusConfiguration();
serviceBusConfig.MessageOptions.MaxConcurrentCalls = 1;
config.UseServiceBus(serviceBusConfig);

JobHost host = new JobHost(config);
host.RunAndBlock();

这将确保在单个实例上一次只处理一条消息。如果您的 WebApp 横向扩展,则每个横向扩展的实例都将在此模式下运行,这意味着您将在多个实例之间进行并发处理。如果您也不想这样,您可以使用SingletonAttribute 来确保您的函数只有一个实例在跨实例运行。请参阅Singleton wiki page 了解更多信息。

【讨论】:

谢谢伙计,正是我需要的【参考方案2】:

如果您的消息具有某种顺序依赖性,则您可能有一个操作管道。我有一个特殊的场景,我使用 Azure 队列而不是 ServiceBus,但我相信以下想法可以适用:

我有一个自动化过程:1) 创建数据库,2) 创建 Web 应用程序,3) 配置 Web 应用程序,以及 4) 部署代码。就我而言,事情必须按此顺序完成。

对于这种情况,我为每个任务创建了一个队列:create-database-queuecreate-webapp-queueconfigure-web-appdeploy-app-queue

这样工作会更安全,因为您可以隔离任务。一旦一个队列中的一个进程完成,就可以将其推送到下一个队列。此外,您可以轻松地在流程中插入新步骤。例如,我可以不假思索地轻松插入新步骤1.5) Restore database backup

此外,您还提倡将职责与各个队列分离。

【讨论】:

【参考方案3】:

它是什么样的队列很大程度上取决于你所做的事情是否成功。我不确定为什么您会遇到队列批量大小的问题 - Azure 存储队列会为您提供一条又一条的消息,服务总线也是如此。

Azure WebJobs 将随着您托管它们的网站数量而扩展,这可能会给您带来并发问题,即如果您有一个队列处理器作为连续运行的 Web 作业,那么如果您的网站扩展到两个实例,您将有 两个 WebJobs 也在为同一个队列而战。

因此,如果您需要单个队列处理器来运行,将 Web 作业转移到云服务中可能是一个更好的解决方案,您可以 100% 控制实例数量,与网站及其实例完全分离.

由于您需要一个接一个地连续处理每条消息,因此扩展处理器的数量是没有意义的,因为它们无论如何都需要等待下一条消息被处理,所以只要这是一个需求,并且您可能想要扩展您的网站,我建议 CloudService 作为更好的选择。

【讨论】:

仅供参考,您可以将 WebJobs 配置为 Singleton。因此,如果这是您唯一想要完成的事情,则无需迁移到云服务

以上是关于WebJob 一次处理来自队列的多条消息的主要内容,如果未能解决你的问题,请参考以下文章

如何防止 Azure webjob 同时多次处理相同的消息

如何基于 Azure 中的服务总线队列自动缩放 Python webjob?

连续 WebJob 自动停止

Mule Quartz 调度器:处理多条消息

如何每隔 X 秒批量处理来自服务总线队列的消息

Spring amqp批处理接收消息