pyspark 诱人的行为
Posted
技术标签:
【中文标题】pyspark 诱人的行为【英文标题】:pyspark temptable behaviour 【发布时间】:2018-02-20 05:05:34 【问题描述】:我想实现一个功能,我会有一些基础数据,我会得到增量数据。
我将结合两者并执行一些操作(SQL 查询),成功后我将获得 BASE = BASE + 增量用于下一次运行。
在例外情况下,我的 baseData 将是 BASE_Data(增量不应该是这里的一部分)。
我已经尝试通过下面的代码进行解释。
我对 spark temp 表的行为感到困惑......
# i am reading 2 files and persisting them in MEMORY_ONLY
df = spark.read.csv('BASE_data.csv', header=True)
df.persist()
print(df.count()) #o/p:4
df1 = spark.read.csv('data.csv', header=True)
df1.persist()
print(df1.count()) #o/p:4
# i will register temp tables
df.registerTempTable('BASE_data')
spark.sql('select count(1) from BASE_data').show() # 4 which is fine
# i will append rows from df1 to df(BASE_data) and registered as combined_data
spark.sql("select * from 0".format('BASE_data')).union(df1).registerTempTable('combined_data')
spark.sql('select count(1) from combined_data').show() # 8 which is fine too
# Now i am going to unpersist df1 from memory and also change the variable
df1.unpersist()
df1=[]
spark.sql('select count(1) from combined_data').show()
# o/p=8, i am confused here, it should be 4
# when i unpersisted, spark might try to rebuild df1, by reading that file,
# so to be double sure, i reassign df1 to some empty list.
我需要帮助来了解此行为以及如何实现此功能。
如果有其他方法,我正在计划以下简单的方法
-- 我不想保持 BASE_data 和 Combine_DATA 状态,我可以通过一个 temp_table 定义来实现。 spark.sql("select * from 0".format('BASE_data')).union(df1).registerTempTable('BASE_data')
-- 我不想创建一些在某个时间后将被使用的东西,并且执行会消耗内存。异常时的 BASE_data 应回退到原始 BASE_data,即:应从异常时的 BASE_data 中删除新的附加数据(df1)或通过取消持久化。
如果有不清楚的地方请告诉我,我会尽力解释,谢谢。
try:
# create combine_data by union
# do sql ops
# BASE_DATA = select * from cobine_data
except Exception:
# BASE_data = BASE_Data # Basically do nothing
还帮助我解决“我可以在异常块中针对我可能在 Try-block 中创建的任何(曾经)垃圾进行何种清理”。我真的很关心内存管理。谢谢。
【问题讨论】:
【参考方案1】:您将 df1 与 df(已在名为“BASE_data”的临时表中注册)联合,然后使用名为 combine_data 的数据创建一个表。方法 registerTempTable() 是一个动作,因此当时 DAG(有向无环图)被评估(使用 df 和 temp_table 'BASE_data' 的当前值),因此数据被复制到内存中的不同位置并且现在独立于df1 和 df。此时删除 df1 对 combine_data 中的值没有影响,因为它们已经被评估过。
我不明白为什么您期望第二次计数得到 4 而不是 8 这是正确的。该表是在您执行并集的行上创建的,并且从那时起不会更改,因此结果不会更改。
【讨论】:
我的错,我不确定“当时数据被复制到内存中的不同位置”。但我认为 spark 是懒惰的,为什么在声明时它会进入记忆。 Spark 在触发动作之前是惰性的。注册为表是一个动作,所以在那个时候,所有的东西都被评估并复制到不同的内存位置。 另外,如果答案有帮助,请投票并关闭问题 这里还有一个问题,如果 tempTable 发生在内存中,这是否意味着,如果我有 2GB 数据,那么在临时表 2GB 上坚持 2GB + 即:我将消耗 4GB RAM 内存? ?如果它在内存中,那么在注册临时表后我可以取消持久化数据帧以释放一些内存,在这里纠正我???在临时表的情况下,容错如何工作?我问的问题太多了,但我很想知道所有这些,如果需要时间,请给我一些链接,这也足够了。非常感谢您的帮助。 你的基本逻辑是正确的。当您注册一个表时,数据以 Hive 的列格式保存,添加到字典显着减少了内存消耗,因此即使您的文件是 2gB 作为临时表,它也会更少。执行这些操作时,您始终可以检查内存,以了解各种策略如何影响内存消耗。两个注意事项:首先 registerTemTable 在 Spark 2.0 中已弃用,因此如果您在 Spark 2.0+ 中工作,请使用 createOrReplaceTempView。其次,您确实需要考虑为什么要保留 df。你有什么收获?以上是关于pyspark 诱人的行为的主要内容,如果未能解决你的问题,请参考以下文章
pyspark:在同一列(使用数组)上使用多个 UDF 函数时出现意外行为