如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶
Posted
技术标签:
【中文标题】如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶【英文标题】:How to convert Json to CSV and send it to big query or google cloud bucket 【发布时间】:2021-12-29 08:00:49 【问题描述】:我是 nifi 的新手,我想将大量 json 数据转换为 csv 格式。 这是我目前正在做的,但这不是预期的结果。
这些是步骤:
使用 InvokeHTTP 创建 access_token 和发送请求正文的进程(这部分工作正常,因为这是预期结果,我不会命名进程)并在 json 中获取响应正文。
json 响应示例:
[
"results":[
"customer":
"resourceName":"customers/123456789",
"id":"11111111"
,
"campaign":
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdaasdasdad",
"id":"456456546546"
,
"adGroup":
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdasda"
,
"metrics":
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
,
"segments":
"device":"DESKTOP",
"date":"2021-11-22"
,
"incomeRangeView":
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
,
"customer":
"resourceName":"customers/123456789",
"id":"11111111"
,
"campaign":
"resourceName":"customers/123456789/campaigns/222456422222",
"name":"asdasdasdasd",
"id":"456456546546"
,
"adGroup":
"resourceName":"customers/456456456456/adGroups/456456456456",
"id":"456456456546",
"name":"asdasdasdas"
,
"metrics":
"clicks":"11",
"costMicros":"43068982",
"impressions":"2079"
,
"segments":
"device":"DESKTOP",
"date":"2021-11-22"
,
"incomeRangeView":
"resourceName":"customers/456456456/incomeRangeViews/456456546~456456456"
,
....etc....
]
]
现在我正在使用: ===>SplitJson ($[].results[])==>JoltTransformJSON 与此规范:
[
"operation": "shift",
"spec":
"customer":
"id": "customer_id"
,
"campaign":
"id": "campaign_id",
"name": "campaign_name"
,
"adGroup":
"id": "ad_group_id",
"name": "ad_group_name"
,
"metrics":
"clicks": "clicks",
"costMicros": "cost",
"impressions": "impressions"
,
"segments":
"device": "device",
"date": "date"
,
"incomeRangeView":
"resourceName": "keywords_id"
]
==>> MergeContent(这是我不知道如何解决的问题) 合并策略:碎片整理 合并格式:二进制连接 属性策略只保留通用属性 最大箱数 5(我尝试了 10 个相同的结果) 分隔符策略:文本 标题:[ 页脚:] 分界符:,
我得到什么结果? 我得到一个包含部分 json 数据的 json 文件 示例:我在 1 个 json 文件中有 50k customer_ids,所以我想将此数据发送到大查询表中,并将所有 ids 放在同一字段“customer_id”下。
MergeContent 使用拆分的 json 文件并将它们合并,但我仍然会为每个文件获得 10k customer_ids,即我有 5 个文件而不是 1 个具有 50k customer_ids 的文件
在 MergeContent 之后,我使用 ==>>ConvertRecord 和以下设置: 记录阅读器 JsonTreeReader(架构访问策略:InferSchema) 记录编写器 CsvRecordWriter ( Schema 写策略:不要写 Schema 架构访问策略:继承记录架构 CSV 格式:Microsoft Excel 包括标题行:true 字符集 UTF-8 )
==>>UpdateAttribute (custom prop: filename: $filename.csv) ==>> PutGCSObject(并将数据放入google存储桶(此步骤工作正常-我可以将文件放在那里))
使用这种方法,我无法将数据发送到大查询(在 MergeContent 之后,我尝试使用 PutBigQueryBatch 并在 bq sheel 中使用此命令来获取我需要的架构:
bq show --format=prettyjson some_data_set.some_table_in_that_data_set | jq '.schema.fields'
我根据需要填写了所有字段并加载文件类型:如果我将其转换为 CSV,我尝试了 NEWLINE_DELIMITED_JSON 或 CSV(我没有收到错误,但没有数据上传到表中) )
我做错了什么?我基本上想以这样的方式映射数据,使每个字段数据都在相同的字段名称下
【问题讨论】:
【参考方案1】:您缺少的技巧是使用记录。
不要使用 X>SplitJson>JoltTransformJson>Merge>Convert>X,而是尝试使用 JSON Reader 和 CSV Writer 的 X>JoltTransformRecord>X。这样可以避免很多低效率。
如果您确实需要拆分(除非完全必要,否则应避免拆分和合并),您可以改用 MergeRecord - 再次使用 JSON 读取器和 CSV 写入器。这将使您的流程 X>Split>Jolt>MergeRecord>X.
【讨论】:
我在 splitJson $[].results[] 中使用,然后是其余的。如果我得到响应然后 JoltTransformRecord 那么规范应该是它自己正确的响应吗?这个想法听起来很有用!我正在尝试 - 谢谢! 我收到一个错误:无法转换标准FlowFileRecord[uuid=...]。 [resource claim=standartresourceclaim[id=...] 由于转换第一条记录时出错,我设法使其仅在拆分后才起作用。当我得到数据时,我得到了上面示例 json 响应中所述的数据。我知道颠簸规范应该是别的东西,但究竟是什么? 这是规范返回的内容: "customer_id": ["234234", "34234"], "campaign_id": ["234234", "2"],34234 "campaign_name": ["'aaaa", "'aaaaa"], "ad_group_id": ["12312", "12312"], "ad_group_name": ["aaa", "aaa"], "点击次数": ["11", " 11”]、“成本”:[“243423”、“234234”]、“展示次数”:[“2079”、“2079”]、“设备”:[“桌面”、“桌面”]、“日期”: ["2021-11-22", "2021-11-22"] 因此您可以从 JoltTransformJson 开始将results
数组的内容移回顶层 - 因此您只需 [ "results" : [ ..., .... ..., ... ] ]
而不是 [ "results" : [ ..., .... ..., ... ] ]
。这会产生 1 个 FlowFile,其中所有结果都是平面***数组中的项目。现在您可以使用 JoltTransformRecord,它将单独评估数组中的每个项目。尝试使用jolt-demo.appspot.com 处理您的 JOLT 规范以获得您需要的结果。
感谢您的建议 - 这就是我实际所做的,现在有 [...,.........]。我可以使用 convertRecord 作为下一步将其转换为 csv。现在唯一的问题是第一行有标题,当我将数据推送到 bq 时,它会将它们作为标题读取,作为第一行和 bq 中的标题如何自动生成为随机值以上是关于如何将 Json 转换为 CSV 并将其发送到大查询或谷歌云存储桶的主要内容,如果未能解决你的问题,请参考以下文章
如何将 HTML 表转换为 JSON 并将 JSON 与 JSON.NET 一起使用?
将 csv 上传到内存中并将 json 发送到 api 的 Django 页面