在pyspark中替换循环到并行进程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在pyspark中替换循环到并行进程相关的知识,希望对你有一定的参考价值。

我在我的脚本中使用for循环来为size_DF(数据框)的每个元素调用一个函数,但这需要花费很多时间。我尝试通过地图删除for循环,但我没有得到任何输出。 size_DF是我从表中获取的大约300个元素的列表。

用于:

import call_functions

newObject = call_functions.call_functions_class()
size_RDD = sc.parallelize(size_DF) 

if len(size_DF) == 0:
    print "No record present in the truncated list"
else:

    for row in size_DF:
        length = row[0]
        print "length: ", length
        insertDF = newObject.full_item(sc, dataBase, length, end_date)

使用地图

if len(size_DF) == 0:
    print "No record present in the list"
else:
    size_RDD.mapPartition(lambda l: newObject.full_item(sc, dataBase, len(l[0]), end_date))

newObject.full_item(sc,dataBase,len(l [0]),end_date)在full_item()中 - 我正在做一些select ope并连接2个表并将数据插入表中。

请帮助我,让我知道我做错了什么。

答案

pyspark.rdd.RDD.mapPartition方法被懒惰地评估。通常为了强制进行评估,您可以在返回的延迟RDD实例上返回一个值。

有更高级别的功能负责强制评估RDD值。例如pyspark.rdd.RDD.foreach

由于您并不真正关心操作的结果,您可以使用pyspark.rdd.RDD.foreach而不是pyspark.rdd.RDD.mapPartition

def first_of(it):
    for first in it:
        return first
    return []

def insert_first(it):
    first = first_of(it)
    item_count = len(first)
    newObject.full_item(sc, dataBase, item_count, end_date)


if len(size_DF) == 0:
    print('No record present in the truncated list')
else:
    size_DF.forEach(insert_first)

以上是关于在pyspark中替换循环到并行进程的主要内容,如果未能解决你的问题,请参考以下文章

在 Python 多处理进程中运行较慢的 OpenCV 代码片段

如何在 AWS Glue PySpark 中运行并行线程?

在 PySpark SQL 中并行执行读写 API 调用

pyspark 在 for 循环下的每个进程之后附加非常大的多个数据帧(例如:在每日 ETL 之后附加)

如何将项目添加到字典“并行循环安全”

在 PySpark 中涉及带有管道的子进程的映射步骤失败