如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?

Posted

技术标签:

【中文标题】如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?【英文标题】:How to Split rows to different columns in Spark DataFrame/DataSet? 【发布时间】:2016-11-14 10:16:04 【问题描述】:

假设我有这样的数据集:

Name | Subject | Y1  | Y2 
A    | math    | 1998| 2000
B    |         | 1996| 1999
     | science | 2004| 2005

我想拆分这个数据集的行,这样 Y2 列将被删除,例如:

Name | Subject | Y1
A    | math    | 1998
A    | math    | 1999
A    | math    | 2000
B    |         | 1996
B    |         | 1997
B    |         | 1998
B    |         | 1999
     | science | 2004
     | science | 2005

有人可以在这里提出一些建议吗?我希望我已经把我的问题说清楚了。提前致谢。

【问题讨论】:

@cheseaux 你为什么删除你的答案?这对我来说似乎是有效的。 【参考方案1】:

我认为您只需要创建一个udf 即可创建范围。然后您可以使用explode 创建必要的行:

val createRange = udf  (yearFrom: Int, yearTo: Int) =>
    (yearFrom to yearTo).toList


df.select($"Name", $"Subject", functions.explode(createRange($"Y1", $"Y2"))).show()

编辑:此代码的 python 版本将类似于:

from pyspark.sql import Row
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import IntegerType

createRange=udf( lambda (yearFrom, yearTo): list(range(yearFrom, yearTo)), IntegerType())

df.select($"Name", $"Subject", explode(createRange($"Y1", $"Y2"))).show()

【讨论】:

我们可以使用 python pandas 吗?我无法理解您的火花代码。【参考方案2】:

我已经在 pyspark 中测试了这段代码,它按预期工作:

data= sc.parallelize([["A","math",1998,2000],["B","",1996,1999],["","science",2004,2005]]

data.map(lambda reg: ((reg[0],reg[1]),(range(reg[2],reg[3]+1))) )
    .flatMapValues(lambda reg: reg).collect()

更详细地说,您需要将输入数据转换为 (key,value) 形式的一对 RDD,其中 key 由前两个字段组成,因为结果将被展平,保持 key 不变,@987654322 @。要映射的值构造为从Y1Y2range。所有这些都在第一个map 中完成。

flatMapValues 将返回与其key 关联的range 中的每个值。

输出如下:

[(('A', 'math'), 1998),
 (('A', 'math'), 1999),
 (('A', 'math'), 2000),
 (('B', ''), 1996),
 (('B', ''), 1997),
 (('B', ''), 1998),
 (('B', ''), 1999),
 (('', 'science'), 2004),
 (('', 'science'), 2005)]

【讨论】:

【参考方案3】:

您可以通过以下方式实现此功能:

  val resultantDF= df.rdd.flatMaprow =>
    val rangeInitial = row.getInt(2)
    val rangeEnd = row.getInt(3)
    val array = rangeInitial to rangeEnd
    (List.fill(array.size)(row.getString(0)),List.fill(array.size)(row.getString(1)),array).zipped.toList
    .toDF("Name","Subject","Y1")

resultantDF.show()

【讨论】:

【参考方案4】:

您可以轻松地使用 spark select 在 Data frame 甚至 RDD 中获取您想要的内容。

Dataset<Row> sqlDF = spark.sql("SELECT Name,Subject,Y1 FROM tableName");

如果你从已经存在的数据框开始,比如用户,你可以使用这样的东西:

resultDF = usersDF.select("Name","Subject","Y1");

【讨论】:

查看输出。主要目的是将行拆分次数为 (Y2-Y1) 而不仅仅是删除 Y2。 我想根据年份范围拆分行,例如,如果数据类似于 ``A |数学 |1998 | 2000` 那么输出就像A | math| 1998 A | math | 1999 A | math| 2000 import org.apache.spark.sql.functions._ val toRange = udf (y1: Int, y2: Int) =&gt; (y1 to y2).toArray input .withColumn("years", toRange($"Y1", $"Y2")) .select($"Name", explode($"years") as "Year") 但它显示 NumberFormatException : null,由于存在空值。

以上是关于如何在 Spark DataFrame/DataSet 中将行拆分为不同的列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中加速 leftouterjoin?

Spark:如何在 pyspark 或 scala spark 中分解数据并添加列名?

如何在Ubuntu下搭建Spark集群

如何在单个 Spark 作业中摄取不同的 Spark 数据帧

如何在万亿级别规模的数据量上使用Spark

如何在 spark 2(java) 中创建广播变量?