使用 azure 逻辑应用程序将消息作为字符串发送到 azure 服务总线

Posted

技术标签:

【中文标题】使用 azure 逻辑应用程序将消息作为字符串发送到 azure 服务总线【英文标题】:Sending message to azure service bus as a string using azure logic app 【发布时间】:2018-06-28 12:54:53 【问题描述】:

我正在使用逻辑应用操作“发送消息”向服务总线主题发送消息。在控制台应用程序中读取它时,如果我这样做:

SubscriptionClient subClient = SubscriptionClient.CreateFromConnectionString(connstr, topicName, subscriptionName);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = true; 
options.MaxConcurrentCalls = 1;  
subClient.OnMessage((message) => 
    string sjson = null;
    try
    
        sjson = message.GetBody<string>();
        Console.WriteLine(sjson);
    
    catch (Exception ex)
    
        Console.WriteLine(ex.ToString());
    
, options);

它会抛出以下异常:

System.Runtime.Serialization.SerializationException: There was an error deserializing the object of type System.String. The input source is not correctly formatted. 
---> System.Xml.XmlException: The input source is not correctly formatted.
at System.Xml.XmlExceptionHelper.ThrowXmlException(XmlDictionaryReader reader, String res, String arg1, String arg2, String arg3)
at System.Xml.XmlBufferReader.ReadValue(XmlBinaryNodeType nodeType, ValueHandle value)at System.Xml.XmlBinaryReader.ReadNode() 
at System.Xml.XmlBinaryReader.Read() 
at System.Xml.XmlBaseReader.IsStartElement() 
at System.Xml.XmlBaseReader.IsStartElement(XmlDictionaryString localName, XmlDictionaryString namespaceUri)  
at System.Runtime.Serialization.XmlReaderDelegator.IsStartElement(XmlDictionaryString localname, XmlDictionaryString ns) 
at System.Runtime.Serialization.XmlObjectSerializer.IsRootElement(XmlReaderDelegator reader, DataContract contract, XmlDictionaryString name, XmlDictionaryString ns)
at System.Runtime.Serialization.DataContractSerializer.InternalIsStartObject(XmlReaderDelegator reader)  
at System.Runtime.Serialization.DataContractSerializer.InternalReadObject(XmlReaderDelegator xmlReader, Boolean verifyObjectName, DataContractResolver dataContractResolver) 
at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)--- End of inner exception stack trace ---at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.DataContractSerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)  
at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(XmlDictionaryReader reader, Boolean verifyObjectName)  
at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlReader reader, Boolean verifyObjectName)at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName)  
at System.Runtime.Serialization.XmlObjectSerializer.InternalReadObject(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.XmlObjectSerializer.ReadObjectHandleExceptions(XmlReaderDelegator reader, Boolean verifyObjectName, DataContractResolver dataContractResolver)at System.Runtime.Serialization.XmlObjectSerializer.ReadObject(XmlDictionaryReader reader)at Microsoft.ServiceBus.Messaging.DataContractBinarySerializer.ReadObject(Stream stream) 
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T](XmlObjectSerializer serializer)  
at Microsoft.ServiceBus.Messaging.BrokeredMessage.GetBody[T]() 

事实证明,逻辑应用将消息作为流而不是字符串发送,这就是引发异常的原因。因为,如果我这样做,控制台应用程序能够读取消息:

SubscriptionClient subClient = SubscriptionClient.CreateFromConnectionString(connstr, topicName, subscriptionName);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = true; 
options.MaxConcurrentCalls = 1;  
subClient.OnMessage((message) => 
    Stream stream;
    StreamReader reader;
    string messageJson;
    try
    
        stream = message.GetBody<Stream>();
        reader = new StreamReader(stream);
        messageJson = reader.ReadToEnd();
        Console.WriteLine(messageJson);
    
    catch (Exception ex)
    
        Console.WriteLine(ex.ToString());
    
, options);

所以,我的问题是,有没有办法让逻辑应用程序将消息作为字符串而不是流发送?还是逻辑应用的限制?

我已尝试将“application/json”、“System.String”和“text/plain”作为 Logic App 操作中消息的内容类型,但它不起作用。

【问题讨论】:

【参考方案1】:

逻辑应用服务总线连接器目前仅将数据作为字节流转储到服务总线。根据 Azure 逻辑应用的当前设计,无法将消息作为字符串发送

【讨论】:

【参考方案2】:

很遗憾,无法更改尸体的交付方式。为了解决这个问题,我们用一个帮助方法更新了我们的监听器,该方法试图找出主体的类型并相应地提取:

private static JObject GetBody(BrokeredMessage brokeredMessage)
        
            string objAsJson;
            object objIsStream;

            brokeredMessage.Properties.TryGetValue("isStream", out objIsStream);
            bool bIsStream = objIsStream != null ? (bool)objIsStream : brokeredMessage.DeliveryCount % 2 == 0; // Default delivery method is String; Retry String as Stream if it fails

            if (bIsStream)
            
                // Azure Functions and Logic Apps send messages as Stream
                // Service Bus Explorer defaults to Stream but can send as String
                Stream stream = brokeredMessage.GetBody<Stream>();
                StreamReader reader = new StreamReader(stream);
                objAsJson = reader.ReadToEnd();
            
            else
            
                // Our services send messages as String
                objAsJson = brokeredMessage.GetBody<string>();
            

            return JObject.Parse(objAsJson);
        

【讨论】:

不会objIsStream 总是为空? 当我的 Azure Functions 向服务总线发送消息时,它会添加 isStream 属性并将其设置为 true 以明确告诉侦听器类型为 Stream。

以上是关于使用 azure 逻辑应用程序将消息作为字符串发送到 azure 服务总线的主要内容,如果未能解决你的问题,请参考以下文章

使用 azure 逻辑应用将 blob 从 azure 存储帐户动态发送到电子邮件

如何解析Azure IOT中心逻辑应用程序中的消息

逻辑应用中的 Azure 服务总线队列错误

如何通过azure设备配置服务从azure功能向iot设备发送自定义错误消息?

Azure 逻辑应用接收图像 HTTP 触发器

如何使用 python 快速将消息发送到 Azure 队列存储?