Python/PySpark 并行处理示例

Posted

技术标签:

【中文标题】Python/PySpark 并行处理示例【英文标题】:Python/PySpark parallel processing example 【发布时间】:2018-03-03 21:52:16 【问题描述】:

我无法理解如何在我的 Python 脚本中利用并行处理的强大功能。

我有十亿行的食品数据库,例如:

date,item,category,number_sold
2017-01-01,bagel,bread,10
2017-03-03,skittles,candy,5

有 100 个类别。 (面包、糖果等)我的脚本:

1. Iterates list of 100 categories
2. Filter pyspark dataframe for the subset (e.g. category == 'bread')
3. Run aggregate calculations on subset
4. Generate 1 row of stats and appends to summary file

我需要在迭代循环中添加什么来触发多处理? pyspark 会自动执行此操作吗?当我只使用 Pandas 运行它时,脚本在等待查询每个类别子集时被暂停(不做任何事情)。理想情况下,该过程应该为一个类别过滤数据帧并同时为另一个类别运行计算。

提前致谢!

【问题讨论】:

【参考方案1】:

第一个短列表项(第 2 步)并在单独的线程中处理它们。多线程的实现在 python 中非常容易。将每份报告写在一个单独的文件中,最后将所有文件进行margin,制作最终报告。

【讨论】:

你能详细说明一下吗?脚本是否需要首先映射与 100 个类别中的每一个相关的数据?然后让计算发生? 您需要短路输出吗?如果是这样,您必须先执行第 2 步。否则将文件拆分为多个文件。例如,将文件拆分为 20 个文件并创建 20 个线程并将每个文件分配给每个线程。每个线程生成一份报告,因此 20 个线程将生成 20 个报告,最后将所有报告合并到一个文件中。如果您要插入数据库表,则无需合并。

以上是关于Python/PySpark 并行处理示例的主要内容,如果未能解决你的问题,请参考以下文章

《并行计算》期末总结

JAVA并行框架Fork/Join:简介和代码示例

在改造并行网络调用中处理后台刷新令牌调用

pathos:并行处理选项 - 有人可以解释其中的差异吗?

SpringBoot开发案例之多任务并行+线程池处理

一个简单示例看懂.Net 并行编程