重用pyspark缓存并在for循环中不持久

Posted

技术标签:

【中文标题】重用pyspark缓存并在for循环中不持久【英文标题】:Reusing pyspark cache and unpersist in for loop 【发布时间】:2021-02-12 23:27:57 【问题描述】:

我有很多数据要分块取出 - 比如说 3 个块 - 而不是一次将它们全部缓存在内存中。但是,我想在之后同时保存它(操作)。

这是当前的简化策略:

for query in [query1,query2,query3]:

    df = spark.sql(query)

    df.cache()

    df1 = df.filter('a')
    df2 = df.filter('b')

    final_output_1 = final_output_1.join(df1)
    final_output_2 = final_output_2.join(df2)

    df.unpersist()


final_output_1.write.saveAsTable()
final_output_2.write.saveAsTable()

所以第一个问题: unpersist() 在这里不起作用,因为尚未对 df 采取任何行动?

第二个问题:当我在 for 循环中重用 df 变量时,df.cache() 在这里如何工作?我知道它是不可变的,所以它会复制,但unpersist() 真的会清除该内存吗?

【问题讨论】:

【参考方案1】:

当您想要一次又一次地重复使用数据帧时,Spark 中会使用缓存,

例如:映射表

一旦你缓存了 df,你需要一个动作操作来将数据物理移动到内存,因为 spark 是基于延迟执行的。

你的情况

df.cache()

将无法按预期工作,因为在此之后您没有执行任何操作。

要使缓存起作用,您需要运行 df.count()df.show() 或任何其他操作以将数据移动到内存中,否则您的数据不会被移动到内存中,您将不会获得任何优势。所以 df.unpersist() 也是多余的。

第一个问题:

不,您的 df.cache()df.unpersist() 将不起作用,因为一开始没有缓存数据,因此它们没有什么可以取消的。 p>

第二个问题:

是的,您可以使用相同的变量名称,如果执行了操作,数据将被缓存,并且在您的操作之后 df.unpersist() 将取消持久化每个循环中的数据。 所以前一个 DF 与下一个循环中的下一个 DF 没有连接。 正如您所说,它们是 不可变的 ,并且由于您在每个循环中将新查询分配给相同的变量,因此它充当新的 DF(与以前的 DF 无关)。

根据您的代码,我认为您不需要进行缓存,因为您只执行一项操作。

参考When to cache a DataFrame?和If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?

【讨论】:

感谢您的回复。我使用缓存的原因是因为实际上我是从df 创建三个数据帧而不是两个,所以如果df 没有被缓存,那么它必须获得df 数据3 次。如果在 for 循环中的某处有一个动作操作以确保 df 被缓存,那么听起来这段代码会按预期工作,所以这就是我要做的

以上是关于重用pyspark缓存并在for循环中不持久的主要内容,如果未能解决你的问题,请参考以下文章

如何在 for 循环中附加 pyspark 数据帧?

在 pyspark 中同时而不是按顺序运行 for 循环

如何在 pyspark 的 For 循环中插入自定义函数?

删除或加速 PySpark 中的显式 for 循环

如何使用 PySpark 进行嵌套的 for-each 循环

在 for 循环中使用 udf 在 Pyspark 中创建多个列