PySpark 将 Dataframe 作为额外参数传递给映射
Posted
技术标签:
【中文标题】PySpark 将 Dataframe 作为额外参数传递给映射【英文标题】:PySpark passing Dataframe as extra parameter to map 【发布时间】:2021-08-08 13:32:10 【问题描述】:我想并行化一个 python 列表,在该列表上使用一个映射,并将一个 Dataframe 也传递给映射器函数
def output_age_split(df):
ages= [18, 19, 20, 21, 22]
age_dfs= spark.sparkContext.parallelize(ages).map(lambda x: test(x, df)
# Unsure of type of age_dfs, but should be able to split into the smaller dfs like this somehow
return age_dfs[0], age_dfs[1] ...
def test(age, df):
return df.where(col("age")==age)
这会导致酸洗错误
raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
我应该如何并行化这个操作,以便返回一个 Dataframes 集合?
编辑:df 样本
|age|name|salary|
|---|----|------|
|18 |John|40000 |
|22 |Joseph|60000 |
【问题讨论】:
假设df
是一个单独的数据帧,您是否期望多个数据帧作为output_age_split
的输出?在我看来,test
函数将返回一个数据帧,而df
数据帧将被拆分为 5 个较小的数据帧。那是你要的吗?另外,请提供df
数据框的示例。
@KumarRohit 是的 output_age_split 应该为每个年龄返回数据帧,我想要的是 output_age_split 中的 age_dfs 变量是某种集合,我可以将它分成 5 个较小的数据帧并分别返回,已编辑代码清晰
在答案部分添加了解决方案。希望有帮助!
【参考方案1】:
问题是ages_dfs
不是数据框,而是RDD
。现在,当您在其中应用带有test
函数的map
(返回dataframe
)时,我们最终会遇到一个奇怪的情况,其中ages_dfs
实际上是RDD
类型的PipelinedRDD
,它既不是dataframe
也不是iterable
。
TypeError: 'PipelinedRDD' object is not iterable
您可以尝试以下解决方法,您只需在列表上迭代并创建dataframe
的集合并随意迭代它们。
from pyspark.sql.functions import *
def output_age_split(df):
ages = [18, 19, 20, 21, 22]
result = []
for age in ages:
temp_df = test(age, df)
if(not len(temp_df.head(1)) == 0):
result.append(temp_df)
return result
def test(age, df):
return df.where(col("age")==age)
# +---+------+------+
# |age|name |salary|
# +---+------+------+
# |18 |John |40000 |
# |22 |Joseph|60000 |
# +---+------+------+
df = spark.sparkContext.parallelize(
[
(18, "John", 40000),
(22, "Jpseph", 60000)
]
).toDF(["age", "name", "salary"])
df.show()
result = output_age_split(df)
# Output type is: <class 'list'>
print(f"Output type is: type(result)")
for r in result:
r.show()
# +---+----+------+
# |age|name|salary|
# +---+----+------+
# | 18|John| 40000|
# +---+----+------+
# +---+------+------+
# |age| name|salary|
# +---+------+------+
# | 22|Jpseph| 60000|
# +---+------+------+
我还附上了我工作区的屏幕截图供您参考。
问题:
解决方案:
【讨论】:
嗨@KumarRohit,感谢您的解决方案,不幸的是,这是我想避免的情况,因为此解决方案不能并行运行,由于数据集。这将按顺序过滤每个年龄,我希望找到一个可以并行完成的解决方案 您可以在age
上repartition
您的数据框并执行foreachPartition
,这将在逻辑上拆分您的数据,您仍然可以利用 spark 的并行处理功能。 spark.apache.org/docs/latest/api/python/reference/api/…spark.apache.org/docs/latest/api/python/reference/api/…
你有一个如何使用它的例子吗? forEachPartition 不返回任何内容,我假设这里的相关函数实际上是 rdd.mapPartitions 我曾尝试如下使用它:df= df.repartition("age")
age_dfs = df.rdd.mapPartitions(lambda x: x)
但问题是,它将它作为一个 Rdd 而不是拆分返回,所以它本质上什么都不做
您能否向我提供您想对这些拆分数据集进行的具体转换?
我只想在单独的数据框中返回每个分区以上是关于PySpark 将 Dataframe 作为额外参数传递给映射的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark DataFrame 中添加汇总输出作为新行
PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列
使用 pySpark 将 DataFrame 写入 mysql 表
Pyspark RDD 到具有强制模式的 DataFrame:值错误