如何使用 map 函数正确并行运行 pyspark 代码

Posted

技术标签:

【中文标题】如何使用 map 函数正确并行运行 pyspark 代码【英文标题】:How do I correctly run pyspark code in parallel using the map function 【发布时间】:2021-09-08 11:47:50 【问题描述】:

我在 emr 集群上将以下代码作为 pyspark 代码运行时遇到问题。我实际上是在尝试创建除以 2 个主键的数据组。切片是主键列的每个唯一组合,完整数据是包含整个数据作为数据框对象的数据框。该对象是使用 sqlContext 构建的。以下在并行运行时失败,抱怨 pickle 库出现序列化错误。具体_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

完成我想做的事情的最佳方法是什么?

def main(slices, fullData):
    jsons = slices.rdd.map(
         lambda i: handleSlices(i, fullData)).collect()  # Run in parallel
    # jsons = [handleSlices(i, fullData)
    #    for i in slices.collect()]  # run in serial
    return jsons

def handleSlices(row, fullData):
    entries = fullData.filter((col("fullData.vehicle_id") == row.vehicle_id)
                              & (col("fullData.start_time") == row.start_time)).select(
        "fullData.latitude", "fullData.longitude")

    folder = "/playback/" + row.vehicle_id + "/"
    fileName = folder + row.start_time.replace(" ", "_").replace(":", "-")

    return (fileName, entries)

【问题讨论】:

pickle 在你想并行运行某些东西时会出现一些问题。我过去也遇到过这种情况。检查***.com/questions/39897394/…也许我错了。但只是想让你知道。 【参考方案1】:

您的代码中有两个主要问题:

Spark 本身是并行的,因此您无需执行特殊操作即可使其并行运行

当使用map 时,代码在执行程序中运行,而不是在驱动程序中运行,这意味着您不能在函数中使用 DataFrame 或 RDD 引用(它们仅在驱动程序上工作)。

假设 slices 和 fullData 都是 DataFrame,这个实现应该可以工作并提供一个可以稍后处理的并行数据帧


from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def main(slices: DataFrame, fullData: DataFrame) -> DataFrame:
   df = (fullData.join(slices, ["vehicle_id", "start_time"], "inner")
             .select("vehicle_id", "start_time", "latitude", "longitude")
             .withColumn("file_name", F.concat_ws("/",
                                                  F.lit("/playback"),
                                                  F.col("vehicle_id"),
                                                  F.regexp_replace(F.regexp_replace("start_time", " ", "_"), ":", "-")))
             .groupBy("file_name")
             .agg(F.collect_list(F.struct("latitude","longitude")).alias("positions"))
   )
   return df

【讨论】:

以上是关于如何使用 map 函数正确并行运行 pyspark 代码的主要内容,如果未能解决你的问题,请参考以下文章

如何更改pyspark中的并行任务数

Pyspark 数据分布

如何使用 PySpark 并行化我的文件处理程序

如何正确地开始在多个参数上并行执行两个函数?

如何使用 pyspark 并行插入 Hive

如何使 pyspark 作业在多个节点上正确并行化并避免内存问题?