使用 spark 更改 bigquery 中嵌套数据的列名

Posted

技术标签:

【中文标题】使用 spark 更改 bigquery 中嵌套数据的列名【英文标题】:Change column names of nested data in bigquery using spark 【发布时间】:2020-02-14 04:00:15 【问题描述】:

我正在尝试使用 Spark Scala 将一些数据写入 BigQuery,我的 spark df 看起来像,

root
 |-- id: string (nullable = true)
 |-- cost: double (nullable = false)
 |-- nodes: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- settled: string (nullable = true)
 |    |    |-- constant: string (nullable = true)
 |-- status: string (nullable = true)

我试图改变数据框的结构。

val schema = StructType(Array(
  StructField("id", StringType, true),
  StructField("cost", DoubleType, true),
  StructField("nodes", StructType(Array(StructField("settled", StringType), StructField("constant", StringType)))),
  StructField("status", StringType, true)))

val actualDf = spark.createDataFrame(results, schema)

但它没有用。当它写入 BigQuery 时,列名如下所示,

id、成本、nodes.list.element.settled、nodes.list.element.constant、状态

有没有办法将这些列名更改为,

id、成本、结算、常数、状态

【问题讨论】:

【参考方案1】:

您可以explode 节点数组来获取列的扁平结构,然后将数据帧写入bigquery。

例子:

val jsn_ds=Seq(""""id":1, "cost": "2.0","nodes":["settled":"u","constant":"p"],"status":"s"""").toDS

spark.read.json(jsn_ds).printSchema
// root
// |-- cost: string (nullable = true)
// |-- id: long (nullable = true)
// |-- nodes: array (nullable = true)
// |    |-- element: struct (containsNull = true)
// |    |    |-- constant: string (nullable = true)
// |    |    |-- settled: string (nullable = true)
// |-- status: string (nullable = true)

spark.read.json(jsn_ds).
      withColumn("expld",explode('nodes)).
      select("*","expld.*").
      drop("expld","nodes").
      show()

//+----+---+------+--------+-------+
//|cost| id|status|constant|settled|
//+----+---+------+--------+-------+
//| 2.0|  1|     s|       p|      u|
//+----+---+------+--------+-------+

【讨论】:

感谢您的回答。我收到以下错误。 Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) nodes#106 missing from id#178,cost#179,nodes#180,status#181 in operator !Generate explode(nodes#106), true, false, [expld#188];; Project [id#178, cost#179, nodes#180, status#181, expld#188] +- !Generate explode(nodes#106), true, false, [expld#188] +- LogicalRDD [id#178, cost#179, nodes#180, status#181]代码,spark.read.json(results.toJSON).withColumn("expld", explode(results.col("nodes"))).select("*", "expld.*").drop("expld", "nodes").show() 试试这个spark.read.json(results.toJSON).withColumn("expld", explode(col("nodes"))).select("*", "expld.*").drop("expld", "nodes").show()

以上是关于使用 spark 更改 bigquery 中嵌套数据的列名的主要内容,如果未能解决你的问题,请参考以下文章

在 BigQuery 中取消嵌套多个嵌套字段

在 BigQuery 中取消嵌套结构

使用 scala 从 spark 中删除 bigquery 表

在 Power BI 中使用 BigQuery 重复/嵌套字段

在 Bigquery 中使用 where 条件更新嵌套记录

使用 BigQuery 取消嵌套 customDimensions