使用 Nifi 将带有数组的 json 插入 BigQuery 的问题

Posted

技术标签:

【中文标题】使用 Nifi 将带有数组的 json 插入 BigQuery 的问题【英文标题】:Issue Inserting json with array into BigQuery using Nifi 【发布时间】:2019-11-01 01:17:46 【问题描述】:

我正在尝试使用 Nifi 处理器 PutBigQueryBatch 1.9.2 将以下 json 数据插入到 bigquery 表中

    
"Revenue_Label": "Dining Room",     
"StoreName": "STORE A",     
"Revenue_Id": "1",     
"Alteration_Flag": "False",     
"Order_Mode_Label": "DriveThru",     
"checkClosetime": "2019-10-24T13:43:19+13:00",     
"Alterations": [                    
"Alteration_Product_Code": "211136",             
"Alteration_Product_Net_Amount": 0.0,             
"Altered_Product_Code": "211135",             
"Alteration_Product_Amount": 0.0,             
"Altered_Product_Name": "Burger",             
"Alteration_Product_Name": "Add Sauce",             
"Alteration_Product_Qty": 1.0            ],     
"StoreId": "1234",    
 "dob": "20191024",     
 "Order_Mode_Id": "3",     
 "checknumber": "54321"

但是,我不断收到以下错误

PutBigQueryBatch:RECORD 字段必须至少有一个 子字段:java.lang.IllegalArgumentException:RECORD 字段必须 至少有一个子字段

在 PutBigQueryBatch 的属性中,我使用了以下与 BigQuery 表定义在顺序、模式、名称和类型方面匹配的架构定义

[
  "mode": "NULLABLE","name": "Revenue_Label","type": "STRING",
  "mode": "NULLABLE","name": "StoreName","type": "STRING",
  "mode": "NULLABLE","name": "Revenue_Id", "type": "STRING" ,
  "mode": "NULLABLE","name": "Alteration_Flag","type": "STRING",
  "mode": "NULLABLE","name": "Order_Mode_Label","type": "STRING",
  "mode": "NULLABLE","name": "checkClosetime","type": "TIMESTAMP" ,
  "mode": "REPEATED",    
   "name": "Alterations",    
   "type": "RECORD",    
     "fields": [      
                "mode": "NULLABLE","name": "Alteration_Product_Code", "type": "STRING" ,
                "mode": "NULLABLE", "name": "Alteration_Product_Net_Amount", "type": "FLOAT" ,
                "mode": "NULLABLE", "name": "Altered_Product_Code","type": "STRING" ,
                "mode": "NULLABLE", "name": "Alteration_Product_Amount",  "type": "FLOAT" ,
                "mode": "NULLABLE", "name": "Altered_Product_Name", "type": "STRING" ,
                "mode": "NULLABLE", "name": "Alteration_Product_Name",  "type": "STRING" ,
                "mode": "NULLABLE","name": "Alteration_Product_Qty",   "type": "FLOAT"       
                ]  
    ,    
  "mode": "NULLABLE","name": "StoreId","type": "STRING",
  "mode": "NULLABLE", "name": "dob","type": "STRING",
  "mode": "NULLABLE","name": "Order_Mode_Id","type": "STRING",
  "mode": "NULLABLE","name": "checknumber","type": "STRING" 
 ]

我尝试过的:

    从 json flowflie 中删除了所有的空格和回车符 尝试了 json 文件开头和结尾的数组,并将 BigQuery 表与此匹配。 将项目名称更改为不包含下划线。 一次构建插入一列,只有在数组存在时才会失败。 在使用实用程序“bq show --schema --format=prettyjson”时,直接使用了从 Google Cloud BigQuery 生成的架构定义。 将每个数组项的模式定义为 REQUIRED 而不是 NULLABLE。 从 Nifi 中 PutBigQueryBatch 进程的属性中的架构定义中删除了所有空格和回车符

所有都导致了与上述相同的错误

注意 在我的 nifi 进程的其他地方,我成功地使用了 PutBigQueryBatch 进程并更新了表。不同之处在于 json 不存在数组。

有人对如何解决这个问题有任何想法吗?

【问题讨论】:

至少Alterations 数组未关闭。 嗨 daggett - 谢谢你的回复。我对您的意思“更改”数组未关闭感到有些困惑。是架构定义还是需要关闭的json文件?你能提供一个应该是什么的例子吗?谢谢蒂姆 @TimManger 您没有找到解决此问题的方法吗?我遇到了类似的问题。我可以手动加载模式 json 以在 GCP 控制台中生成表,但如果我尝试通过 PutBigQueryBatch 执行此操作,它会在模式的同一 RECORD 部分失败。当手动创建表时,所有记录都可以毫无问题地写入其中。 嗨 Aklys - 我必须创建一个解决方法。我所做的是在 GCP 中创建一个主题和订阅,然后使用 nifi 中的 PublishGCPubSub 进程将 json 流文件传递给 PubSub。从那里我写了一个云函数,当 jason 文件到达队列主题并将 json 文件插入到 bigquery 的表中时触发。 【参考方案1】:

从documentation,您的原始定义看起来是正确的。不知何故,空格或制表符可能会出错,因此请尝试使用更多格式的空格。

漂亮:

 
    "Revenue_Label":"Dining Room",
    "StoreName":"STORE A",
    "Revenue_Id":"1",
    "Alteration_Flag":"False",
    "Order_Mode_Label":"DriveThru",
    "checkClosetime":"2019-10-24T13:43:19+13:00",
    "Alterations":[ 
         
            "Alteration_Product_Code":"211136",
            "Alteration_Product_Net_Amount":0.0,
            "Altered_Product_Code":"211135",
            "Alteration_Product_Amount":0.0,
            "Altered_Product_Name":"Burger",
            "Alteration_Product_Name":"Add Sauce",
            "Alteration_Product_Qty":1.0
        
    ],
    "StoreId":"1234",
    "dob":"20191024",
    "Order_Mode_Id":"3",
    "checknumber":"54321"

紧凑:

"Revenue_Label":"Dining Room","StoreName":"STORE A","Revenue_Id":"1","Alteration_Flag":"False","Order_Mode_Label":"DriveThru","checkClosetime":"2019-10-24T13:43:19+13:00","Alterations":["Alteration_Product_Code":"211136","Alteration_Product_Net_Amount":0.0,"Altered_Product_Code":"211135","Alteration_Product_Amount":0.0,"Altered_Product_Name":"Burger","Alteration_Product_Name":"Add Sauce","Alteration_Product_Qty":1.0],"StoreId":"1234","dob":"20191024","Order_Mode_Id":"3","checknumber":"54321"

【讨论】:

Lamus -谢谢您的回复。对于 Alterations 数组排除“字段”的模式定义是什么,还是我必须更改 json 文件包含它? 嗨,Lamaus - 您是否错过了“试试这个”之后的链接? 不,我修改了答案。 嗨拉莫斯 - 谢谢你的建议。可悲的是它没有用。我在第 8 行收到错误“未确定的数组。这对应于在架构中定义数组的行 "mode": "REPEATED", "name": "Altertions", "type": "RECORD", .. . 我会继续寻找解决方案。 啊,我错过了模式REPEATED,这意味着字段应该是数组。【参考方案2】:

根据您的帖子,我可以看到您在架构定义中将 REPEATED 字段定义为 Altertions;但是,在您用于上传数据的 JSON 中,此字段显示为 Alterations

【讨论】:

嗨 ebeltran - 感谢您发现这一点。我更正了拼写以匹配 Alterations。但是,可悲的是,我仍然遇到同样的错误。我开始认为 PutBigQueryBatch 1.9.2 nifi 处理器中可能存在错误。

以上是关于使用 Nifi 将带有数组的 json 插入 BigQuery 的问题的主要内容,如果未能解决你的问题,请参考以下文章

尼菲 |使用 Nifi 表达式进行 Json 解析

Apache Nifi:使用 UpdateRecord 处理器解析数据

Apache Kafka/NiFi 可以将数据转换为 JSON 文件吗?

Nifi:NIFI 中的 Json 到 CSV 转换器

nifi从json文件中获取属性

如何使用nifi在mysql中获取最后插入的记录