如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?

Posted

技术标签:

【中文标题】如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?【英文标题】:How to copy the "first" row of a spark data frame to another data frame? Why does my minimal example fails?如何将火花数据帧的“第一”行复制到另一个数据帧?为什么我的最小示例失败了? 【发布时间】:2019-09-09 07:08:02 【问题描述】:

基本问题:

我想将 Spark 数据帧 sdf 的“第一行”复制到另一个 Spark 数据帧 sdfEmpty

我不明白下面的代码出了什么问题。 因此,我期待一个解决方案和一个解释,在我的最小示例中失败了。

一个简单的例子:

// create a spark data frame
import org.apache.spark.sql._
val sdf = Seq(
 (1, "a"),
 (12, "b"),
 (234, "b")
).toDF("A", "B")

sdf.show() 
+---+---+
|  A|  B|
+---+---+
|  1|  a|
|  2|  b|
|  3|  b|
+---+---+

// create an empty spark data frame to store the row
// declare it as var, such that I can change it later
var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)

sdfEmpty.show()
+---+---+
|  A|  B|
+---+---+
+---+---+

// take the "first" row of sdf as a spark data frame
val row = sdf.limit(1)

// combine the two spark data frames
sdfEmpty = sdfEmpty.union(row)

作为row 是:

row.show()
+---+---+
|  A|  B|
+---+---+
|  1|  a|
+---+---+

sdfEmpty 的预期结果是:

+---+---+
|  A|  B|
+---+---+
|  1|  a|
+---+---+

但我明白了:

sdfEmpty.show()
+---+---+
|  A|  B|
+---+---+
|  2|  b|
+---+---+

问题: 让我感到困惑的是:使用 val row = sdf.limit(1) 我以为我创建了一个永久/不可更改/定义明确的对象。这样当我打印一次并将其添加到某物时,我会得到相同的结果。

备注:(非常感谢丹尼尔的发言)

我知道在 scala 的分布式世界中,没有明确定义的“第一行”概念。我把它放在那里是为了简单起见,我希望那些在类似事情上苦苦挣扎的人会“不小心”使用“第一”这个词。

我试图实现的是:(在一个简化的例子中) 我有一个包含 2 列 A 和 B 的数据框。A 列是部分有序的,B 列是完全有序的。 我想过滤数据w.r.t。列。所以这个想法是某种分而治之:拆分数据框,这样两列都是完全有序的,而不是像往常一样过滤。 (并进行明显的迭代)

为了实现这一点,我需要选择一个定义明确的行并将日期拆分为 w.r.t。行.A.但正如最小示例所示,我的命令不会产生定义明确的对象。

非常感谢

【问题讨论】:

你能分享println(sdf.rdd.partitions.size)的输出吗? @moriarty007 println(sdf.rdd.partitions.size) 的输出是3。 【参考方案1】:

Spark 是分布式的,因此我们不能依赖“第一”的概念。在调用limitfirst 时,取决于分区,我们可以获得不同的结果。

要获得一致的结果,您的数据必须具有我们可以使用的基本顺序 - 这很有意义,因为除非您的数据有逻辑顺序,否则我们无法真正说出采用第一行。

假设你想取 A 列的第一行,你可以运行 orderBy("A").first()(*) 。尽管如果 A 列有不止一行具有相同的最小值,则无法保证您将获得哪一行。

(* 我假设 scala API 与 Python 具有相同的命名,所以如果它们的命名不同,请纠正我)

【讨论】:

亲爱的丹尼尔,感谢您的回答。我知道在 Spark 的分布式世界中,数据集中的行没有明确定义的顺序。这就是我使用引号的原因。此外,在我的特定用例中,我只需要选择一个任意但固定的元素 w.r.t.一些订购。让我感到困惑的是:使用val row = sdf.limit(1) 我以为我创建了一个永久/不可更改/定义明确的对象。这样当我打印一次并将其添加到某物时,我会得到相同的结果。 嗨 Christian,.limit 只是一个转换 - 在您使用操作(例如 showcollect)运行它之前,它不会被评估,因此取决于它可以返回的上下文不同的价值。如果您想始终如一地使用此行,您可以 .collect 将其驱动并用作局部变量,或确保您的转换具有确定性排序 .orderBy 嗨丹尼尔!感谢您的解释,但我的代码实际上已经是:val row = sdf.limit(1) row.show() sdfEmpty = sdfEmpty.union(row) 所以我没想到使用 show() 会开始评估。我有点害怕使用 collect(),因为如果数据不适合驱动程序内存,它会崩溃,是吗?我在上一步中使用了 orderBy 并没有提及它,以使示例尽可能小。但是,我愿意提供任何有助于我改进提问的风格提示,我也很高兴你的回答,因为我从 1 周开始就使用 Scala 并且...... .collect 只有在查询结果大于内存时才会失败——这在读取单行的情况下几乎是不可能的。很高兴我能提供帮助 - 如果您觉得有用,请接受答案并投票 :)【参考方案2】:

@Christian 你可以使用 take 函数来实现这个结果。 take(num) 获取 RDD 的前 num 个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足限制所需的附加分区数。 这里是代码sn-p。

scala> import org.apache.spark.sql.types._

scala> val sdf = Seq(
 (1, "a"),
 (12, "b"),
 (234, "b")
).toDF("A", "B")

scala> import org.apache.spark.sql._

scala> var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)

scala> var first1  =sdf.rdd.take(1)

scala> val first_row = spark.createDataFrame(sc.parallelize(first1), sdf.schema)

scala> sdfEmpty.union(first_row).show
+---+---+
|  A|  B|
+---+---+
|  1|  a|
+---+---+

有关 take() 和 first() 函数的更多信息,请阅读spark Documentation。如果您对此有任何疑问,请告诉我。

【讨论】:

嗨,mahesh gupta,(和丹尼尔)愚蠢的问题:使用take 给了我一个数组,然后我必须将其转换为数据框。在我看来这是一个额外的步骤。将此与collect() 进行比较,哪种方法更有效。最后,问题只是一个功能的一部分,我将以数百万计。不幸的是,我是 scala 的新手,不知道如何自己创建一个大而随机的示例。要求:A列是部分排序的(例如,由子集定义排序的数组),B列是完全排序的。正是我不知道如何创建A列。 @Christian 请阅读此umbertogriffo.gitbooks.io/… 您将了解获取与收集【参考方案3】:

我发布此答案是因为它包含 Daniel 建议的解决方案。一旦我通过文献提供的 mahesh gupta 或更多测试,我将更新此答案并就“现实生活”中不同方法的运行时发表评论。

基本问题:

我想将 Spark 数据帧 sdf 的“第一行”复制到另一个 Spark 数据帧 sdfEmpty

在 Spark 的分布式世界中,没有明确定义的 first 概念,但由于 orderBy,可能会实现类似的东西。

一个最小的工作示例:

// create a spark data frame
import org.apache.spark.sql._
val sdf = Seq(
 (1, "a"),
 (12, "b"),
 (234, "b")
).toDF("A", "B")

sdf.show() 
+---+---+
|  A|  B|
+---+---+
|  1|  a|
|  2|  b|
|  3|  b|
+---+---+

// create an empty spark data frame to store the row
// declare it as var, such that I can change it later
var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)

sdfEmpty.show()
+---+---+
|  A|  B|
+---+---+
+---+---+

// take the "first" row of sdf as a spark data frame
val row = sdf.limit(1).collect()

// combine the two spark data frames
sdfEmpty = sdfEmpty.union(row)

row 是:

row.show()
+---+---+
|  A|  B|
+---+---+
|  1|  a|
+---+---+

** 而sdfEmpty 的结果是:**

+---+---+
|  A|  B|
+---+---+
|  1|  a|
+---+---+

备注:Daniel 给出的解释(参见上面的 cmets).limit(n) 是一个转换 - 它不会被评估,直到像 show 或 collect 这样的操作运行。因此,根据上下文,它可以返回不同的值。要始终使用.limit 的结果,可以将.collect 用于驱动程序并将其用作局部变量。

【讨论】:

以上是关于如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?的主要内容,如果未能解决你的问题,请参考以下文章

如何将每一行熊猫数据帧附加到另一个数据帧的每一行

如何截断火花数据框列的值? [复制]

将值从一个数据帧切片复制到另一个:使用“IndexSlice”的多索引熊猫数据帧的切片是不是总是一致地排序?

如何将数据帧的2列组合到另一列中

限制火花数据帧的数据

如何有效地计算数据帧的行数? [复制]