如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?
Posted
技术标签:
【中文标题】如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?【英文标题】:How to copy the "first" row of a spark data frame to another data frame? Why does my minimal example fails?如何将火花数据帧的“第一”行复制到另一个数据帧?为什么我的最小示例失败了? 【发布时间】:2019-09-09 07:08:02 【问题描述】:基本问题:
我想将 Spark 数据帧 sdf
的“第一行”复制到另一个 Spark 数据帧 sdfEmpty
。
我不明白下面的代码出了什么问题。 因此,我期待一个解决方案和一个解释,在我的最小示例中失败了。
一个简单的例子:
// create a spark data frame
import org.apache.spark.sql._
val sdf = Seq(
(1, "a"),
(12, "b"),
(234, "b")
).toDF("A", "B")
sdf.show()
+---+---+
| A| B|
+---+---+
| 1| a|
| 2| b|
| 3| b|
+---+---+
// create an empty spark data frame to store the row
// declare it as var, such that I can change it later
var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)
sdfEmpty.show()
+---+---+
| A| B|
+---+---+
+---+---+
// take the "first" row of sdf as a spark data frame
val row = sdf.limit(1)
// combine the two spark data frames
sdfEmpty = sdfEmpty.union(row)
作为row
是:
row.show()
+---+---+
| A| B|
+---+---+
| 1| a|
+---+---+
sdfEmpty
的预期结果是:
+---+---+
| A| B|
+---+---+
| 1| a|
+---+---+
但我明白了:
sdfEmpty.show()
+---+---+
| A| B|
+---+---+
| 2| b|
+---+---+
问题: 让我感到困惑的是:使用 val row = sdf.limit(1) 我以为我创建了一个永久/不可更改/定义明确的对象。这样当我打印一次并将其添加到某物时,我会得到相同的结果。
备注:(非常感谢丹尼尔的发言)
我知道在 scala 的分布式世界中,没有明确定义的“第一行”概念。我把它放在那里是为了简单起见,我希望那些在类似事情上苦苦挣扎的人会“不小心”使用“第一”这个词。
我试图实现的是:(在一个简化的例子中) 我有一个包含 2 列 A 和 B 的数据框。A 列是部分有序的,B 列是完全有序的。 我想过滤数据w.r.t。列。所以这个想法是某种分而治之:拆分数据框,这样两列都是完全有序的,而不是像往常一样过滤。 (并进行明显的迭代)
为了实现这一点,我需要选择一个定义明确的行并将日期拆分为 w.r.t。行.A.但正如最小示例所示,我的命令不会产生定义明确的对象。
非常感谢
【问题讨论】:
你能分享println(sdf.rdd.partitions.size)
的输出吗?
@moriarty007 println(sdf.rdd.partitions.size) 的输出是3。
【参考方案1】:
Spark 是分布式的,因此我们不能依赖“第一”的概念。在调用limit
或first
时,取决于分区,我们可以获得不同的结果。
要获得一致的结果,您的数据必须具有我们可以使用的基本顺序 - 这很有意义,因为除非您的数据有逻辑顺序,否则我们无法真正说出采用第一行。
假设你想取 A 列的第一行,你可以运行 orderBy("A").first()
(*) 。尽管如果 A 列有不止一行具有相同的最小值,则无法保证您将获得哪一行。
(* 我假设 scala API 与 Python 具有相同的命名,所以如果它们的命名不同,请纠正我)
【讨论】:
亲爱的丹尼尔,感谢您的回答。我知道在 Spark 的分布式世界中,数据集中的行没有明确定义的顺序。这就是我使用引号的原因。此外,在我的特定用例中,我只需要选择一个任意但固定的元素 w.r.t.一些订购。让我感到困惑的是:使用val row = sdf.limit(1)
我以为我创建了一个永久/不可更改/定义明确的对象。这样当我打印一次并将其添加到某物时,我会得到相同的结果。
嗨 Christian,.limit
只是一个转换 - 在您使用操作(例如 show
或 collect
)运行它之前,它不会被评估,因此取决于它可以返回的上下文不同的价值。如果您想始终如一地使用此行,您可以 .collect
将其驱动并用作局部变量,或确保您的转换具有确定性排序 .orderBy
嗨丹尼尔!感谢您的解释,但我的代码实际上已经是:val row = sdf.limit(1) row.show() sdfEmpty = sdfEmpty.union(row)
所以我没想到使用 show() 会开始评估。我有点害怕使用 collect(),因为如果数据不适合驱动程序内存,它会崩溃,是吗?我在上一步中使用了 orderBy 并没有提及它,以使示例尽可能小。但是,我愿意提供任何有助于我改进提问的风格提示,我也很高兴你的回答,因为我从 1 周开始就使用 Scala 并且......
.collect
只有在查询结果大于内存时才会失败——这在读取单行的情况下几乎是不可能的。很高兴我能提供帮助 - 如果您觉得有用,请接受答案并投票 :)【参考方案2】:
@Christian 你可以使用 take 函数来实现这个结果。 take(num) 获取 RDD 的前 num 个元素。它首先扫描一个分区,然后使用该分区的结果来估计满足限制所需的附加分区数。 这里是代码sn-p。
scala> import org.apache.spark.sql.types._
scala> val sdf = Seq(
(1, "a"),
(12, "b"),
(234, "b")
).toDF("A", "B")
scala> import org.apache.spark.sql._
scala> var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)
scala> var first1 =sdf.rdd.take(1)
scala> val first_row = spark.createDataFrame(sc.parallelize(first1), sdf.schema)
scala> sdfEmpty.union(first_row).show
+---+---+
| A| B|
+---+---+
| 1| a|
+---+---+
有关 take() 和 first() 函数的更多信息,请阅读spark Documentation。如果您对此有任何疑问,请告诉我。
【讨论】:
嗨,mahesh gupta,(和丹尼尔)愚蠢的问题:使用take
给了我一个数组,然后我必须将其转换为数据框。在我看来这是一个额外的步骤。将此与collect()
进行比较,哪种方法更有效。最后,问题只是一个功能的一部分,我将以数百万计。不幸的是,我是 scala 的新手,不知道如何自己创建一个大而随机的示例。要求:A列是部分排序的(例如,由子集定义排序的数组),B列是完全排序的。正是我不知道如何创建A列。
@Christian 请阅读此umbertogriffo.gitbooks.io/… 您将了解获取与收集【参考方案3】:
我发布此答案是因为它包含 Daniel 建议的解决方案。一旦我通过文献提供的 mahesh gupta 或更多测试,我将更新此答案并就“现实生活”中不同方法的运行时发表评论。
基本问题:
我想将 Spark 数据帧 sdf
的“第一行”复制到另一个 Spark 数据帧 sdfEmpty
。
在 Spark 的分布式世界中,没有明确定义的 first 概念,但由于 orderBy
,可能会实现类似的东西。
一个最小的工作示例:
// create a spark data frame
import org.apache.spark.sql._
val sdf = Seq(
(1, "a"),
(12, "b"),
(234, "b")
).toDF("A", "B")
sdf.show()
+---+---+
| A| B|
+---+---+
| 1| a|
| 2| b|
| 3| b|
+---+---+
// create an empty spark data frame to store the row
// declare it as var, such that I can change it later
var sdfEmpty = spark.createDataFrame(sc.emptyRDD[Row], sdf.schema)
sdfEmpty.show()
+---+---+
| A| B|
+---+---+
+---+---+
// take the "first" row of sdf as a spark data frame
val row = sdf.limit(1).collect()
// combine the two spark data frames
sdfEmpty = sdfEmpty.union(row)
row
是:
row.show()
+---+---+
| A| B|
+---+---+
| 1| a|
+---+---+
** 而sdfEmpty
的结果是:**
+---+---+
| A| B|
+---+---+
| 1| a|
+---+---+
备注:Daniel 给出的解释(参见上面的 cmets).limit(n)
是一个转换 - 它不会被评估,直到像 show 或 collect 这样的操作运行。因此,根据上下文,它可以返回不同的值。要始终使用.limit
的结果,可以将.collect
用于驱动程序并将其用作局部变量。
【讨论】:
以上是关于如何将火花数据帧的“第一”行复制到另一个数据帧?为啥我的最小示例失败了?的主要内容,如果未能解决你的问题,请参考以下文章