如何使用 PySpark 并行化我的文件处理程序

Posted

技术标签:

【中文标题】如何使用 PySpark 并行化我的文件处理程序【英文标题】:How to parallelize my file-processing program using PySpark 【发布时间】:2020-06-03 18:14:07 【问题描述】:

我现在有一个大型 python 项目,其中驱动程序有一个函数,该函数使用 for 循环遍历我的 GCP(谷歌云平台)存储桶上的每个文件。我正在使用 CLI 将作业提交到 GCP 并让作业在 GCP 上运行。

对于在此 for 循环中遍历的每个文件,我正在调用一个函数 parse_file(...) 来解析文件并调用处理此文件的其他函数的序列。

整个项目运行需要几分钟,速度很慢,而且驱动程序还没有使用太多的 PySpark。问题是该文件级 for 循环中的每个 parse_file(...) 都按顺序执行。是否可以使用 PySpark 并行化该文件级 for 循环以对所有这些文件并行运行 parse_file(...) 函数以减少程序执行时间并提高效率?如果是这样,由于程序没有使用 PySpark,是否需要进行大量代码修改才能使其并行化?

所以程序的功能是这样的

# ... some other codes
attributes_table = ....
for obj in gcp_bucket.objects(path):
    if obj.key.endswith('sys_data.txt'):
        #....some other codes
        file_data = (d for d in obj.download().decode('utf-8').split('\n'))
        parse_file(file_data, attributes_table)
        #....some other codes ....

如何使用 PySpark 并行化这部分,而不是一次使用 for 循环遍历文件?

【问题讨论】:

【参考方案1】:

感谢您提出问题。

我建议根据您的 gcp_bucket.objects(path) 创建一个 RDD。

你有你的 SparkContext,所以创建 RDD 应该很简单:my_rdd = sc.parallelize(gcp_bucket.objects(path)

对于外行来说,约定是将 SparkContext 赋值给变量sc。你的 for 循环的内容必须放入一个函数中,我们称之为my_function。你现在已经拥有了所有的碎片。

您的下一步将是这样映射您的函数:

results_dag = my_rdd.map(my_function)
results = results_dag.collect()

回想一下,Spark 执行惰性求值。这就是为什么我们需要在最后执行collect 操作。

其他一些建议。第一个是在 GCP 存储桶中的一小组对象上运行您的代码。了解时间安排。为了促进良好的编码实践,另一个建议是考虑将 for 循环中的操作进一步分解为额外的 RDD。您可以随时将它们链接在一起...

my_rdd = sc.parallelize(gcp_bucket.objects(path)
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()

【讨论】:

非常感谢!我现在就试试这个。 非常感谢您的建议和建议。我一直在尝试你的方法,我确实使用了 collect() 但程序出现了一些错误,尤其是在我有 collect() 的那一行:PicklingError: Could not serialize object: TypeError: can't pickle _thead.lock objects我不确定这个错误是如何产生的.如果我删除 collect(),程序就会运行,但是当我执行 dag1 = my_rdd.map(function1) 时,function1 没有执行,这是因为转换是惰性的,它只是告诉 Spark,一旦一个动作,这实际上就会发生collect() 执行了吗? 另外,“myfunction”是否向驱动程序返回任何内容是否重要?在您的示例中,我的“myfunction”所做的是对传入的文件数据执行一系列操作,因此我认为这些系列操作不一定会将任何内容返回给 Spark 上的 Driver 节点。这是否会使使用 collect() 的意义失效? 解决线程锁问题很复杂。我建议分解您的问题并隔离您想要并行化的代码部分。我会在一小部分路径上测试您的并行化代码,以确保您的代码正常运行。你具体想做的事情我以前做过,没有任何问题。您必须调用 collect 否则 DAG 将不会被评估。我建议看一些 spark 教程。 非常感谢@VanBantam。我将我的问题简化为小块。我现在遇到了一个大问题。我试过了:dag1 = my_rdd.map(lambda file: function1(file)) ``` def function1(file): my_publisher = MyPublisher(attrs) result = function2(my_publisher) ``` 这里my_publisher 调用MyPublisher 构造函数,但程序报告错误Could not serialize object: TypeError: can't pickle _thread.lock objects。问题来自 my_publisher 不可序列化以进行腌制。但是我的程序必须使用my_publisher。有没有办法解决这个问题?

以上是关于如何使用 PySpark 并行化我的文件处理程序的主要内容,如果未能解决你的问题,请参考以下文章

如何在 TeamCity 中正确并行化我的测试套件?

如何使用 PySpark 在桌面本地文件夹的目录中执行文件(pdf、docs、txt、xls)的并行处理?

Pyspark:如何在 HDFS 中并行化多 gz 文件处理

Python / PySpark并行处理示例

Python/PySpark 并行处理示例

Qt 无法弄清楚如何在我的程序中线程化我的返回值