Spark: How to convert multiple rows into a single row with multiple columns?
Posted: 2018-09-20 10:13:33

Note: this is just simple sample data; it does not correspond to a real cricket team.
I have a JSON file that looks like this:
"someID": "a5cf4922f4e3f45",
"payload":
"teamID": "1",
"players": [
"type": "Batsman",
"name": "Amar",
"address":
"state": "Gujarat"
,
"type": "Bowler",
"name": "Akbar",
"address":
"state": "Telangana"
,
"type": "Fielder",
"name": "Antony",
"address":
"state": "Kerala"
]
I exploded it with the code below:
from pyspark.sql.functions import explode

df_record = spark.read.json("path-to-file.json", multiLine=True)

df_player_dtls = df_record.select("payload.teamID", explode("payload.players").alias("xplayers")) \
                          .select("teamID", \
                                  "xplayers.type", \
                                  "xplayers.name", \
                                  "xplayers.address.state")

df_player_dtls.createOrReplaceTempView("t_player_dtls")
spark.sql("SELECT * FROM t_player_dtls").show()
So the current output looks like this:
+--------+---------+--------+------------+
| TeamID | Type | Name | State |
+--------+---------+--------+------------+
| 1 | Batsman | Amar | Gujarat |
| 1 | Bowler | Akbar | Telangana |
| 1 | Fielder | Antony | Kerala |
| 2 | Batsman | John | Queensland |
| 2 | Bowler | Smith | Perth |
+--------+---------+--------+------------+
I want to convert it into the format shown below:
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| TeamID | Batsman.Name | Batsman.State | Bowler.Name | Bowler.State | Fielder.Name | Fielder.State |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| 1 | Amar | Gujarat | Akbar | Telangana | Antony | Kerala |
| 2 | John | Queensland | Smith | Perth | null | null |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
A team has only one player of each type, and there can be at most four player types per team (Batsman, Bowler, Fielder and Wicket keeper), so each team has at most four players. The final table holding this data therefore has nine columns: one for the team ID, plus the name and state of each of the four players.
Is it possible to achieve this in Spark? I am new to Spark, so an answer that explains the steps would be much appreciated.
【Question comments】:
【Answer 1】: We can use PySpark's pivot function:
from pyspark.sql.functions import first
df = df_player_dtls.groupBy("TeamID").pivot("Type").agg(
first('Name').alias('Name'),
first("State").alias("State"))
df.show(10,False)
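Note that when pivot is combined with more than one aggregation, Spark names the resulting columns after the pivot value and the aggregation alias, e.g. Batsman_Name and Batsman_State. As a minimal follow-up sketch (assuming the df from above), the columns can be renamed to the dotted headers from the desired output; column names containing dots must later be referenced with backticks:

# rename pivoted columns like "Batsman_Name" to the dotted form "Batsman.Name"
for p in ["Batsman", "Bowler", "Fielder"]:
    for c in ["Name", "State"]:
        df = df.withColumnRenamed(p + "_" + c, p + "." + c)

df.show(10, False)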
【Comments】:
【Answer 2】: It is possible with SQL. It is not the most efficient way (a UDF probably is), but it works. Apologies, it is Scala-flavoured.
val res = spark.sql(
"""select teamID
|, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
|, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
|, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
|from (
| select teamID,
| max(case type when 'Batsman' then info end) as Batsman
| , max(case type when 'Bowler' then info end) as Bowler
| , max(case type when 'Fielder' then info end) as Fielder
| from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
|)""".stripMargin)
I used group by to pivot the data around the teamID column; max picks a value that is not null, and the case statement lets only one record through to each max. To simplify the max/case combination I used the struct function, which creates a composite column info made up of the payload we later want to promote into a flat schema.

A UDF would be more efficient, but I am not familiar with Python.
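For readers working in Python, here is a rough PySpark sketch of the same max/case/struct idea (assuming the df_player_dtls DataFrame from the question; the player types are taken from the sample data):

from pyspark.sql.functions import col, max as max_, struct, when

player_types = ["Batsman", "Bowler", "Fielder"]

# struct(name, state) plays the role of the composite "info" column
info = df_player_dtls.select("teamID", "type", struct("name", "state").alias("info"))

# max(case when ...) per player type picks the single non-null struct per team
res = info.groupBy("teamID").agg(
    *[max_(when(col("type") == t, col("info"))).alias(t) for t in player_types])

# flatten the structs into name/state columns
res.select(
    "teamID",
    *[col(t + "." + f).alias(t + "." + f) for t in player_types for f in ("name", "state")]
).show(truncate=False)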
Update: Both solutions (SQL and pivot) use an explode plus groupBy combination; @Anshuman's is easier to code. They have the following execution plans:
SQL
== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(teamID#10, 200)
+- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
Pivot
== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(TeamID#10, 200)
+- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
+- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
+- Generate explode(payload#4.players), true, false, [xplayers#12]
+- *Project [payload#4]
+- Scan ExistingRDD[payload#4,someID#5]
Both lead to a shuffle (Exchange hashpartitioning(TeamID#10, 200)).
If performance is your goal, you can use this Scala approach instead (I don't know Python):
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// row_1 and row_2 are the sample JSON documents as strings
val df_record = spark.read.json(Seq(row_1, row_2).toDS)
//Define your custom player types, as many as needed
val playerTypes = Seq("Batsman", "Bowler", "Fielder")
//Return type for the UDF
val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))
val unpackPlayersUDF = udf( (players: Seq[Row]) => {
  val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
  val arrangedValues = playerTypes.flatMap { t =>
    val playerRow = playerValues.get(t) // if the type does not exist, the value will be None, which becomes null
    Seq(
      playerRow.map(_.getAs[String]("name")),
      playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
    )
  }
  Row(arrangedValues: _*)
}, returnType)
val udfRes = df_record
.withColumn("xplayers", unpackPlayersUDF($"payload.players"))
.select("payload.teamID", "xplayers.*")
udfRes.show(false)
udfRes.explain()
Output:
+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1 |Amar |Gujarat |Akbar |Telangana |Antony |Kerala |
|1 |John |Queensland |Smith |Perth |null |null |
+------+------------+-------------+-----------+------------+------------+-------------+
with the following execution plan:
== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]
No shuffle is involved. If you want to improve performance further, adding an explicit read schema via spark.read.schema(SCHEMA).json will help even more, since the reader does not have to infer the schema, which saves time.
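As a minimal PySpark sketch of such an explicit schema (assuming the JSON layout from the question; in Scala the same StructType/StructField API applies):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# schema mirrors the sample JSON: someID, payload.teamID, payload.players[].{type, name, address.state}
schema = StructType([
    StructField("someID", StringType()),
    StructField("payload", StructType([
        StructField("teamID", StringType()),
        StructField("players", ArrayType(StructType([
            StructField("type", StringType()),
            StructField("name", StringType()),
            StructField("address", StructType([
                StructField("state", StringType())
            ]))
        ])))
    ]))
])

df_record = spark.read.schema(schema).json("path-to-file.json", multiLine=True)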
【Comments】:
Thanks for the quick reply! However, I have gone with @Anshuman's code for now, since it avoids any hard-coding. Hopefully it will perform well on larger datasets too. Any thoughts?

My execution plan is very similar to @Anshuman's approach; both use groupBy, which leads to Exchange hashpartitioning(TeamID#10, 200). That is part of the shuffle, which is the most expensive part from a performance point of view. Coding-wise Anshuman's is better; I did not know about the pivot function and learned something new, so cheers to Anshuman!!! If performance is a concern, you need to avoid explode and groupBy, because the data is already grouped within the JSON. I know how to write a Scala UDF that avoids the problem and will post it, together with its execution plan, in a separate answer.