如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?
Posted
技术标签:
【中文标题】如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?【英文标题】:How to Split rows to different columns in Spark DataFrame/DataSet? 【发布时间】:2016-11-14 10:16:04 【问题描述】:假设我有这样的数据集:
Name | Subject | Y1 | Y2
A | math | 1998| 2000
B | | 1996| 1999
| science | 2004| 2005
我想拆分这个数据集的行,这样 Y2 列将被删除,例如:
Name | Subject | Y1
A | math | 1998
A | math | 1999
A | math | 2000
B | | 1996
B | | 1997
B | | 1998
B | | 1999
| science | 2004
| science | 2005
有人可以在这里提出一些建议吗?我希望我已经把我的问题说清楚了。提前致谢。
【问题讨论】:
@cheseaux 你为什么删除你的答案?这对我来说似乎是有效的。 【参考方案1】:我认为您只需要创建一个udf
即可创建范围。然后您可以使用explode 创建必要的行:
val createRange = udf (yearFrom: Int, yearTo: Int) =>
(yearFrom to yearTo).toList
df.select($"Name", $"Subject", functions.explode(createRange($"Y1", $"Y2"))).show()
编辑:此代码的 python 版本将类似于:
from pyspark.sql import Row
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType
createRange=udf( lambda (yearFrom, yearTo): list(range(yearFrom, yearTo)), IntegerType())
df.select($"Name", $"Subject", explode(createRange($"Y1", $"Y2"))).show()
【讨论】:
我们可以使用 python pandas 吗?我无法理解您的火花代码。【参考方案2】:我已经在 pyspark 中测试了这段代码,它按预期工作:
data= sc.parallelize([["A","math",1998,2000],["B","",1996,1999],["","science",2004,2005]]
data.map(lambda reg: ((reg[0],reg[1]),(range(reg[2],reg[3]+1))) )
.flatMapValues(lambda reg: reg).collect()
更详细地说,您需要将输入数据转换为 (key,value) 形式的一对 RDD,其中 key 由前两个字段组成,因为结果将被展平,保持 key 不变,@987654322 @。要映射的值构造为从Y1
到Y2
的range
。所有这些都在第一个map
中完成。
flatMapValues
将返回与其key
关联的range
中的每个值。
输出如下:
[(('A', 'math'), 1998),
(('A', 'math'), 1999),
(('A', 'math'), 2000),
(('B', ''), 1996),
(('B', ''), 1997),
(('B', ''), 1998),
(('B', ''), 1999),
(('', 'science'), 2004),
(('', 'science'), 2005)]
【讨论】:
【参考方案3】:您可以通过以下方式实现此功能:
val resultantDF= df.rdd.flatMaprow =>
val rangeInitial = row.getInt(2)
val rangeEnd = row.getInt(3)
val array = rangeInitial to rangeEnd
(List.fill(array.size)(row.getString(0)),List.fill(array.size)(row.getString(1)),array).zipped.toList
.toDF("Name","Subject","Y1")
resultantDF.show()
【讨论】:
【参考方案4】:您可以轻松地使用 spark select 在 Data frame 甚至 RDD 中获取您想要的内容。
Dataset<Row> sqlDF = spark.sql("SELECT Name,Subject,Y1 FROM tableName");
如果你从已经存在的数据框开始,比如用户,你可以使用这样的东西:
resultDF = usersDF.select("Name","Subject","Y1");
【讨论】:
查看输出。主要目的是将行拆分次数为 (Y2-Y1) 而不仅仅是删除 Y2。 我想根据年份范围拆分行,例如,如果数据类似于 ``A |数学 |1998 | 2000` 那么输出就像A | math| 1998 A | math | 1999 A | math| 2000
import org.apache.spark.sql.functions._ val toRange = udf (y1: Int, y2: Int) => (y1 to y2).toArray input .withColumn("years", toRange($"Y1", $"Y2")) .select($"Name", explode($"years") as "Year")
但它显示 NumberFormatException : null,由于存在空值。以上是关于如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?的主要内容,如果未能解决你的问题,请参考以下文章
Spark:如何在 pyspark 或 scala spark 中分解数据并添加列名?