如何使用 ServiceBus EventData 偏移值
Posted
技术标签:
【中文标题】如何使用 ServiceBus EventData 偏移值【英文标题】:How to use the ServiceBus EventData Offset Value 【发布时间】:2018-11-14 14:48:54 【问题描述】:我有一些使用 Service Bus Event Data 的代码,我怀疑我需要使用 offset 属性,因为目前,我的程序正在(或似乎正在)一遍又一遍地重新运行相同的事件中心数据.
我的代码如下:
public class EventHubListener : IEventProcessor
private static EventHubClient _eventHubClient;
private const string EhConnectionStringNoPath = "Endpoint=...";
private const string EhConnectionString = EhConnectionStringNoPath + ";...";
private const string EhEntityPath = "...";
public void Start()
_eventHubClient = EventHubClient.CreateFromConnectionString(EhConnectionString);
EventHubConsumerGroup defaultConsumerGroup = _eventHubClient.GetDefaultConsumerGroup();
EventHubDescription eventHub = NamespaceManager.CreateFromConnectionString(EhConnectionStringNoPath).GetEventHub(EhEntityPath);
foreach (string partitionId in eventHub.PartitionIds)
defaultConsumerGroup.RegisterProcessor<EventHubListener>(new Lease
PartitionId = partitionId
, new EventProcessorCheckpointManager());
Console.WriteLine("Processing : " + partitionId);
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
foreach (EventData eventData in messages)
string bytes = Encoding.UTF8.GetString(eventData.GetBytes());
MyData data = JsonConvert.DeserializeObject<MyData>(bytes);
当我一遍又一遍地收到相同的消息时,我怀疑我需要做这样的事情:
string bytes = Encoding.UTF8.GetString(eventData.GetBytes(), eventData.Offset, eventData.SerializedSizeInBytes - eventData.Offset);
然而,Offset
是一个字符串,尽管它看起来是一个数值(例如“12345”)。 context.CheckPointAsync()
上的文档看起来可能就是答案;但是,在循环结束时发出它似乎没有什么区别。
所以,我有一个两部分的问题:
-
什么是偏移量?是不是我认为的那样(即流中某个点的数字标记),如果是,为什么是字符串?
为什么我会再次收到相同的消息?据我了解事件中心,尽管它们至少保证一次,但一旦检查点出现问题,我不应该收到相同的消息。
编辑:
经过一段时间的折腾,我想出了一些可以避免这个问题的方法;但是,我当然不会声称这是一个解决方案:
var filteredMessages =
messages.Where(a => a.EnqueuedTimeUtc >= _startDate)
.OrderBy(a => a.EnqueuedTimeUtc);
使用EventProcessorHost
似乎实际上使问题变得更糟;也就是说,不仅历史事件被重播,而且似乎以随机顺序重播。
编辑:
我遇到了this@Mikhail 的优秀文章,它似乎确实解决了我的确切问题。然而;并且大概是我的问题的根源(或者其中之一,假设这是正确的,那么我不确定为什么使用EventProcessorHost
不仅仅像@Mikhail 在cmets 中所说的那样开箱即用)。但是ICheckpointManager
的ServiceBus版本只有一个接口方法:
namespace Microsoft.ServiceBus.Messaging
public interface ICheckpointManager
Task CheckpointAsync(Lease lease, string offset, long sequenceNumber);
【问题讨论】:
你在使用 EventProcessorHost 吗?如果是这样,调用CheckPointAsync
就足够了,它会处理偏移量。
能否贴出定义 ProcessEventsAsync 的类的全部代码?
@Mikhail - 不,我只是在实现 IEventProcessor
但是您没有在任何地方调用 CheckPointAsync 吗?你需要调用它。例如在一段时间后或处理一批消息后。示例见github.com/DeHeerSoftware/SemanticLogging.EventHub/blob/master/…
@pm_2 请问为什么不呢?你最终会自己重新实现它。
【参考方案1】:
什么是偏移量?是不是我认为的那样(即流中某个点的数字标记),如果是,为什么是字符串?
偏移量是流中的指针。当消息保留策略到期时,事件的偏移量会随着事件从事件中心移除而发生变化。因此,一条消息曾经位于偏移量 10,几天后可能位于偏移量 0,因为旧消息已从流中删除。这有一个很好的图表:Event Hubs: Stream Offsets。
为什么我会再次收到相同的消息?据我了解事件中心,尽管它们至少保证一次,但一旦检查点出现问题,我不应该收到相同的消息。
如果您使用低级别 EventReceiver offset
,您可能会再次收到相同的消息,因为当消息保留策略到期时,消息会从事件中心过期(即默认为 1 天)。 Sequence number
是一个更好的利用字段,因为它不会改变。
当检查点成功时,它会告诉我们成功处理的最后一个事件,因此您不应该返回相同的事件,因为当客户端启动时,它会创建一个流到事件流中的某个位置在那次事件之后。您可以在GitHub 上提出问题。
EventProcessorHost
很有帮助,因为它试图平衡正在运行的实例数量之间的分区处理。 (即考虑一个 6 个分区的事件中心。如果您有 2 个 EventProcessorHosts 连接到同一个事件中心并使用同一个使用者组读取,它们最终将平衡这些分区的处理,每个分区有 3 个。)当有网络丢失等暂时性故障。
它支持对 Azure Storage Blob 等持久存储进行检查点。这是一个示例:Process Events using an EventProcessorClient
【讨论】:
【参考方案2】:您的标题应该是事件中心,而不是服务总线。对于您的问题:
-
虽然事件中心的设计与 Kafka 相似,但一个很大的区别是您应该自己管理偏移量。事件中心代理完全不知道您的消费者组的偏移量。
所以 event hub sdk 提供了一些帮助类来存储存储帐户中的偏移量,但是您仍然需要在处理完消息后手动调用 checkpoint。
【讨论】:
以上是关于如何使用 ServiceBus EventData 偏移值的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Azure.Messaging.ServiceBus 库添加自定义属性?
如何使用 terraform 将多个 IP 动态添加到 azure servicebus 防火墙
如何在 Python MagicMock 中模拟 Azure ServiceBus 接收器
使用 Azure.ServiceBus.Messaging C# SDK for SAS 令牌的服务总线连接字符串
使用 Windows.Azure.ServiceBus (5.2.0) 的 ServiceBus 消息处理程序无法使用 DataContractSerializer NET 4.6.1 反序列化正文流