互操作性 Azure 服务总线消息队列消息

Posted

技术标签:

【中文标题】互操作性 Azure 服务总线消息队列消息【英文标题】:Interoperability Azure Service Bus Message Queue Messages 【发布时间】:2016-02-06 04:00:04 【问题描述】:

我有一个 Java 应用程序和一个 NodeJS 应用程序都使用一个 Azure 服务总线消息队列。

我在客户身上看到了一些奇怪的效果,如下所示。

JAVA MESSAGE Producer(根据 Azure JMS 教程使用 QPID 库):

 TextMessage message = sendSession.createTextMessage();
        message.setText("Test AMQP message from JMS");
        long randomMessageID = randomGenerator.nextLong() >>>1;
        message.setJMSMessageID("ID:" + randomMessageID);
        sender.send(message);
        System.out.println("Sent message with JMSMessageID = " + message.getJMSMessageID());

输出: 发送消息 JMSMessageID = ID:2414932965987073843

NODEJS 消息消费者:

serviceBus.receiveQueueMessage(queue, timeoutIntervalInS: timeOut, isReceiveAndDelete: true, function(err, message) 
if(message !==null)console.log(util.inspect(message, showHidden: false, depth: null));
);

输出:

 body: '@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001aTest AMQP message from JMS',
brokerProperties:
 DeliveryCount: 1,
EnqueuedSequenceNumber: 5000004,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:28:21 GMT',
MessageId: '2414932965987073843',
PartitionKey: '89',
SequenceNumber: 59672695067659070,
State: 'Active',
TimeToLive: 1209600,
To: 'moequeue' ,
contentType: 'application/xml; charset=utf-8' 

如果我将其与通过 serviceBus.sendQueueMessage() 插入队列的消息进行比较,则属性如下所示:

 body: 'test message',
brokerProperties:
 DeliveryCount: 1,
EnqueuedSequenceNumber: 0,
EnqueuedTimeUtc: 'Wed, 04 Nov 2015 21:44:03 GMT',
MessageId: 'bc0a3d4f-15ba-434f-9fb0-1a3789885f8c',
PartitionKey: '734',
SequenceNumber: 37436171906517256,
State: 'Active',
TimeToLive: 1209600 ,
contentType: 'text/plain',
customProperties:
 message_number: 0,
sent_date: Wed Nov 04 2015 21:44:03 GMT+0000 (UTC)  

所以内容类型一开始就不同 - 为什么? - 然后第一个消息有效负载正文中的奇怪垃圾来自哪里:@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/�\u001a 这是序列化的结果吗?如何缓解这种情况?

在这里也可以找到代码: http://pastebin.com/T9RTFRBk

【问题讨论】:

【参考方案1】:

我遇到了同样的问题,即以“@\u0006string\b3http://schemas.microsoft.com/2003/10/Serialization/\u0001”为前缀的消息正文。

我使用 Nodejs 以及 azure-iot-device-mqttazure-iot-device 包将消息发送到 IoT 中心。我正在使用流分析作业从 IoT 中心接收消息并将它们发布到队列。我正在使用带有 amqp10 包的 Nodejs 从队列中接收事件。

问题不是由我发送或接收消息的方式引起的。相反,问题在于流分析兼容性级别!兼容级别 1.0(至少在我部署时是默认的)使用 DataContractSerializer 将消息序列化为 XML 流! Microsoft 使用兼容性级别 1.1 更改(修复)了此问题。因此,您可能只需将流分析作业的兼容性级别(CONFIGURE->Compatibility level)更改为 1.1。

见:https://docs.microsoft.com/en-us/azure/stream-analytics/stream-analytics-compatibility-level#major-changes-in-the-latest-compatibility-level-11:

【讨论】:

【参考方案2】:

我们遇到了完全相同的问题,尽管在一个使用基于 Camel 的生产者的示例中涉及更多。由于我们环境的变化,我们开始遇到这些问题。

这里的问题是 REST 服务在对节点客户端的 HTTP 响应进行编码时如何解释 JMS 消息。

我们发现 JmsTextmessage 出于某种原因(不完全清楚)被假定为“application/xml”类型,并且内容将按原样转发。因此,您在示例中得到了 OUTPUT。

如果改为使用 JmsByteMessage,则内容将被解释为“应用程序/八位字节流”并且不会在传输中被破坏。

所以尝试以下方式:

BytesMessage message = sendSession.createBytesMessage();
String body = "Test AMQP message from JMS";
message.writeBytes(body.getBytes(StandardCharsets.UTF_8));
sender.send(message);

我们使用它来传输 JSON 编码数据以由 Node.js 客户端解释。

【讨论】:

【参考方案3】:

Azure 服务总线支持两种不同的协议:AMQP 和 HTTP。使用 qpid 库的 Java/JMS 将 AMQP 协议用于 ServiceBus。但是,ServiceBus REST API 包装在 NodeJS thur HTTP 协议中。

Service Bus 中的 AMQP 支持详情,请参考https://azure.microsoft.com/en-us/documentation/articles/service-bus-amqp-overview/。

ServiceBus的REST API请参考https://msdn.microsoft.com/en-us/library/azure/hh780717.aspx。

AMQP 是一种二进制应用层协议,旨在高效地 支持各种消息传递应用程序和通信 模式。 - from WikiPedia

但是 HTTP 是一个文本协议。

消息格式如下,请参考工件http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format的Message Format部分。并且AMQP规范可以参考http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html。

                                                     Bare Message
                                                            |
                                      .---------------------+--------------------.
                                      |                                          |
 +--------+-------------+-------------+------------+--------------+--------------+--------+
 | header | delivery-   | message-    | properties | application- | application- | footer |
 |        | annotations | annotations |            | properties   | data         |        |
 +--------+-------------+-------------+------------+--------------+--------------+--------+
 |                                                                                        |
 '-------------------------------------------+--------------------------------------------'
                                             |
                                      Annotated Message

所以用Java发送的消息或者用NodeJS发送的消息被序列化成不同的结果。

AMQP的body内容中\uXXXX的内容是Unicode字符。

Unicode字符\u0006是Acknowledge控制字符,请参考https://en.wikipedia.org/wiki/Acknowledge_character了解。

而Unicode字符\u001a是替代控制字符,请参考https://en.wikipedia.org/wiki/Substitute_character。

它们限制了消息头中元数据的开始和结束。

【讨论】:

明白了,那么有没有办法在 Node 中“安全地”反序列化这些消息?我认为在所有客户端上坚持使用相同的协议是一个好主意,但现实可能会有所不同,并且有效负载仍然需要可交换。 @user3506080 在发送方和接收方中使用不同的协议并不是一个好主意。但我认为如果真的有必要,您可以通过特殊字符拆分内容来安全地获得正确的消息。 虽然信息在技术上是正确的,但完全是错误的。 AMQP 是一种二进制传输协议,根本不对实际消息进行任何编码。消息的编码是什么JMS。如果您将消息的类型更改为字节消息(JmsByteMessage),则编码将被 REST 服务解释为“application/octetstream”而不是“application/xml”,并且您的消息将在节点中可读。 我明白这是在说什么,但是,它并没有说明任何明确的解决方案。我的看法是我们必须从 u006 删除到 u001,然后再删除 JSON.parse。有没有一个库可以为我们做到这一点(即使很简单,最好还是坚持标准)?

以上是关于互操作性 Azure 服务总线消息队列消息的主要内容,如果未能解决你的问题,请参考以下文章

Azure 服务总线队列消息处理

从 Azure 函数将消息写入 Azure 服务总线队列

Azure 服务总线中的死信队列中的消息是不是过期?

如何查看 Azure 服务总线队列中的所有消息?

读取 Azure 服务总线队列中的所有活动消息

Azure 服务总线队列以并行方式异步处理消息