我们如何在 pyspark 的不同模块中使用相同的连接数据框用法

Posted

技术标签:

【中文标题】我们如何在 pyspark 的不同模块中使用相同的连接数据框用法【英文标题】:How can we use a same joined dataframe usage in different module in pyspark 【发布时间】:2020-11-06 19:15:34 【问题描述】:

我们有从多个源表读取数据并根据业务规则连接并应用映射的场景。在某些情况下,从几个表中读取的数据可用于多个目标加载。因此,为避免在不同模块中运行时多次读取相同的数据,是否可以选择如何在不同的 pyspark 模块中使用相同的数据帧输出。

df1 = spark.sql(select * from table1)
df2 = spark.sql(select * from table2)

df_out = df1.join(df2, ['customer_id'], inner)

我想在 pyspark_module1.py 和 pyspark_module2.py 中使用 df_out,有什么方法可以通过不多次读取相同的数据来实现,因为我们通过调度工具并行运行程序。

【问题讨论】:

将结果保存到其他地方并使用? 【参考方案1】:

您可以通过bucketBy 在一定程度上加入和预处理并保存数据,然后在此预加入和预处理的数据上并行运行下游。

此https://luminousmen.com/post/the-5-minute-guide-to-using-bucketing-in-pyspark 与 Spark 文档一样提供指导。

【讨论】:

【参考方案2】:

这就是 cache()persist() 发挥作用的地方,cache() 将您的数据保存到内存中(即默认)直到 spark 应用程序正在执行并且 persist() 允许您将选择扩展到磁盘/内存等。完整读取 here 和 here

现在,回答您的问题 - 您可能需要根据您实施 catchpersist

的方式重新审视您的应用程序逻辑

如果,你写一个 main function 并且你的 module-1module-2 函数调用该主函数,甚至缓存到内存后,它可能不是有益的,因为每次发生函数调用时,它都会为该特定函数调用调用底层逻辑,因此请尝试您是否可以编写相同的代码并利用 caching.

【讨论】:

以上是关于我们如何在 pyspark 的不同模块中使用相同的连接数据框用法的主要内容,如果未能解决你的问题,请参考以下文章

如何在pyspark中连接具有相同名称的列的值

在 Pyspark 中,我如何比较两列并在它们不相同时使用 x

在 PySpark 中写入镶木地板的问题

没有模块名称pyspark错误

PySpark 根据特定列重新分区

如何在pyspark中使用具有相同客户ID的新数据框的值更新一行旧数据框