加载到 BigQuery 时,Spark 写入 Parquet array<string> 会转换为不同的数据类型

Posted

技术标签:

【中文标题】加载到 BigQuery 时,Spark 写入 Parquet array<string> 会转换为不同的数据类型【英文标题】:Spark writing Parquet array<string> converts to a different datatype when loading into BigQuery 【发布时间】:2018-12-07 18:10:42 【问题描述】:

Spark 数据框架构:

    StructType(
        [StructField("a", StringType(), False),
        StructField("b", StringType(), True),
        StructField("c" , BinaryType(), False),
        StructField("d", ArrayType(StringType(), False), True),
        StructField("e", TimestampType(), True)
        ])

当我将数据框写入 parquet 并将其加载到 BigQuery 中时,它会以不同的方式解释架构。这是一个简单的 JSON 加载,并使用 spark 数据帧写入 parquet。

BigQuery 架构:

            [
    
        "type": "STRING",
        "name": "a",
        "mode": "REQUIRED"
    ,
    
        "type": "STRING",
        "name": "b",
        "mode": "NULLABLE"
    ,
    
        "type": "BYTES",
        "name": "c",
        "mode": "REQUIRED"
    ,
    
        "fields": [
        
            "fields": [
            
                "type": "STRING",
                "name": "element",
                "mode": "NULLABLE"
            
            ],
            "type": "RECORD",
            "name": "list",
            "mode": "REPEATED"
        
        ],
        "type": "RECORD",
        "name": "d",
        "mode": "NULLABLE"
    ,
    
        "type": "TIMESTAMP",
        "name": "e",
        "mode": "NULLABLE"
    
    ]

这与 spark 的写入方式或 BigQuery 读取 parquet 的方式有关吗?知道如何解决这个问题吗?

【问题讨论】:

应该 ArrayType(StringType().... 是 ArrayType(StringType,... 这是一个 pyspark 的东西。在 spark 中它不是一个函数,但在 pyspark 中它是一个函数。如果我错了,请纠正我! 【参考方案1】:

这是由于spark-bigquery connector 使用的中间文件格式(默认为镶木地板)。

连接器首先将数据写入 parquet 文件,然后使用 BigQuery Insert API 将它们加载到 BigQuery。

如果您使用 parquet-tools 检查中间 parquet 架构,您会发现类似于字段 d (ArrayType(StringType) in Spark)

 optional group a (LIST) 
    repeated group list 
      optional binary element (STRING);
    
  

现在,如果您自己在 BigQuery 中使用 bq load 或 BigQuery Insert API 加载此拼花,则可以通过启用 parquet_enable_list_inference 来告诉 BQ 忽略中间字段

很遗憾,在使用 spark-bigquery 连接器时,我看不到如何启用此选项!

作为一种解决方法,您可以尝试使用orc 作为中间格式。

       df
        .write
        .format("bigquery")
        .option("intermediateFormat", "orc")

【讨论】:

以上是关于加载到 BigQuery 时,Spark 写入 Parquet array<string> 会转换为不同的数据类型的主要内容,如果未能解决你的问题,请参考以下文章

使用 spark 将 parquet 数据从 Google 云存储加载到 BigQuery

BigQuery:写入查询结果时使用 bigquery 作业的意外行为

有没有更好的方法通过 PySpark 集群(dataporc)将 spark df 加载到 BigQuery 中?

Spark BigQuery 连接器:写入 ARRAY 类型会导致异常:“”无效值:ARRAY 不是有效值“”

将流转换为小批量以加载到 bigquery

使用 scala 从 spark 中删除 bigquery 表