如何使用 ServiceBus EventData 偏移值

Posted

技术标签:

【中文标题】如何使用 ServiceBus EventData 偏移值【英文标题】:How to use the ServiceBus EventData Offset Value 【发布时间】:2018-11-14 14:48:54 【问题描述】:

我有一些使用 Service Bus Event Data 的代码,我怀疑我需要使用 offset 属性,因为目前,我的程序正在(或似乎正在)一遍又一遍地重新运行相同的事件中心数据.

我的代码如下:

public class EventHubListener : IEventProcessor

    private static EventHubClient _eventHubClient;        
    private const string EhConnectionStringNoPath = "Endpoint=...";
    private const string EhConnectionString = EhConnectionStringNoPath + ";...";
    private const string EhEntityPath = "...";        

    public void Start()
    
        _eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
        EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();            
        EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);

        foreach (string partitionId in eventHub.PartitionIds)
        
            defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
            
                PartitionId = partitionId
            , new EventProcessorCheckpointManager());

            Console.WriteLine("Processing : " + partitionId);
        
    

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    
        foreach (EventData eventData in messages)
                        
            string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
            MyData data = JsonConvert.DeserializeObject<MyData>(bytes);

当我一遍又一遍地收到相同的消息时,我怀疑我需要做这样的事情:

string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);

然而,Offset 是一个字符串,尽管它看起来是一个数值(例如“12345”)。 context.CheckPointAsync() 上的文档看起来可能就是答案;但是,在循环结束时发出它似乎没有什么区别。

所以,我有一个两部分的问题:

    什么是偏移量?是不是我认为的那样(即流中某个点的数字标记),如果是,为什么是字符串? 为什么我会再次收到相同的消息?据我了解事件中心,尽管它们至少保证一次,但一旦检查点出现问题,我不应该收到相同的消息。

编辑:

经过一段时间的折腾,我想出了一些可以避免这个问题的方法;但是,我当然不会声称这是一个解决方案:

var filteredMessages =
            messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
            .OrderBy(a => a.EnqueuedTimeUtc);

使用EventProcessorHost 似乎实际上使问题变得更糟;也就是说,不仅历史事件被重播,而且似乎以随机顺序重播。

编辑:

我遇到了this@Mikhail 的优秀文章,它似乎确实解决了我的确切问题。然而;并且大概是我的问题的根源(或者其中之一,假设这是正确的,那么我不确定为什么使用EventProcessorHost 不仅仅像@Mikhail 在cmets 中所说的那样开箱即用)。但是ICheckpointManager的ServiceBus版本只有一个接口方法:

namespace Microsoft.ServiceBus.Messaging


    public interface ICheckpointManager
    
        Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
    

【问题讨论】:

你在使用 EventProcessorHost 吗?如果是这样,调用CheckPointAsync 就足够了,它会处理偏移量。 能否贴出定义 ProcessEventsAsync 的类的全部代码? @Mikhail - 不,我只是在实现 IEventProcessor 但是您没有在任何地方调用 CheckPointAsync 吗?你需要调用它。例如在一段时间后或处理一批消息后。示例见github.com/DeHeerSoftware/SemanticLogging.EventHub/blob/master/… @pm_2 请问为什么不呢?你最终会自己重新实现它。 【参考方案1】:

    什么是偏移量?是不是我认为的那样(即流中某个点的数字标记),如果是,为什么是字符串?

    偏移量是流中的指针。当消息保留策略到期时,事件的偏移量会随着事件从事件中心移除而发生变化。因此,一条消息曾经位于偏移量 10,几天后可能位于偏移量 0,因为旧消息已从流中删除。这有一个很好的图表:Event Hubs: Stream Offsets。

    为什么我会再次收到相同的消息?据我了解事件中心,尽管它们至少保证一次,但一旦检查点出现问题,我不应该收到相同的消息。

    如果您使用低级别 EventReceiver offset,您可能会再次收到相同的消息,因为当消息保留策略到期时,消息会从事件中心过期(即默认为 1 天)。 Sequence number 是一个更好的利用字段,因为它不会改变。

    当检查点成功时,它会告诉我们成功处理的最后一个事件,因此您不应该返回相同的事件,因为当客户端启动时,它会创建一个流到事件流中的某个位置在那次事件之后。您可以在GitHub 上提出问题。

EventProcessorHost 很有帮助,因为它试图平衡正在运行的实例数量之间的分区处理。 (即考虑一个 6 个分区的事件中心。如果您有 2 个 EventProcessorHosts 连接到同一个事件中心并使用同一个使用者组读取,它们最终将平衡这些分区的处理,每个分区有 3 个。)当有网络丢失等暂时性故障。

它支持对 Azure Storage Blob 等持久存储进行检查点。这是一个示例:Process Events using an EventProcessorClient

【讨论】:

【参考方案2】:

您的标题应该是事件中心,而不是服务总线。对于您的问题:

    虽然事件中心的设计与 Kafka 相似,但一个很大的区别是您应该自己管理偏移量。事件中心代理完全不知道您的消费者组的偏移量。 所以 event hub sdk 提供了一些帮助类来存储存储帐户中的偏移量,但是您仍然需要在处理完消息后手动调用 checkpoint。

【讨论】:

以上是关于如何使用 ServiceBus EventData 偏移值的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Azure.Messaging.ServiceBus 库添加自定义属性?

如何使用 terraform 将多个 IP 动态添加到 azure servicebus 防火墙

如何在 Python MagicMock 中模拟 Azure ServiceBus 接收器

使用 Azure.ServiceBus.Messaging C# SDK for SAS 令牌的服务总线连接字符串

使用 Windows.Azure.ServiceBus (5.2.0) 的 ServiceBus 消息处理程序无法使用 DataContractSerializer NET 4.6.1 反序列化正文流

Microsoft.ServiceBus.Messaging 与 Microsoft.Azure.ServiceBus