重用pyspark缓存并在for循环中不持久
Posted
技术标签:
【中文标题】重用pyspark缓存并在for循环中不持久【英文标题】:Reusing pyspark cache and unpersist in for loop 【发布时间】:2021-02-12 23:27:57 【问题描述】:我有很多数据要分块取出 - 比如说 3 个块 - 而不是一次将它们全部缓存在内存中。但是,我想在之后同时保存它(操作)。
这是当前的简化策略:
for query in [query1,query2,query3]:
df = spark.sql(query)
df.cache()
df1 = df.filter('a')
df2 = df.filter('b')
final_output_1 = final_output_1.join(df1)
final_output_2 = final_output_2.join(df2)
df.unpersist()
final_output_1.write.saveAsTable()
final_output_2.write.saveAsTable()
所以第一个问题: unpersist()
在这里不起作用,因为尚未对 df
采取任何行动?
第二个问题:当我在 for 循环中重用 df
变量时,df.cache()
在这里如何工作?我知道它是不可变的,所以它会复制,但unpersist()
真的会清除该内存吗?
【问题讨论】:
【参考方案1】:当您想要一次又一次地重复使用数据帧时,Spark 中会使用缓存,
例如:映射表
一旦你缓存了 df,你需要一个动作操作来将数据物理移动到内存,因为 spark 是基于延迟执行的。
你的情况
df.cache()
将无法按预期工作,因为在此之后您没有执行任何操作。
要使缓存起作用,您需要运行 df.count() 或 df.show() 或任何其他操作以将数据移动到内存中,否则您的数据不会被移动到内存中,您将不会获得任何优势。所以 df.unpersist() 也是多余的。
第一个问题:
不,您的 df.cache() 和 df.unpersist() 将不起作用,因为一开始没有缓存数据,因此它们没有什么可以取消的。 p>
第二个问题:
是的,您可以使用相同的变量名称,如果执行了操作,数据将被缓存,并且在您的操作之后 df.unpersist() 将取消持久化每个循环中的数据。 所以前一个 DF 与下一个循环中的下一个 DF 没有连接。 正如您所说,它们是 不可变的 ,并且由于您在每个循环中将新查询分配给相同的变量,因此它充当新的 DF(与以前的 DF 无关)。
根据您的代码,我认为您不需要进行缓存,因为您只执行一项操作。
参考When to cache a DataFrame?和If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?
【讨论】:
感谢您的回复。我使用缓存的原因是因为实际上我是从df
创建三个数据帧而不是两个,所以如果df
没有被缓存,那么它必须获得df
数据3 次。如果在 for 循环中的某处有一个动作操作以确保 df
被缓存,那么听起来这段代码会按预期工作,所以这就是我要做的以上是关于重用pyspark缓存并在for循环中不持久的主要内容,如果未能解决你的问题,请参考以下文章