Azure 事件中心事件属性如何序列化?

Posted

技术标签:

【中文标题】Azure 事件中心事件属性如何序列化?【英文标题】:How are Azure Event Hub event properties serialized? 【发布时间】:2019-06-21 17:51:33 【问题描述】:

我想生成并发送EventData 对象,这些对象在EventData.Properties 字典中设置了各种特定于应用程序的属性。 Properties 字典本身定义为 IDictionary<string, object>,这意味着我可以将任何数据类型作为值传递。

var eventData = new EventData(bytes);
eventData.Properties["Prop1"] = // string?
eventData.Properties["Prop2"] = // int?
eventData.Properties["Prop3"] = // DateTime?
eventData.Properties["Prop4"] = // Custom?

实际上允许将哪些数据类型传递到Properties 字典中?显然,这些数据需要以某种方式进行序列化,但文档没有提及任何相关内容。

【问题讨论】:

【参考方案1】:

我通过尝试一些不起作用的方法找到了更多信息,然后通过查看异常调用堆栈找到了源代码。

在我的例子中,支持的属性类型似乎源自 AMQP 协议支持的内容(记录在 here)。在内部 AmqpMessageConverter 代码中实际上有一个很大的 switch 语句,它基本上给了我我的答案:

AmqpMessageConverter.TryGetAmqpObjectFromNetObject

在这里总结一下:

所有 .NET 基元类型(intstringdouble 等) Guid DateTimeDateTimeOffset Stream Uri TimeSpan byte[] IList IDictionary

我自己的自定义类型(POCO - “普通旧 CLR 对象”)不被接受,并导致以下异常:

System.Runtime.Serialization.SerializationException: Serialization operation failed due to unsupported type EventHubsTesting.Program+Poco.
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.TryGetAmqpObjectFromNetObject(Object netObject, MappingType mappingType, Object& amqpObject)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.UpdateAmqpMessageHeadersAndProperties(AmqpMessage message, String publisher, EventData eventData, Boolean copyUserProperties)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDataToAmqpMessage(EventData eventData)
   at Microsoft.Azure.EventHubs.Amqp.AmqpMessageConverter.EventDatasToAmqpMessage(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.Amqp.AmqpEventDataSender.OnSendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.EventDataSender.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at Microsoft.Azure.EventHubs.EventHubClient.SendAsync(IEnumerable`1 eventDatas, String partitionKey)
   at EventHubsTesting.Program.Sender(CancellationTokenSource shutdownSource) in C:\EventHubsTesting\Program.cs:line 99

我假设如果我想使用自定义类型,那么我需要自己序列化它们,然后再将它们分配为应用程序属性。

【讨论】:

这基本上也是我们通过反复试验得出的结论。我们只是将其他域关心的重要事情从 POCO 中剥离出来,将它们放入字典中,并依靠订阅者来调用整个 POCO。不过,感谢您发布此答案! 请注意,Microsoft.Azure.ServiceBus.Message 上的 UserProperties 中不支持 IList 和 IDictionary。如果传入参数 mappingType == MappingType.ApplicationProperty,AmqpMessageConverter.TryGetAmqpObjectFromNetObject 将引发异常。如果要转换的是 UserProperties,则 mappingType 是 MappingType.ApplicationProperty。我认为 EventData.Properties 也是如此。【参考方案2】:

跟进 Chris 的回答 - AMQP 标头根据 AMQP 类型规范进行序列化 - http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html

对于查看此答案的任何 Java 开发人员 - 您可以使用 Apache qpid 库来执行编码/解码操作。解码器类可以在这里找到 - https://qpid.apache.org/releases/qpid-proton-j-0.33.1/api/index.html。

EH 服务与编码无关,因此通过 Kafka 协议读取的消费者需要手动解码 AMQP 标头。本机 EH 客户端将为您解码标头。见-https://github.com/Azure/azure-event-hubs-for-kafka/issues/56。

【讨论】:

以上是关于Azure 事件中心事件属性如何序列化?的主要内容,如果未能解决你的问题,请参考以下文章

Azure 流分析错误:无法反序列化来自 IOT 中心的输入事件

流分析通过事件中心从 Python 反序列化 JSON

Azure 事件中心 - 如何使用官方 SDK 并行使用事件?

如何捕获来自事件中心的错误 json 记录到 azure 流分析

Azure 事件中心偏移

使用 Python Qpid/Proton/Messenger(),如何过滤来自 Azure 事件中心的消息?