为啥 df.limit 在 Pyspark 中不断变化?
Posted
技术标签:
【中文标题】为啥 df.limit 在 Pyspark 中不断变化?【英文标题】:Why does df.limit keep changing in Pyspark?为什么 df.limit 在 Pyspark 中不断变化? 【发布时间】:2016-05-10 19:10:56 【问题描述】:我正在从一些数据框 df
中创建一个数据样本
rdd = df.limit(10000).rdd
这个操作需要相当长的时间(实际上为什么?在 10000 行之后它不能快捷吗?),所以我假设我现在有一个新的 RDD。
但是,当我现在处理 rdd
时,每次访问它时都会出现不同的行。好像它再次重新采样。缓存 RDD 会有所帮助,但肯定不会保存吗?
背后的原因是什么?
更新:这是 Spark 1.5.2 的复制品
from operator import add
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],100)
rdd1=rdd.toDF().limit(1000).rdd
for _ in range(3):
print(rdd1.map(lambda row:row.i).reduce(add))
输出是
499500
19955500
49651500
我很惊讶.rdd
没有修复数据。
编辑: 为了表明它比重新执行问题更棘手,这里有一个操作会在 Spark 2.0.0.2.5.0 上产生错误的结果
from pyspark.sql import Row
rdd=sc.parallelize([Row(i=i) for i in range(1000000)],200)
rdd1=rdd.toDF().limit(12345).rdd
rdd2=rdd1.map(lambda x:(x,x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join
基本上,无论何时您使用limit
,您的结果都可能是错误的。我的意思不是“只是众多样本中的一个”,而是真的不正确(因为在这种情况下,结果应该始终是 12345)。
【问题讨论】:
如何创建df
?
df
直接从 HDFS parquet 文件中读取。
【参考方案1】:
由于 Spark 是分布式的,因此假设确定性结果通常是不安全的。您的示例采用 DataFrame 的“前”10,000 行。在这里,“第一”的含义存在歧义(因此是不确定的)。这将取决于 Spark 的内部结构。例如,它可能是响应驱动程序的第一个分区。该分区可能会随着网络、数据位置等而改变。
即使你缓存了数据,我仍然不会依赖每次都取回相同的数据,尽管我当然希望它比从磁盘读取更一致。
【讨论】:
当然,在不同的运行之间,我第一次阅读时会得到不同的结果。但是在这里我将它分配给rdd1
,显然这会被重新计算。这基本上意味着一旦你在某处使用limit
,一切都可能是错误的。
你是对的,如果rdd1
没有被缓存,它将被重新计算。我的观点是,您使用limit(N)
函数的唯一保证是您最多可以从RDD 中获得N
元素。无法保证您将获得 哪些 元素,或者正如您所注意到的,这些元素将是相同的。您不能将程序的逻辑建立在保证一致性的基础上。如果您确实需要每次都获得 same 元素,则必须使用其他东西,例如过滤器或全局排序,它们确实有一致性保证。
这很清楚。但这里有一点:我可以打一个电话,但会给出不正确的结果。我只需要在 thr DAG 的两个部分中使用 rdd1 并最后进行最终调用。也许应该更新问题以显示其含义。
你能通过“打一个电话”更具体吗?您可以定义一次 RDD(例如 rdd = df.limit(10000).rdd
)。但是 spark 计算是惰性的,所以直到你调用类似rdd.first()
的东西才会发生计算。那时,spark 或多或少将其解释为“给我来自df
的 10000 个随机行中的第一个元素”。当您稍后调用rdd.first()
时,您会启动另一个计算,这可能会产生与第一次不同的结果。
我觉得还是有误会。不要认为 RDD 会真正实现。也就是说,不要将rdd1
视为 12,345 个整数。将其视为对保证最多包含 12,345 个整数的列表的计算的描述,但没有这些整数是什么的具体性。何时、如何或多久引用 RDD 并不重要,它只是对计算的描述。当您多次引用rdd1
时,您要求计算的输出两次而不能保证一致性。【参考方案2】:
Spark 是惰性的,因此您使用的每个action 都会重新计算limit() 返回的数据。如果底层数据被拆分到多个分区,那么每次评估它时,limit 可能会从不同的分区中提取(即,如果您的数据存储在 10 个 Parquet 文件中,第一个 limit 调用可能从文件 1 中提取,第二个从文件 7,依此类推)。
【讨论】:
【参考方案3】:来自Spark docs:
LIMIT
子句用于限制SELECT
语句返回的行数。一般情况下,此子句与ORDER BY
结合使用,以确保结果具有确定性。
因此,如果您希望对 .limit()
的调用具有确定性,则需要事先对行进行排序。但是有一个问题!如果您按每行没有唯一值的列进行排序,则所谓的“绑定”行(具有相同排序键值的行)将不会被确定地排序,因此.limit()
可能仍然是不确定的。
您有两种解决方法:
确保在排序调用中包含唯一的行 ID。 例如df.orderBy('someCol', 'rowId').limit(n)
如果您只需要单次运行中的确定性结果,您可以简单地缓存限制df.limit(n).cache()
的结果,这样至少该限制的结果不会因为连续的操作调用而改变,否则会重新计算结果limit
并搞砸结果。
【讨论】:
以上是关于为啥 df.limit 在 Pyspark 中不断变化?的主要内容,如果未能解决你的问题,请参考以下文章
为啥'withColumn'在pyspark中需要这么长时间?
PySpark:读取 pyspark 框架中的 csv 数据。为啥它在框架中显示特殊字符?除了使用熊猫之外,以表格形式显示的任何方式[重复]