如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶

Posted

技术标签:

【中文标题】如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶【英文标题】:How to convert Json to CSV and send it to big query or google cloud bucket 【发布时间】:2021-12-29 08:00:49 【问题描述】:

我是 nifi 的新手,我想将大量 json 数据转换为 csv 格式。 这是我目前正在做的,但这不是预期的结果。

这些是步骤:

使用 InvokeHTTP 创建 access_token 和发送请求正文的进程(这部分工作正常,因为这是预期结果,我不会命名进程)并在 json 中获取响应正文。

json 响应示例:

    [
   
      "results":[
         
            "customer":
               "resourceName":"customers/123456789",
               "id":"11111111"
            ,
            "campaign":
               "resourceName":"customers/123456789/campaigns/222456422222",
               "name":"asdaasdasdad",
               "id":"456456546546"
            ,
            "adGroup":
               "resourceName":"customers/456456456456/adGroups/456456456456",
               "id":"456456456546",
               "name":"asdasdasdasda"
            ,
            "metrics":
               "clicks":"11",
               "costMicros":"43068982",
               "impressions":"2079"
            ,
            "segments":
               "device":"DESKTOP",
               "date":"2021-11-22"
            ,
            "incomeRangeView":
               "resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
            
         ,
            
            "customer":
               "resourceName":"customers/123456789",
               "id":"11111111"
            ,
            "campaign":
               "resourceName":"customers/123456789/campaigns/222456422222",
               "name":"asdasdasdasd",
               "id":"456456546546"
            ,
            "adGroup":
               "resourceName":"customers/456456456456/adGroups/456456456456",
               "id":"456456456546",
               "name":"asdasdasdas"
            ,
            "metrics":
               "clicks":"11",
               "costMicros":"43068982",
               "impressions":"2079"
            ,
            "segments":
               "device":"DESKTOP",
               "date":"2021-11-22"
            ,
            "incomeRangeView":
               "resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
            
         ,
....etc....
      ]
   
]

现在我正在使用: ===>SplitJson ($[].results[])==>JoltTransformJSON 与此规范:

[
    "operation": "shift",
    "spec": 

        "customer": 
            "id": "customer_id"
        ,
        "campaign": 
            "id": "campaign_id",
            "name": "campaign_name"
        ,
        "adGroup": 
            "id": "ad_group_id",
            "name": "ad_group_name"

        ,
        "metrics": 
            "clicks": "clicks",
            "costMicros": "cost",
            "impressions": "impressions"
        ,
        "segments": 
            "device": "device",
            "date": "date"
        ,
        "incomeRangeView": 
            "resourceName": "keywords_id"
        
    
]

==>> MergeContent(这是我不知道如何解决的问题) 合并策略:碎片整理 合并格式:二进制连接 属性策略只保留通用属性 最大箱数 5(我尝试了 10 个相同的结果) 分隔符策略:文本 标题:[ 页脚:] 分界符:,

我得到什么结果? 我得到一个包含部分 json 数据的 json 文件 示例:我在 1 个 json 文件中有 50k customer_ids,所以我想将此数据发送到大查询表中,并将所有 ids 放在同一字段“customer_id”下。

MergeContent 使用拆分的 json 文件并将它们合并,但我仍然会为每个文件获得 10k customer_ids,即我有 5 个文件而不是 1 个具有 50k customer_ids 的文件

在 MergeContent 之后,我使用 ==>>ConvertRecord 和以下设置: 记录阅读器 JsonTreeReader(架构访问策略:InferSchema) 记录编写器 CsvRecordWriter ( Schema 写策略:不要写 Schema 架构访问策略:继承记录架构 CSV 格式:Microsoft Excel 包括标题行:true 字符集 UTF-8 )

==>>UpdateAttribute (custom prop: filename: $filename.csv) ==>> PutGCSObject(并将数据放入google存储桶(此步骤工作正常-我可以将文件放在那里))

使用这种方法,我无法将数据发送到大查询(在 MergeContent 之后,我尝试使用 PutBigQueryBatch 并在 bq sheel 中使用此命令来获取我需要的架构:

bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields' 

我根据需要填写了所有字段并加载文件类型:如果我将其转换为 CSV,我尝试了 NEWLINE_DELIMITED_JSON 或 CSV(我没有收到错误,但没有数据上传到表中) )

我做错了什么?我基本上想以这样的方式映射数据,使每个字段数据都在相同的字段名称下

【问题讨论】:

【参考方案1】:

您缺少的技巧是使用记录。

不要使用 X>SplitJson>JoltTransformJson>Merge>Convert>X,而是尝试使用 JSON Reader 和 CSV Writer 的 X>JoltTransformRecord>X。这样可以避免很多低效率。

如果您确实需要拆分(除非完全必要,否则应避免拆分和合并),您可以改用 MergeRecord - 再次使用 JSON 读取器和 CSV 写入器。这将使您的流程 X>Split>Jolt>MergeRecord>X.

【讨论】:

我在 splitJson $[].results[] 中使用,然后是其余的。如果我得到响应然后 JoltTransformRecord 那么规范应该是它自己正确的响应吗?这个想法听起来很有用!我正在尝试 - 谢谢! 我收到一个错误:无法转换标准FlowFileRecord[uuid=...]。 [resource claim=standartresourceclaim[id=...] 由于转换第一条记录时出错,我设法使其仅在拆分后才起作用。当我得到数据时,我得到了上面示例 json 响应中所述的数据。我知道颠簸规范应该是别的东西,但究竟是什么? 这是规范返回的内容: "customer_id": ["234234", "34234"], "campaign_id": ["234234", "2"],34234 "campaign_name": ["'aaaa", "'aaaaa"], "ad_group_id": ["12312", "12312"], "ad_group_name": ["aaa", "aaa"], "点击次数": ["11", " 11”]、“成本”:[“243423”、“234234”]、“展示次数”:[“2079”、“2079”]、“设备”:[“桌面”、“桌面”]、“日期”: ["2021-11-22", "2021-11-22"] 因此您可以从 JoltTransformJson 开始将 results 数组的内容移回顶层 - 因此您只需 [ "results" : [ ..., .... ..., ... ] ] 而不是 [ "results" : [ ..., .... ..., ... ] ]。这会产生 1 个 FlowFile,其中所有结果都是平面***数组中的项目。现在您可以使用 JoltTransformRecord,它将单独评估数组中的每个项目。尝试使用jolt-demo.appspot.com 处理您的 JOLT 规范以获得您需要的结果。 感谢您的建议 - 这就是我实际所做的,现在有 [...,.........]。我可以使用 convertRecord 作为下一步将其转换为 csv。现在唯一的问题是第一行有标题,当我将数据推送到 bq 时,它会将它们作为标题读取,作为第一行和 bq 中的标题如何自动生成为随机值

以上是关于如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶的主要内容,如果未能解决你的问题,请参考以下文章

如何将 JSON 字符串转换为数组

如何将 HTML 表转换为 JSON 并将 JSON 与 JSON.NET 一起使用?

如何将我上传的文件转换为 json

将 csv 上传到内存中并将 json 发送到 api 的 Django 页面

如何将 Excel 转换为 JSON 并将其附加到现有的 JSON 文件?

挂钩 OData 的 $metadata 响应并将其从 XML 转换为 JSON