流分析通过事件中心从 Python 反序列化 JSON

Posted

技术标签:

【中文标题】流分析通过事件中心从 Python 反序列化 JSON【英文标题】:Stream Analytics deserialising JSON from Python via Event Hub 【发布时间】:2016-07-15 08:12:36 【问题描述】:

我已经设置了一个 Azure 事件中心,我正在从 Python 脚本发送 JSON 格式的 AMQP 消息,并尝试使用流分析将这些消息流式传输到 Power BI。 消息是来自 IoT 设备的非常简单的设备活动

Python sn-p 是

msg = json.dumps( "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz , ensure_ascii=False, encoding='utf8')
message.body = msg
messenger.put(message)
messenger.send()

我使用 MS 教程中的示例 C# 消息阅读器从事件中心读取数据没有问题,输出为:

Message received.  Partition: '2', Data: '??"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"'

但是当我尝试从事件中心测试流分析输入时,我收到了错误

诊断:无法将输入事件反序列化为 Json。一些可能的原因:1)格式错误的事件 2)输入源配置错误的序列化格式

我不确定 Malformed Events 是什么意思 - 我假设流分析可以处理通过 AMQP 发送到事件中心的数据?

我看不出 C# 应用程序接收到的 JSON 有任何问题 - 除非 BOM 符号导致问题?

这是我第一次尝试这一切,我已经搜索了任何类似的帖子但无济于事,所以如果有人能指出我正确的方向,我将不胜感激。

干杯 抢

【问题讨论】:

【参考方案1】:

这是由于客户端 API 不兼容造成的。 Python 使用 Proton 在 AMQP 值消息的正文中发送 JSON 字符串。正文被编码为 AMQP 字符串(AMQP 类型编码字节 + utf8 编码字符串字节)。流分析使用 Service Bus .Net SDK,它将 AMQP 消息公开为 EventData,其正文始终是字节数组。对于 AMQP 值消息,它包括 AMQP 类型的编码字节,因为没有它们就不可能解码以下值。开头这些多余的字节会导致 JSON 序列化失败。

为了实现消息体的互操作性,应用程序应确保发布者和消费者就其类型和编码达成一致。在这种情况下,发布者应该在 AMQP 数据消息中发送原始字节。使用 Proton Python API,你可以试试这个:

message.body = msg.encode('utf-8')

另一种解决方法是在应用程序属性中发送简单类型(例如字符串)。

其他人也遇到了这个问题。 https://github.com/Azure/amqpnetlite/issues/117

【讨论】:

【参考方案2】:

正如@XinChen 所说,问题是由 AMQP 协议引起的。

根据我的经验,以下两种解决方法对这种情况有效。

    使用Send Event REST API 代替带有AMQP 的Azure Python SDK,但其余api 基于HTTP 协议,性能不高。 使用 Base64 编码发送 JSON 消息,然后将收到的消息解码为 JSON 字符串。

【讨论】:

【参考方案3】:

这两件事对我有用:

添加message.inferred = True 检查以确保您指定的转储编码为encoding='utf-8' 而不是encoding='utf8',如您的示例所示。

更新的 OP:

msg = json.dumps( "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz , ensure_ascii=False, encoding='utf-8')
message.body = msg
message.inferred = True
messenger.put(message)
messenger.send()

通过添加推断标志,我认为消息序列化程序可以正确推断正文是字节并创建 AMPQ DATA,从而解决@Xin Chen 的观点。

消息的推断标志指示消息内容如何编码到 AMQP 部分中。如果推断为真,则消息正文中的二进制和列表值将分别编码为 AMQP DATA 和 AMQP SEQUENCE 部分。如果 inferred 为 false,则消息正文中的所有值都将被编码为 AMQP VALUE 部分,无论其类型如何。

回复:Qpid Proton Docs #inferred

回复:JSON Encoder and Decoder #dumps

【讨论】:

以上是关于流分析通过事件中心从 Python 反序列化 JSON的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将压缩的 json 数据推送到 azure 事件中心并在 azure 流分析中处理?

如何捕获来自事件中心的错误 json 记录到 azure 流分析

在流分析工作并将它们路由到服务总线之后,事件中心中的事件会发生啥?

Azure 流分析获取前一个输出行以加入输入

使用流分析读取时,事件中心输入大小数据的输出大小是输出大小的三倍

如何进行反编程?