Spark: How to convert multiple rows into a single row with multiple columns?


【Title】:Spark: How to convert multiple rows into single row with multiple columns? 【Posted】:2018-09-20 10:13:33 【Question】:

Note: This is just simple sample data; it makes no sense compared to a real cricket team.

I have a JSON file that looks like this:


  "someID": "a5cf4922f4e3f45",
  "payload": 
    "teamID": "1",
    "players": [
      
        "type": "Batsman",
        "name": "Amar",
        "address": 
          "state": "Gujarat"
        
      ,
      
        "type": "Bowler",
        "name": "Akbar",
        "address": 
          "state": "Telangana"
        
      ,
      
        "type": "Fielder",
        "name": "Antony",
        "address": 
          "state": "Kerala"
        
      
    ]
  

I exploded it with the code below:

from pyspark.sql.functions import explode

df_record = spark.read.json("path-to-file.json", multiLine=True)

df_player_dtls = df_record.select("payload.teamID", explode("payload.players").alias("xplayers")) \
                          .select("teamID", \
                                  "xplayers.type", \
                                  "xplayers.name", \
                                  "xplayers.address.state")

df_player_dtls.createOrReplaceTempView("t_player_dtls")

spark.sql("SELECT * FROM t_player_dtls").show()

So the current output looks like this:

+--------+---------+--------+------------+
| TeamID |  Type   |  Name  |   State    |
+--------+---------+--------+------------+
|      1 | Batsman | Amar   | Gujarat    |
|      1 | Bowler  | Akbar  | Telangana  |
|      1 | Fielder | Antony | Kerala     |
|      2 | Batsman | John   | Queensland |
|      2 | Bowler  | Smith  | Perth      |
+--------+---------+--------+------------+

I want to convert it into the format shown below:

+--------+--------------+---------------+-------------+--------------+--------------+---------------+
| TeamID | Batsman.Name | Batsman.State | Bowler.Name | Bowler.State | Fielder.Name | Fielder.State |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+
|      1 | Amar         | Gujarat       | Akbar       | Telangana    | Antony       | Kerala        |
|      2 | John         | Queensland    | Smith       | Perth        | null         | null          |
+--------+--------------+---------------+-------------+--------------+--------------+---------------+

There will only be one player of each type on a team, and each team can have at most four types of players (Batsman, Bowler, Fielder, and Wicket keeper). So each team has at most four players. Therefore, the final table that holds this data will have nine columns (one for the team ID, plus the name and state of each of the four players).

Is it possible to achieve this in Spark? I am new to Spark, so an answer that explains the steps would be greatly appreciated.

【Question comments】:

【Answer 1】:

We can use PySpark's pivot function:

from pyspark.sql.functions import first

df = df_player_dtls.groupBy("TeamID").pivot("Type").agg(
                            first('Name').alias('Name'),
                            first("State").alias("State"))
df.show(10,False)
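
Note that with more than one aggregation, pivot usually names the output columns in the Type_alias form (Batsman_Name, Batsman_State, and so on). If you want the dotted headers from the question, a small rename pass along these lines should work (a sketch, assuming that underscore naming):

# Hypothetical rename pass: turn "Batsman_Name" into "Batsman.Name", etc.
for c in df.columns:
    if "_" in c:
        df = df.withColumnRenamed(c, c.replace("_", "."))

# Column names containing dots must be quoted with backticks afterwards, e.g.:
# df.select("TeamID", "`Batsman.Name`").show()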

【Comments】:

【Answer 2】:

It is possible with SQL. It is not the most efficient way (a UDF probably is), but it works. Apologies, it is Scala-flavored.

val res = spark.sql(
        """select teamID
          |, Batsman.name as `Batsman.name`, Batsman.state as `Batsman.state`
          |, Bowler.name as `Bowler.name`, Bowler.state as `Bowler.state`
          |, Fielder.name as `Fielder.name`, Fielder.state as `Fielder.state`
          |from (
          |   select teamID,
          |     max(case type when 'Batsman' then info end) as Batsman
          |     , max(case type when 'Bowler' then info end) as Bowler
          |     , max(case type when 'Fielder' then info end) as Fielder
          |     from (select teamID, type, struct(name, state) as info from t_player_dtls) group by teamID
      |)""".stripMargin)

I used group by to pivot the data around the teamID column: max picks a value that is not null, and the case statement lets only one record through to each max. To simplify the max/case combination, I used the struct function, which creates a composite column info holding the payload that we later want to promote into a flat schema.
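
For reference, the same max/case/struct idea expressed through the PySpark DataFrame API might look roughly like this (an untested sketch that assumes the t_player_dtls columns from the question):

from pyspark.sql.functions import col, max as max_, struct, when

# Pack name and state into one struct column, then keep one struct per type and team
base = df_player_dtls.withColumn("info", struct("name", "state"))

res = (base.groupBy("teamID")
           .agg(max_(when(col("type") == "Batsman", col("info"))).alias("Batsman"),
                max_(when(col("type") == "Bowler", col("info"))).alias("Bowler"),
                max_(when(col("type") == "Fielder", col("info"))).alias("Fielder"))
           .select(col("teamID"),
                   col("Batsman.name").alias("Batsman.name"),
                   col("Batsman.state").alias("Batsman.state"),
                   col("Bowler.name").alias("Bowler.name"),
                   col("Bowler.state").alias("Bowler.state"),
                   col("Fielder.name").alias("Fielder.name"),
                   col("Fielder.state").alias("Fielder.state")))
res.show(truncate=False)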

A UDF would be more efficient, but I am not familiar with Python.

UPDATE: Both solutions (SQL and pivot) use the explode plus groupBy combination (@Anshuman's is easier to code) and have the following execution plans:

SQL

== Physical Plan ==
SortAggregate(key=[teamID#10], functions=[max(CASE WHEN (type#16 = Batsman) THEN info#31 END), max(CASE WHEN (type#16 = Bowler) THEN info#31 END), max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
+- *Sort [teamID#10 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(teamID#10, 200)
      +- SortAggregate(key=[teamID#10], functions=[partial_max(CASE WHEN (type#16 = Batsman) THEN info#31 END), partial_max(CASE WHEN (type#16 = Bowler) THEN info#31 END), partial_max(CASE WHEN (type#16 = Fielder) THEN info#31 END)])
     +- *Sort [teamID#10 ASC NULLS FIRST], false, 0
        +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, named_struct(name, xplayers#12.name, state, xplayers#12.address.state) AS info#31]
           +- Generate explode(payload#4.players), true, false, [xplayers#12]
              +- *Project [payload#4]
                 +- Scan ExistingRDD[payload#4,someID#5]

Pivot

== Physical Plan ==
SortAggregate(key=[TeamID#10], functions=[first(if ((Type#16 <=> Batsman)) Name#17 else null, true), first(if ((Type#16 <=> Batsman)) State#18 else null, true), first(if ((Type#16 <=> Bowler)) Name#17 else null, true), first(if ((Type#16 <=> Bowler)) State#18 else null, true), first(if ((Type#16 <=> Fielder)) Name#17 else null, true), first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
+- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
  +- Exchange hashpartitioning(TeamID#10, 200)
  +- SortAggregate(key=[TeamID#10], functions=[partial_first(if ((Type#16 <=> Batsman)) Name#17 else null, true), partial_first(if ((Type#16 <=> Batsman)) State#18 else null, true), partial_first(if ((Type#16 <=> Bowler)) Name#17 else null, true), partial_first(if ((Type#16 <=> Bowler)) State#18 else null, true), partial_first(if ((Type#16 <=> Fielder)) Name#17 else null, true), partial_first(if ((Type#16 <=> Fielder)) State#18 else null, true)])
     +- *Sort [TeamID#10 ASC NULLS FIRST], false, 0
        +- *Project [payload#4.teamID AS teamID#10, xplayers#12.type AS type#16, xplayers#12.name AS name#17, xplayers#12.address.state AS state#18]
           +- Generate explode(payload#4.players), true, false, [xplayers#12]
              +- *Project [payload#4]
                 +- Scan ExistingRDD[payload#4,someID#5]

Both result in a shuffle (Exchange hashpartitioning(TeamID#10, 200)).

If performance is your goal, then you can use the following Scala approach (I don't know Python):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val df_record = spark.read.json(Seq(row_1, row_2).toDS)

//Define your custom player types, as many as needed
val playerTypes = Seq("Batsman", "Bowler", "Fielder")

//Return type for the UDF
val returnType = StructType(playerTypes.flatMap(t => Seq(StructField(s"$t.Name", StringType), StructField(s"$t.State", StringType))))

val unpackPlayersUDF = udf( (players: Seq[Row]) => {
  val playerValues: Map[String, Row] = players.map(p => (p.getAs[String]("type"), p)).toMap
  val arrangedValues = playerTypes.flatMap { t =>
    val playerRow = playerValues.get(t) //if the type does not exist, the value will be None, which becomes null
    Seq(
      playerRow.map(_.getAs[String]("name"))
      , playerRow.map(_.getAs[Row]("address").getAs[String]("state"))
    )
  }
  Row(arrangedValues: _*)
}
, returnType)

val udfRes = df_record
  .withColumn("xplayers", unpackPlayersUDF($"payload.players"))
  .select("payload.teamID", "xplayers.*")

udfRes.show(false)
udfRes.explain()

Output:

+------+------------+-------------+-----------+------------+------------+-------------+
|teamID|Batsman.Name|Batsman.State|Bowler.Name|Bowler.State|Fielder.Name|Fielder.State|
+------+------------+-------------+-----------+------------+------------+-------------+
|1     |Amar        |Gujarat      |Akbar      |Telangana   |Antony      |Kerala       |
|1     |John        |Queensland   |Smith      |Perth       |null        |null         |
+------+------------+-------------+-----------+------------+------------+-------------+

With the following execution plan:

== Physical Plan ==
*Project [payload#4.teamID AS teamID#46, UDF(payload#4.players).Batsman.Name AS Batsman.Name#40, UDF(payload#4.players).Batsman.State AS Batsman.State#41, UDF(payload#4.players).Bowler.Name AS Bowler.Name#42, UDF(payload#4.players).Bowler.State AS Bowler.State#43, UDF(payload#4.players).Fielder.Name AS Fielder.Name#44, UDF(payload#4.players).Fielder.State AS Fielder.State#45]
+- Scan ExistingRDD[payload#4,someID#5]

No shuffle is involved. If you want to improve performance further, adding an explicit read schema via spark.read.schema(SCHEMA).json will help even more, since the reader does not have to infer the schema, which saves time.
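
For example, a PySpark schema matching the sample JSON above might look roughly like this (a sketch; adjust the field types to your real data):

from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Explicit schema so spark.read does not have to infer it from the file
schema = StructType([
    StructField("someID", StringType()),
    StructField("payload", StructType([
        StructField("teamID", StringType()),
        StructField("players", ArrayType(StructType([
            StructField("type", StringType()),
            StructField("name", StringType()),
            StructField("address", StructType([
                StructField("state", StringType())
            ]))
        ])))
    ]))
])

df_record = spark.read.schema(schema).json("path-to-file.json", multiLine=True)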

【Comments】:

Thanks for the quick reply! However, I have gone with @Anshuman's code for now, since it avoids any hard-coding. Hopefully it will perform well on larger datasets too. Any thoughts?

My execution plan is very similar to @Anshuman's approach: both use groupBy, which results in Exchange hashpartitioning(TeamID#10, 200), part of the shuffle and the most expensive piece from a performance point of view. Coding-wise Anshuman's is nicer; I didn't know about the pivot function and learned something new, cheers to Anshuman!!! If you are worried about performance, you need to avoid the explode and groupBy, since the data is already grouped in the JSON. I know how to write a Scala UDF that avoids the issue and will post it in a separate answer together with the execution plans.
