如何使用 map 函数正确并行运行 pyspark 代码
Posted
技术标签:
【中文标题】如何使用 map 函数正确并行运行 pyspark 代码【英文标题】:How do I correctly run pyspark code in parallel using the map function 【发布时间】:2021-09-08 11:47:50 【问题描述】:我在 emr 集群上将以下代码作为 pyspark 代码运行时遇到问题。我实际上是在尝试创建除以 2 个主键的数据组。切片是主键列的每个唯一组合,完整数据是包含整个数据作为数据框对象的数据框。该对象是使用 sqlContext 构建的。以下在并行运行时失败,抱怨 pickle 库出现序列化错误。具体_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
完成我想做的事情的最佳方法是什么?
def main(slices, fullData):
jsons = slices.rdd.map(
lambda i: handleSlices(i, fullData)).collect() # Run in parallel
# jsons = [handleSlices(i, fullData)
# for i in slices.collect()] # run in serial
return jsons
def handleSlices(row, fullData):
entries = fullData.filter((col("fullData.vehicle_id") == row.vehicle_id)
& (col("fullData.start_time") == row.start_time)).select(
"fullData.latitude", "fullData.longitude")
folder = "/playback/" + row.vehicle_id + "/"
fileName = folder + row.start_time.replace(" ", "_").replace(":", "-")
return (fileName, entries)
【问题讨论】:
pickle 在你想并行运行某些东西时会出现一些问题。我过去也遇到过这种情况。检查***.com/questions/39897394/…也许我错了。但只是想让你知道。 【参考方案1】:您的代码中有两个主要问题:
Spark 本身是并行的,因此您无需执行特殊操作即可使其并行运行
当使用map
时,代码在执行程序中运行,而不是在驱动程序中运行,这意味着您不能在函数中使用 DataFrame 或 RDD 引用(它们仅在驱动程序上工作)。
假设 slices 和 fullData 都是 DataFrame,这个实现应该可以工作并提供一个可以稍后处理的并行数据帧
from pyspark.sql import DataFrame
import pyspark.sql.functions as F
def main(slices: DataFrame, fullData: DataFrame) -> DataFrame:
df = (fullData.join(slices, ["vehicle_id", "start_time"], "inner")
.select("vehicle_id", "start_time", "latitude", "longitude")
.withColumn("file_name", F.concat_ws("/",
F.lit("/playback"),
F.col("vehicle_id"),
F.regexp_replace(F.regexp_replace("start_time", " ", "_"), ":", "-")))
.groupBy("file_name")
.agg(F.collect_list(F.struct("latitude","longitude")).alias("positions"))
)
return df
【讨论】:
以上是关于如何使用 map 函数正确并行运行 pyspark 代码的主要内容,如果未能解决你的问题,请参考以下文章