流分析通过事件中心从 Python 反序列化 JSON
Posted
技术标签:
【中文标题】流分析通过事件中心从 Python 反序列化 JSON【英文标题】:Stream Analytics deserialising JSON from Python via Event Hub 【发布时间】:2016-07-15 08:12:36 【问题描述】:我已经设置了一个 Azure 事件中心,我正在从 Python 脚本发送 JSON 格式的 AMQP 消息,并尝试使用流分析将这些消息流式传输到 Power BI。 消息是来自 IoT 设备的非常简单的设备活动
Python sn-p 是
msg = json.dumps( "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz , ensure_ascii=False, encoding='utf8')
message.body = msg
messenger.put(message)
messenger.send()
我使用 MS 教程中的示例 C# 消息阅读器从事件中心读取数据没有问题,输出为:
Message received. Partition: '2', Data: '??"DeviceUID": "z_70b3d515200002e7_0", "Signal": "/on?1", "DeviceID": "1", "Hub": "91754623489", "Timestamp": "2016-07-15T07:56:50.277440Z"'
但是当我尝试从事件中心测试流分析输入时,我收到了错误
诊断:无法将输入事件反序列化为 Json。一些可能的原因:1)格式错误的事件 2)输入源配置错误的序列化格式
我不确定 Malformed Events 是什么意思 - 我假设流分析可以处理通过 AMQP 发送到事件中心的数据?
我看不出 C# 应用程序接收到的 JSON 有任何问题 - 除非 BOM 符号导致问题?
这是我第一次尝试这一切,我已经搜索了任何类似的帖子但无济于事,所以如果有人能指出我正确的方向,我将不胜感激。
干杯 抢
【问题讨论】:
【参考方案1】:这是由于客户端 API 不兼容造成的。 Python 使用 Proton 在 AMQP 值消息的正文中发送 JSON 字符串。正文被编码为 AMQP 字符串(AMQP 类型编码字节 + utf8 编码字符串字节)。流分析使用 Service Bus .Net SDK,它将 AMQP 消息公开为 EventData,其正文始终是字节数组。对于 AMQP 值消息,它包括 AMQP 类型的编码字节,因为没有它们就不可能解码以下值。开头这些多余的字节会导致 JSON 序列化失败。
为了实现消息体的互操作性,应用程序应确保发布者和消费者就其类型和编码达成一致。在这种情况下,发布者应该在 AMQP 数据消息中发送原始字节。使用 Proton Python API,你可以试试这个:
message.body = msg.encode('utf-8')
另一种解决方法是在应用程序属性中发送简单类型(例如字符串)。
其他人也遇到了这个问题。 https://github.com/Azure/amqpnetlite/issues/117
【讨论】:
【参考方案2】:正如@XinChen 所说,问题是由 AMQP 协议引起的。
根据我的经验,以下两种解决方法对这种情况有效。
-
使用
Send Event
REST API 代替带有AMQP 的Azure Python SDK,但其余api 基于HTTP 协议,性能不高。
使用 Base64 编码发送 JSON 消息,然后将收到的消息解码为 JSON 字符串。
【讨论】:
【参考方案3】:这两件事对我有用:
添加message.inferred = True
检查以确保您指定的转储编码为encoding='utf-8'
而不是encoding='utf8'
,如您的示例所示。
更新的 OP:
msg = json.dumps( "Hub": MAC, "DeviceID": id, "DeviceUID": ouid, "Signal": text, "Timestamp": dtz , ensure_ascii=False, encoding='utf-8')
message.body = msg
message.inferred = True
messenger.put(message)
messenger.send()
通过添加推断标志,我认为消息序列化程序可以正确推断正文是字节并创建 AMPQ DATA,从而解决@Xin Chen 的观点。
消息的推断标志指示消息内容如何编码到 AMQP 部分中。如果推断为真,则消息正文中的二进制和列表值将分别编码为 AMQP DATA 和 AMQP SEQUENCE 部分。如果 inferred 为 false,则消息正文中的所有值都将被编码为 AMQP VALUE 部分,无论其类型如何。
回复:Qpid Proton Docs #inferred
回复:JSON Encoder and Decoder #dumps
【讨论】:
以上是关于流分析通过事件中心从 Python 反序列化 JSON的主要内容,如果未能解决你的问题,请参考以下文章
如何有效地将压缩的 json 数据推送到 azure 事件中心并在 azure 流分析中处理?
如何捕获来自事件中心的错误 json 记录到 azure 流分析
在流分析工作并将它们路由到服务总线之后,事件中心中的事件会发生啥?