流分析 - 如何处理参考输入中的 json

Posted

技术标签:

【中文标题】流分析 - 如何处理参考输入中的 json【英文标题】:Stream analytics - How to handle json in reference input 【发布时间】:2020-02-13 11:37:23 【问题描述】:

我有一个 Azure 流分析 (ASA) 作业,它处理来自事件中心的设备遥测数据。流应该与来自 sql 表的参考数据连接,以使用额外的设备元数据增强每条消息。合并的条目应存储在 CosmosDb 中。

为设备元数据提供服务的 sql 数据库:

CREATE TABLE [dbo].[MyTable]
(
  [DeviceId] NVARCHAR(20) NOT NULL PRIMARY KEY, 
  [MetaData] NVARCHAR(MAX) NULL   /* this stores json, which can vary per record */
)

在 ASA 中,我通过一个简单的查询配置了参考数据输入:

SELECT DeviceId, JSON_QUERY(MetaData) FROM [dbo].[MyTable]

我有执行连接的主要 ASA 查询:

WITH temptable AS (
SELECT * FROM [telemetry-input] TD PARTITION BY PartitionId
LEFT OUTER JOIN [metadata-input] MD
ON TD.DeviceId = MD.DeviceId
)

SELECT TD.*, MD.MetaData 
INTO [cosmos-db-output] 
FROM temptable PARTITION BY PartitionId

一切正常,合并的数据存储在 CosmosDb 中。但是,来自 sql 的 Metadata 列的值被视为字符串,并使用引号和转义字符存储在 comos 中。示例:

 "DeviceId" : "abc1234", … , "MetaData" : " \"TestKey\": \"test value\" " ;

有没有办法将元数据中的 json 处理和存储为适当的 Json 对象,即

 "DeviceId" : "abc1234", … , "MetaData" :  "TestKey": "test value"  ;

【问题讨论】:

【参考方案1】:

我找到了在 ASA 中实现它的方法 - 你需要创建 javascript user function:

function parseJson(strjson)
          return JSON.parse(strjson);

并在您的查询中调用它:

...
SELECT TD.*, udf.parseJson(MD.MetaData)
...

【讨论】:

很好。谢谢你的分享。我忘了那个功能。您可以标记自己以供他人参考。【参考方案2】:

正如您在问题中提到的,参考 json 数据被视为 json 字符串,而不是 json 对象。根据我对 ASA 中 Query Syntax 的研究,没有内置函数可以转换它。

但是,我建议您使用Azure Function Cosmos DB Trigger 来处理创建的每个文档。请参考我的功能代码:

using System;
using System.Collections.Generic;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

namespace ProcessJson

    public class Class1
    
        [FunctionName("DocumentUpdates")]
        public static void Run(
        [CosmosDBTrigger(databaseName:"db",collectionName: "item", ConnectionStringSetting = "CosmosDBConnection",LeaseCollectionName = "leases",
            CreateLeaseCollectionIfNotExists = true)]
        IReadOnlyList<Document> documents,
        TraceWriter log)
        
            log.Verbose("Start.........");
            String endpointUrl = "https://***.documents.azure.com:443/";
            String authorizationKey = "***";
            String databaseId = "db";
            String collectionId = "import";

            DocumentClient client = new DocumentClient(new Uri(endpointUrl), authorizationKey);

            for (int i = 0; i < documents.Count; i++)
            
                Document doc = documents[i];
                if((doc.alreadyFormat == Undefined.Value) ||(!doc.alreadyFormat))
                   String MetaData = doc.GetPropertyValue<String>("MetaData");
                   JObject o = JObject.Parse(MetaData);

                   doc.SetPropertyValue("MetaData", o);
                   doc.SetPropertyValue("alreadyFormat", true);
                   client.ReplaceDocumentAsync(UriFactory.CreateDocumentUri(databaseId, collectionId, doc.Id), doc); 

                   log.Verbose("Update document Id " + doc.Id);

                

            
        
    

另外请参考案例:Azure Cosmos DB SQL - how to unescape inner json property

【讨论】:

以上是关于流分析 - 如何处理参考输入中的 json的主要内容,如果未能解决你的问题,请参考以下文章

如何处理 GraphQL 中的嵌套输入

如何处理错误的数据类型输入

string 类型的输入操作符 cin 和 getline 函数分别如何处理空白字符?

如何处理参数化查询中的空用户输入?

什么是JSON+如何处理JSON字符串

您如何处理 Apache Pig 中的空输入文件或丢失的输入文件?