Azure 事件中心 - 如何使用官方 SDK 并行使用事件?

Posted

技术标签:

【中文标题】Azure 事件中心 - 如何使用官方 SDK 并行使用事件?【英文标题】:Azure event hub - How to consume events parallelly using the official SDK? 【发布时间】:2021-11-07 00:32:36 【问题描述】:

我已经设置了以下测试:

创建了一个包含 10 个分区的天蓝色事件中心 创建了一个存储帐户 创建了单个消费者组 用 10k 条消息填充集线器 创建了 2 个容器(在 AKS 上),它们基本上会使用这些事件(使用相同的使用者组)并记录它们的 Azure 应用洞察。

期望: 运行

traces
| where message == "Event received"
| summarize count() by bin(timestamp,1s), cloud_RoleInstance
| render timechart 

然后查看类似:

但我看到的是这个:

(这是每个 10k 事件的 3 次运行,以消除“pod 未预热变量”)

请注意,pod 活动之间没有(或很少)重叠,就好像其中一个持有锁或其他东西一样,并且神秘地,在某个时候,锁被释放并被另一个 pod 使用。

相关消费者代码:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)

    _processor = new EventProcessorClient(_storageClient, _consumerGroup, _hubConnection, _eventHubName);
    _processor.ProcessEventAsync += ProcessEventHandler;
    _processor.ProcessErrorAsync += ProcessErrorHandler;

    // Start the processing

    await _processor.StartProcessingAsync(stoppingToken);


internal async Task ProcessEventHandler(ProcessEventArgs eventArgs)

    _logger.LogTelemetry("Event received");

    await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);

【问题讨论】:

【参考方案1】:

如上所述,您的高级场景已设置为并行使用。

每个EventProcessorClient 独立工作,尽管它们通过存储进行协调以在它们之间分割分区的所有权。在这种情况下,每个处理器应该拥有 5 个分区,他们将在大约 60-90 秒内使用默认配置声明这些分区,在此之后所有权应该是稳定的。

对于处理器拥有的每个分区,一个独立的后台任务负责从事件中心读取事件并将它们分派给您的处理程序。您的处理程序将被并发调用,尽管它保证给定分区的单个活动调用。

您所看到的结果表明存在问题,但推测原因的上下文有限。来自 sn-p 的一些观察/问题:

ExecuteAsync 将在处理器启动后立即退出;如果其他东西没有阻止以保持主机进程处于活动状态,则它可能正在终止。

_logger 会被不同的线程同时调用。

ProcessEventHandler 不考虑异常;如果它抛出,负责处理分区的任务将会出错。根据您的主机环境,它可能会重新启动或主机进程可能会崩溃。我们强烈建议您关注guidance for processor handlers。

支持为每个事件添加检查点,但会对吞吐量产生负面影响。对于大多数情况,我们建议在 X 个事件或某个固定时间间隔过去之后进行检查点,这些值取决于您的应用程序可以重新处理的事件数量。

我很乐意帮助您深入了解可能导致您所看到的集群行为的原因,但 Stack Overflow 可能不是这样做的最佳场所。您可能希望在Azure SDK for .NET repository 中提出问题,我们可以在那里解决问题。

【讨论】:

感谢回复,这里是打开的问题:github.com/Azure/azure-sdk-for-net/issues/23939【参考方案2】:

上面的代码其实并没有错。 On this GitHub issue 我们进行了一些讨论,并且能够注意到在处理较大批量(500k 事件)时的预期行为。

截图如下:

【讨论】:

以上是关于Azure 事件中心 - 如何使用官方 SDK 并行使用事件?的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将压缩的 json 数据推送到 azure 事件中心并在 azure 流分析中处理?

Azure 事件中心事件属性如何序列化?

如何将 azure ml 模型的输出发送到事件中心?

Azure 事件中心吞吐量

使用 Python Qpid/Proton/Messenger(),如何过滤来自 Azure 事件中心的消息?

Azure 事件中心 - 如何在 .Net Core WebAPI 中实现消费者?