Azure 事件中心和多个使用者组

Posted

技术标签:

【中文标题】Azure 事件中心和多个使用者组【英文标题】:Azure event hubs and multiple consumer groups 【发布时间】:2015-03-03 13:39:00 【问题描述】:

需要有关在以下场景中使用 Azure 事件中心的帮助。我认为消费者群体可能是这种情况下的正确选择,但我无法在网上找到具体示例。

这是问题的粗略描述和使用事件中心的建议解决方案(我不确定这是否是最佳解决方案。感谢您的反馈

我有多个事件源,它们会生成大量事件数据(来自传感器的遥测数据),这些数据需要保存到我们的数据库中,并且应该并行执行一些分析,例如运行平均值、最小值-最大值强>。

发送方只能将数据发送到单个端点,但事件中心应使该数据可供两个数据处理程序使用。

我正在考虑使用两个消费者组,第一个将是一组工作角色实例,负责将数据保存到我们的键值存储中,第二个消费者组将是一个分析引擎(可能与Azure 流分析)。

首先,我如何设置消费者组,我需要在发送方/接收方做些什么,以使事件的副本出现在所有消费者组上?

我确实在网上阅读了许多示例,但它们要么使用 client.GetDefaultConsumerGroup(); 和/或将所有分区都由同一工作角色的多个实例处理。

对于我的场景,当一个事件被触发时,它需要由两个不同的工作角色并行处理(一个是保存数据,另一个是做一些分析)

谢谢!

【问题讨论】:

嗨,您对此有什么解决方案吗?即使我在我的应用程序中有相同的要求。您能分享一下您对此的看法吗? 【参考方案1】:

TLDR:看起来很合理,只需通过 CreateConsumerGroupIfNotExists 使用不同的名称来创建两个消费者组。

消费者组主要是一个概念,因此它们的具体工作方式取决于您的订阅者的实施方式。如您所知,从概念上讲,它们是一组订阅者一起工作,因此每个组都接收所有消息,并且在理想(不会发生)的情况下,可能会使用每条消息一次。这意味着每个消费者组“让所有分区都由同一工作者角色的多个实例处理”。你想要这个。

这可以通过不同的方式实现。 Microsoft 提供了两种直接使用来自事件中心的消息的方法,以及使用可能构建在两种直接方法之上的 Streaming Analytics 之类的选项。第一种方式是Event Hub Receiver,第二种是更高级别的Event Processor Host。

我没有直接使用Event Hub Receiver,所以这个特别的评论是基于这些系统如何工作的理论和文档中的推测:虽然它们是来自EventHubConsumerGroups 的created,但作为这些接收器,这没什么用处不要相互协调。如果您使用这些,您将需要(并且可以!)自己进行所有协调和提交偏移量,这在某些情况下具有优势,例如在与计算聚合相同的事务中将偏移量写入事务数据库。使用这些low level receivers,使用同一个 Azure 消费组的不同逻辑消费组可能不应该(规范而不实用的建议)特别成问题,但你应该使用不同的名称,以防它确实重要或者你更改为EventProcessorHosts .

现在了解更多有用的信息,EventProcessorHosts 可能构建在 EventHubReceivers 之上。它们是更高级别的东西,并且支持使多台机器作为逻辑消费者组一起工作。下面我从我的代码中包含了一个经过轻微编辑的 sn-p,它生成了一个 EventProcessorHost,并在解释一些选择时留下了一堆 cmets。

//We need an identifier for the lease. It must be unique across concurrently 
//running instances of the program. There are three main options for this. The 
//first is a static value from a config file. The second is the machine's NETBios
//name ie System.Environment.MachineName. The third is a random value unique per run which
//we have chosen here, if our VMs have very weak randomness bad things may happen.

string hostName = Guid.NewGuid().ToString();

//It's not clear if we want this here long term or if we prefer that the Consumer 
//Groups be created out of band. Nor are there necessarily good tools to discover 
//existing consumer groups.
NamespaceManager namespaceManager = 
    NamespaceManager.CreateFromConnectionString(eventHubConnectionString);
EventHubDescription ehd = namespaceManager.GetEventHub(eventHubPath);
namespaceManager.CreateConsumerGroupIfNotExists(ehd.Path, consumerGroupName);

host = new EventProcessorHost(hostName, eventHubPath, consumerGroupName, 
    eventHubConnectionString, storageConnectionString, leaseContainerName);
//Call something like this when you want it to start
host.RegisterEventProcessorFactoryAsync(factory)

你会注意到我告诉 Azure 如果它不存在就创建一个新的消费者组,如果它不存在你会收到一个可爱的错误消息。老实说,我不知道这样做的目的是什么,因为它不包含存储连接字符串,该字符串需要在实例之间保持相同,以便 EventProcessorHost 的协调(并且可能是提交)工作正确。

在这里,我提供了一张来自Azure Storage Explorer 的图片,其中包含我在 11 月试验的消费者组的租约和大概抵消。请注意,虽然我有一个 testhub 和一个 testhub-testcg 容器,但这是由于手动命名它们。如果它们在同一个容器中,那就是“$Default/0”与“testcg/0”之类的东西。

如您所见,每个分区有一个 blob。我的假设是这些 blob 用于两件事。其中第一个是用于在实例之间分配分区的 Blob 租约参见here,第二个是存储已提交的分区内的偏移量。

消费实例不是将数据推送到消费者组,而是向存储系统询问一个分区中某个偏移量的数据。 EventProcessorHosts 是一种很好的高级方式来拥有一个逻辑消费者组,其中每个分区一次只能被一个消费者读取,并且不会忘记逻辑消费者组在每个分区中取得的进展。

请记住,每个分区的吞吐量是经过测量的,因此,如果您要最大化入口,则只能有两个全速运行的逻辑消费者。因此,您需要确保有足够的分区和吞吐量单位:

    读取您发送的所有数据。 如果您因问题落后几个小时,请在 24 小时保留期内赶上。

总之:消费者群体是您所需要的。您阅读的使用特定使用者组的示例很好,在每个逻辑使用者组中,Azure 使用者组使用相同的名称,而不同的逻辑使用者组使用不同的名称。

我还没有使用过 Azure 流分析,但至少在预览版中你是 limited to the default consumer group。因此,不要将默认使用者组用于其他用途,如果您需要两个单独的 Azure 流分析批次,您可能需要做一些讨厌的事情。但是很容易配置!

【讨论】:

只是一个更新:Azure 流分析现在可以采用自定义消费者组。

以上是关于Azure 事件中心和多个使用者组的主要内容,如果未能解决你的问题,请参考以下文章

在 Analytics Query 中使用多个分区键将数据存储在多个 Azure 存储表中

Azure 事件中心 - 如何使用官方 SDK 并行使用事件?

来自另一个帐户中的事件中心的 Azure 函数触发器

如何在整个 Azure 管理组上运行 powershell 脚本以跨越多个订阅?

Azure - 如何将新 IP 附加到多个网络安全组的 SourceAddressPrefix

Azure 中的数据处理架构