为啥 df.limit 在 Pyspark 中不断变化?

Posted

技术标签:

【中文标题】为啥 df.limit 在 Pyspark 中不断变化?【英文标题】:Why does df.limit keep changing in Pyspark?为什么 df.limit 在 Pyspark 中不断变化? 【发布时间】:2016-05-10 19:10:56 【问题描述】:

我正在从一些数据框 df 中创建一个数据样本

rdd = df.limit(10000).rdd

这个操作需要相当长的时间(实际上为什么?在 10000 行之后它不能快捷吗?),所以我假设我现在有一个新的 RDD。

但是,当我现在处理 rdd 时,每次访问它时都会出现不同的行。好像它再次重新采样。缓存 RDD 会有所帮助,但肯定不会保存吗?

背后的原因是什么?

更新:这是 Spark 1.5.2 的复制品

from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
    print(rdd1.map(lambda row:row.i).reduce(add))

输出是

499500
19955500
49651500

我很惊讶.rdd 没有修复数据。

编辑: 为了表明它比重​​新执行问题更棘手,这里有一个操作会在 Spark 2.0.0.2.5.0 上产生错误的结果

from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

基本上,无论何时您使用limit,您的结果都可能是错误的。我的意思不是“只是众多样本中的一个”,而是真的不正确(因为在这种情况下,结果应该始终是 12345)。

【问题讨论】:

如何创建df df 直接从 HDFS parquet 文件中读取。 【参考方案1】:

由于 Spark 是分布式的,因此假设确定性结果通常是不安全的。您的示例采用 DataFrame 的“前”10,000 行。在这里,“第一”的含义存在歧义(因此是不确定的)。这将取决于 Spark 的内部结构。例如,它可能是响应驱动程序的第一个分区。该分区可能会随着网络、数据位置等而改变。

即使你缓存了数据,我仍然不会依赖每次都取回相同的数据,尽管我当然希望它比从磁盘读取更一致。

【讨论】:

当然,在不同的运行之间,我第一次阅读时会得到不同的结果。但是在这里我将它分配给rdd1,显然这会被重新计算。这基本上意味着一旦你在某处使用limit,一切都可能是错误的。 你是对的,如果rdd1 没有被缓存,它将被重新计算。我的观点是,您使用limit(N) 函数的唯一保证是您最多可以从RDD 中获得N 元素。无法保证您将获得 哪些 元素,或者正如您所注意到的,这些元素将是相同的。您不能将程序的逻辑建立在保证一致性的基础上。如果您确实需要每次都获得 same 元素,则必须使用其他东西,例如过滤器或全局排序,它们确实有一致性保证。 这很清楚。但这里有一点:我可以打一个电话,但会给出不正确的结果。我只需要在 thr DAG 的两个部分中使用 rdd1 并最后进行最终调用。也许应该更新问题以显示其含义。 你能通过“打一个电话”更具体吗?您可以定义一次 RDD(例如 rdd = df.limit(10000).rdd)。但是 spark 计算是惰性的,所以直到你调用类似rdd.first() 的东西才会发生计算。那时,spark 或多或少将其解释为“给我来自df 的 10000 个随机行中的第一个元素”。当您稍后调用rdd.first() 时,您会启动另一个计算,这可能会产生与第一次不同的结果。 我觉得还是有误会。不要认为 RDD 会真正实现。也就是说,不要将rdd1 视为 12,345 个整数。将其视为对保证最多包含 12,345 个整数的列表的计算的描述,但没有这些整数是什么的具体性。何时、如何或多久引用 RDD 并不重要,它只是对计算的描述。当您多次引用rdd1 时,您要求计算的输出两次而不能保证一致性。【参考方案2】:

Spark 是惰性的,因此您使用的每个action 都会重新计算limit() 返回的数据。如果底层数据被拆分到多个分区,那么每次评估它时,limit 可能会从不同的分区中提取(即,如果您的数据存储在 10 个 Parquet 文件中,第一个 limit 调用可能从文件 1 中提取,第二个从文件 7,依此类推)。

【讨论】:

【参考方案3】:

来自Spark docs:

LIMIT 子句用于限制SELECT 语句返回的行数。一般情况下,此子句与ORDER BY 结合使用,以确保结果具有确定性。

因此,如果您希望对 .limit() 的调用具有确定性,则需要事先对行进行排序。但是有一个问题!如果您按每行没有唯一值的列进行排序,则所谓的“绑定”行(具有相同排序键值的行)将不会被确定地排序,因此.limit() 可能仍然是不确定的。

您有两种解决方法:

确保在排序调用中包含唯一的行 ID。 例如df.orderBy('someCol', 'rowId').limit(n) 如果您只需要单次运行中的确定性结果,您可以简单地缓存限制df.limit(n).cache() 的结果,这样至少该限制的结果不会因为连续的操作调用而改变,否则会重新计算结果limit 并搞砸结果。

【讨论】:

以上是关于为啥 df.limit 在 Pyspark 中不断变化?的主要内容,如果未能解决你的问题,请参考以下文章

为啥'withColumn'在pyspark中需要这么长时间?

在 Pyspark/Hive 中处理不断变化的数据类型

PySpark:读取 pyspark 框架中的 csv 数据。为啥它在框架中显示特殊字符?除了使用熊猫之外,以表格形式显示的任何方式[重复]

为啥 date_format() 在 Pyspark 中返回错误的一周?

为啥要在 PySpark 中导入熊猫?

为啥此 python 代码在 pyspark 中有效,但在 spark-submit 中无效?