基于Spark Scala中的条件转置Dataframe中的特定列和行
Posted
技术标签:
【中文标题】基于Spark Scala中的条件转置Dataframe中的特定列和行【英文标题】:Transpose Specific Columns and Rows in Dataframe based on Condition in Spark Scala 【发布时间】:2020-09-10 04:48:20 【问题描述】:我有一个如下场景,其中源数据帧需要使用 spark scala 从列转换为行
源数据帧:
|||||||||||||||||||||||||||||||||||||||||||||||
|ID|LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4 |
|||||||||||||||||||||||||||||||||||||||||||||||
| 1| 100| 1| 35| | | |444| | | |
| 2| 200| 3| 30| 15| 18| |111|222|333| |
| 3| 300| 2| 18| 20| | |555|666| | |
| 4| 400| 4| 28| 60| 80| 90|777|888|123|456|
| 5| 500| 1| 45| | | |245| | | |
|||||||||||||||||||||||||||||||||||||||||||||||
期望下面的结果需要根据 COUNT 字段的值/条件转换为行
预期的数据帧:
|||||||||||||||||
|ID|LOAN| A| B|
|||||||||||||||||
| 1| 100| 35|444|
| 2| 200| 30|111|
| 2| 200| 15|222|
| 2| 200| 18|333|
| 3| 300| 18|555|
| 3| 300| 20|666|
| 4| 400| 28|777|
| 4| 400| 60|888|
| 4| 400| 80|123|
| 4| 400| 90|456|
| 5| 500| 45|245|
|||||||||||||||||
【问题讨论】:
试图解决这个问题,如果解决了你的问题,请点赞+接受 【参考方案1】:我认为,您的用例是取消透视表。
我尝试使用以下方法解决此问题-
Read the input
val spark = sqlContext.sparkSession
val implicits = spark.implicits
import implicits._
val schema = StructType(
"ID|LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4"
.split("\\|")
.map(f => StructField(f.trim, DataTypes.IntegerType))
)
val data =
"""
| 1| 100| 1| 35| | | |444| | |
| 2| 200| 3| 30| 15| 18| |111|222|333|
| 3| 300| 2| 18| 20| | |555|666| |
| 4| 400| 4| 28| 60| 80| 90|777|888|123|456
| 5| 500| 1| 45| | | |245| | |
""".stripMargin
val df = spark.read
.schema(schema)
.option("sep", "|")
.csv(data.split(System.lineSeparator()).map(_.replaceAll("\\s*", "")).toSeq.toDS())
df.show(false)
df.printSchema()
结果-
+---+----+-----+---+----+----+----+---+----+----+----+
|ID |LOAN|COUNT|A1 |A2 |A3 |A4 |B1 |B2 |B3 |B4 |
+---+----+-----+---+----+----+----+---+----+----+----+
|1 |100 |1 |35 |null|null|null|444|null|null|null|
|2 |200 |3 |30 |15 |18 |null|111|222 |333 |null|
|3 |300 |2 |18 |20 |null|null|555|666 |null|null|
|4 |400 |4 |28 |60 |80 |90 |777|888 |123 |456 |
|5 |500 |1 |45 |null|null|null|245|null|null|null|
+---+----+-----+---+----+----+----+---+----+----+----+
root
|-- ID: integer (nullable = true)
|-- LOAN: integer (nullable = true)
|-- COUNT: integer (nullable = true)
|-- A1: integer (nullable = true)
|-- A2: integer (nullable = true)
|-- A3: integer (nullable = true)
|-- A4: integer (nullable = true)
|-- B1: integer (nullable = true)
|-- B2: integer (nullable = true)
|-- B3: integer (nullable = true)
|-- B4: integer (nullable = true)
unpivot the table and remove null entry
df.selectExpr(
"ID",
"LOAN",
"stack(4, A1, B1, A2, B2, A3, B3, A4, B4) as (A, B)"
).where("A is not null and B is not null").show(false)
结果-
+---+----+---+---+
|ID |LOAN|A |B |
+---+----+---+---+
|1 |100 |35 |444|
|2 |200 |30 |111|
|2 |200 |15 |222|
|2 |200 |18 |333|
|3 |300 |18 |555|
|3 |300 |20 |666|
|4 |400 |28 |777|
|4 |400 |60 |888|
|4 |400 |80 |123|
|4 |400 |90 |456|
|5 |500 |45 |245|
+---+----+---+---+
如果您以字符串类型读取数据,则可以使用空字符串而不是 null 来过滤结果
【讨论】:
我忘记了这个堆栈函数.. 很好的解决方案,我的有点复杂.. :)【参考方案2】:arrays_zip
合并 a1,a2,a3,a4
和 b1,b2,b3,b4
列。
array_except
删除 empty values
explode
分解使用 arrays_zip
创建的组合值
检查下面的代码。
scala> adf.show(false)
+---+----+-----+---+---+---+---+---+---+---+---+
|id |loan|count|a1 |a2 |a3 |a4 |b1 |b2 |b3 |b4 |
+---+----+-----+---+---+---+---+---+---+---+---+
|1 |100 |1 |35 | | | |444| | | |
|2 |200 |3 |30 |15 |18 | |111|222|333| |
|3 |300 |2 |18 |20 | | |555|666| | |
|4 |400 |4 |28 |60 |80 |90 |777|888|123|456|
|5 |500 |1 |45 | | | |245| | | |
+---+----+-----+---+---+---+---+---+---+---+---+
scala> :paste
// Entering paste mode (ctrl-D to finish)
adf
.withColumn("ab",explode(
arrays_zip(
array_except(array($"a1",$"a2",$"a3",$"a4"),array(lit(""))),
array_except(array($"b1",$"b2",$"b3",$"b4"),array(lit("")))
)
)
)
.select($"id",$"loan",$"ab".cast("struct<a:string,b:string>"))
.select($"id",$"loan",$"ab.a".as("a"),$"ab.b".as("b"))
.show(false)
// Exiting paste mode, now interpreting.
+---+----+---+---+
|id |loan|a |b |
+---+----+---+---+
|1 |100 |35 |444|
|2 |200 |30 |111|
|2 |200 |15 |222|
|2 |200 |18 |333|
|3 |300 |18 |555|
|3 |300 |20 |666|
|4 |400 |28 |777|
|4 |400 |60 |888|
|4 |400 |80 |123|
|4 |400 |90 |456|
|5 |500 |45 |245|
+---+----+---+---+
【讨论】:
以上是关于基于Spark Scala中的条件转置Dataframe中的特定列和行的主要内容,如果未能解决你的问题,请参考以下文章
NotNull 条件不适用于 spark 数据框 scala 中的 withColumn 条件
如何使用 scala 根据 spark 中的条件获取 row_number()
在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件