基于Spark Scala中的条件转置Dataframe中的特定列和行

Posted

技术标签:

【中文标题】基于Spark Scala中的条件转置Dataframe中的特定列和行【英文标题】:Transpose Specific Columns and Rows in Dataframe based on Condition in Spark Scala 【发布时间】:2020-09-10 04:48:20 【问题描述】:

我有一个如下场景,其中源数据帧需要使用 spark scala 从列转换为行

源数据帧:

|||||||||||||||||||||||||||||||||||||||||||||||
|ID|LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4 |
|||||||||||||||||||||||||||||||||||||||||||||||
| 1| 100|    1| 35|   |   |   |444|   |   |   |
| 2| 200|    3| 30| 15| 18|   |111|222|333|   |
| 3| 300|    2| 18| 20|   |   |555|666|   |   |
| 4| 400|    4| 28| 60| 80| 90|777|888|123|456|
| 5| 500|    1| 45|   |   |   |245|   |   |   |
|||||||||||||||||||||||||||||||||||||||||||||||

期望下面的结果需要根据 COUNT 字段的值/条件转换为行

预期的数据帧:

|||||||||||||||||
|ID|LOAN|  A|  B|
|||||||||||||||||
| 1| 100| 35|444|
| 2| 200| 30|111|
| 2| 200| 15|222|
| 2| 200| 18|333|
| 3| 300| 18|555|
| 3| 300| 20|666|
| 4| 400| 28|777|
| 4| 400| 60|888|
| 4| 400| 80|123|
| 4| 400| 90|456|
| 5| 500| 45|245|
|||||||||||||||||

【问题讨论】:

试图解决这个问题,如果解决了你的问题,请点赞+接受 【参考方案1】:

我认为,您的用例是取消透视表。

我尝试使用以下方法解决此问题-

    Read the input
    val spark = sqlContext.sparkSession
    val implicits = spark.implicits
    import implicits._
    val schema = StructType(
      "ID|LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4"
        .split("\\|")
        .map(f => StructField(f.trim, DataTypes.IntegerType))
    )
    val data =
      """
        | 1| 100|    1| 35|   |   |   |444|   |   |
        | 2| 200|    3| 30| 15| 18|   |111|222|333|
        | 3| 300|    2| 18| 20|   |   |555|666|   |
        | 4| 400|    4| 28| 60| 80| 90|777|888|123|456
        | 5| 500|    1| 45|   |   |   |245|   |   |
      """.stripMargin
    val df = spark.read
      .schema(schema)
      .option("sep", "|")
      .csv(data.split(System.lineSeparator()).map(_.replaceAll("\\s*", "")).toSeq.toDS())
    df.show(false)
    df.printSchema()

结果-

+---+----+-----+---+----+----+----+---+----+----+----+
|ID |LOAN|COUNT|A1 |A2  |A3  |A4  |B1 |B2  |B3  |B4  |
+---+----+-----+---+----+----+----+---+----+----+----+
|1  |100 |1    |35 |null|null|null|444|null|null|null|
|2  |200 |3    |30 |15  |18  |null|111|222 |333 |null|
|3  |300 |2    |18 |20  |null|null|555|666 |null|null|
|4  |400 |4    |28 |60  |80  |90  |777|888 |123 |456 |
|5  |500 |1    |45 |null|null|null|245|null|null|null|
+---+----+-----+---+----+----+----+---+----+----+----+

root
 |-- ID: integer (nullable = true)
 |-- LOAN: integer (nullable = true)
 |-- COUNT: integer (nullable = true)
 |-- A1: integer (nullable = true)
 |-- A2: integer (nullable = true)
 |-- A3: integer (nullable = true)
 |-- A4: integer (nullable = true)
 |-- B1: integer (nullable = true)
 |-- B2: integer (nullable = true)
 |-- B3: integer (nullable = true)
 |-- B4: integer (nullable = true)

    unpivot the table and remove null entry
df.selectExpr(
      "ID",
      "LOAN",
      "stack(4, A1, B1, A2, B2, A3, B3, A4, B4) as (A, B)"
    ).where("A is not null and B is not null").show(false)

结果-

+---+----+---+---+
|ID |LOAN|A  |B  |
+---+----+---+---+
|1  |100 |35 |444|
|2  |200 |30 |111|
|2  |200 |15 |222|
|2  |200 |18 |333|
|3  |300 |18 |555|
|3  |300 |20 |666|
|4  |400 |28 |777|
|4  |400 |60 |888|
|4  |400 |80 |123|
|4  |400 |90 |456|
|5  |500 |45 |245|
+---+----+---+---+

如果您以字符串类型读取数据,则可以使用空字符串而不是 null 来过滤结果

【讨论】:

我忘记了这个堆栈函数.. 很好的解决方案,我的有点复杂.. :)【参考方案2】:

arrays_zip 合并 a1,a2,a3,a4b1,b2,b3,b4 列。

array_except 删除 empty values

explode 分解使用 arrays_zip 创建的组合值

检查下面的代码。

scala> adf.show(false)
+---+----+-----+---+---+---+---+---+---+---+---+
|id |loan|count|a1 |a2 |a3 |a4 |b1 |b2 |b3 |b4 |
+---+----+-----+---+---+---+---+---+---+---+---+
|1  |100 |1    |35 |   |   |   |444|   |   |   |
|2  |200 |3    |30 |15 |18 |   |111|222|333|   |
|3  |300 |2    |18 |20 |   |   |555|666|   |   |
|4  |400 |4    |28 |60 |80 |90 |777|888|123|456|
|5  |500 |1    |45 |   |   |   |245|   |   |   |
+---+----+-----+---+---+---+---+---+---+---+---+


scala> :paste
// Entering paste mode (ctrl-D to finish)

adf
.withColumn("ab",explode(
    arrays_zip(
        array_except(array($"a1",$"a2",$"a3",$"a4"),array(lit(""))),
        array_except(array($"b1",$"b2",$"b3",$"b4"),array(lit("")))
        )
    )
)
.select($"id",$"loan",$"ab".cast("struct<a:string,b:string>"))
.select($"id",$"loan",$"ab.a".as("a"),$"ab.b".as("b"))
.show(false)

// Exiting paste mode, now interpreting.

+---+----+---+---+
|id |loan|a  |b  |
+---+----+---+---+
|1  |100 |35 |444|
|2  |200 |30 |111|
|2  |200 |15 |222|
|2  |200 |18 |333|
|3  |300 |18 |555|
|3  |300 |20 |666|
|4  |400 |28 |777|
|4  |400 |60 |888|
|4  |400 |80 |123|
|4  |400 |90 |456|
|5  |500 |45 |245|
+---+----+---+---+

【讨论】:

以上是关于基于Spark Scala中的条件转置Dataframe中的特定列和行的主要内容,如果未能解决你的问题,请参考以下文章

NotNull 条件不适用于 spark 数据框 scala 中的 withColumn 条件

如何使用 scala 根据 spark 中的条件获取 row_number()

如何根据spark scala中的条件进行累积和

在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件

使用 Scala 在以 Spark 中的列值为条件的广播 Map 上执行查找

Scala - 基于 Spark 中的键合并两个 RDD