使用 spark 更改 bigquery 中嵌套数据的列名
Posted
技术标签:
【中文标题】使用 spark 更改 bigquery 中嵌套数据的列名【英文标题】:Change column names of nested data in bigquery using spark 【发布时间】:2020-02-14 04:00:15 【问题描述】:我正在尝试使用 Spark Scala 将一些数据写入 BigQuery,我的 spark df 看起来像,
root
|-- id: string (nullable = true)
|-- cost: double (nullable = false)
|-- nodes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- settled: string (nullable = true)
| | |-- constant: string (nullable = true)
|-- status: string (nullable = true)
我试图改变数据框的结构。
val schema = StructType(Array(
StructField("id", StringType, true),
StructField("cost", DoubleType, true),
StructField("nodes", StructType(Array(StructField("settled", StringType), StructField("constant", StringType)))),
StructField("status", StringType, true)))
val actualDf = spark.createDataFrame(results, schema)
但它没有用。当它写入 BigQuery 时,列名如下所示,
id、成本、nodes.list.element.settled、nodes.list.element.constant、状态
有没有办法将这些列名更改为,
id、成本、结算、常数、状态
【问题讨论】:
【参考方案1】:您可以explode
节点数组来获取列的扁平结构,然后将数据帧写入bigquery。
例子:
val jsn_ds=Seq(""""id":1, "cost": "2.0","nodes":["settled":"u","constant":"p"],"status":"s"""").toDS
spark.read.json(jsn_ds).printSchema
// root
// |-- cost: string (nullable = true)
// |-- id: long (nullable = true)
// |-- nodes: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- constant: string (nullable = true)
// | | |-- settled: string (nullable = true)
// |-- status: string (nullable = true)
spark.read.json(jsn_ds).
withColumn("expld",explode('nodes)).
select("*","expld.*").
drop("expld","nodes").
show()
//+----+---+------+--------+-------+
//|cost| id|status|constant|settled|
//+----+---+------+--------+-------+
//| 2.0| 1| s| p| u|
//+----+---+------+--------+-------+
【讨论】:
感谢您的回答。我收到以下错误。Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) nodes#106 missing from id#178,cost#179,nodes#180,status#181 in operator !Generate explode(nodes#106), true, false, [expld#188];; Project [id#178, cost#179, nodes#180, status#181, expld#188] +- !Generate explode(nodes#106), true, false, [expld#188] +- LogicalRDD [id#178, cost#179, nodes#180, status#181]
代码,spark.read.json(results.toJSON).withColumn("expld", explode(results.col("nodes"))).select("*", "expld.*").drop("expld", "nodes").show()
试试这个spark.read.json(results.toJSON).withColumn("expld", explode(col("nodes"))).select("*", "expld.*").drop("expld", "nodes").show()
以上是关于使用 spark 更改 bigquery 中嵌套数据的列名的主要内容,如果未能解决你的问题,请参考以下文章
使用 scala 从 spark 中删除 bigquery 表