PySpark 将 Dataframe 作为额外参数传递给映射

Posted

技术标签:

【中文标题】PySpark 将 Dataframe 作为额外参数传递给映射【英文标题】:PySpark passing Dataframe as extra parameter to map 【发布时间】:2021-08-08 13:32:10 【问题描述】:

我想并行化一个 python 列表,在该列表上使用一个映射,并将一个 Dataframe 也传递给映射器函数

def output_age_split(df):
   ages= [18, 19, 20, 21, 22]
   age_dfs= spark.sparkContext.parallelize(ages).map(lambda x: test(x, df)
# Unsure of type of age_dfs, but should be able to split into the smaller dfs like this somehow
return age_dfs[0], age_dfs[1] ...

def test(age, df):
   return df.where(col("age")==age)

这会导致酸洗错误

  raise pickle.PicklingError(msg)
  _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects

我应该如何并行化这个操作,以便返回一个 Dataframes 集合?

编辑:df 样本

|age|name|salary|
|---|----|------|
|18 |John|40000 |
|22 |Joseph|60000 |

【问题讨论】:

假设df 是一个单独的数据帧,您是否期望多个数据帧作为output_age_split 的输出?在我看来,test 函数将返回一个数据帧,而df 数据帧将被拆分为 5 个较小的数据帧。那是你要的吗?另外,请提供df 数据框的示例。 @KumarRohit 是的 output_age_split 应该为每个年龄返回数据帧,我想要的是 output_age_split 中的 age_dfs 变量是某种集合,我可以将它分成 5 个较小的数据帧并分别返回,已编辑代码清晰 在答案部分添加了解决方案。希望有帮助! 【参考方案1】:

问题是ages_dfs 不是数据框,而是RDD。现在,当您在其中应用带有test 函数的map(返回dataframe)时,我们最终会遇到一个奇怪的情况,其中ages_dfs 实际上是RDD 类型的PipelinedRDD,它既不是dataframe 也不是iterable

TypeError: 'PipelinedRDD' object is not iterable

您可以尝试以下解决方法,您只需在列表上迭代并创建dataframe 的集合并随意迭代它们。

from pyspark.sql.functions import *

def output_age_split(df):
  ages = [18, 19, 20, 21, 22]
  result = []
  for age in ages:
    temp_df = test(age, df)
    if(not len(temp_df.head(1)) == 0):
      result.append(temp_df)
  return result 

def test(age, df):
   return df.where(col("age")==age)

# +---+------+------+
# |age|name  |salary|
# +---+------+------+
# |18 |John  |40000 |
# |22 |Joseph|60000 |
# +---+------+------+
df = spark.sparkContext.parallelize(
  [
    (18, "John", 40000),
    (22, "Jpseph", 60000)
  ]
).toDF(["age", "name", "salary"])
                                                      
df.show()

result = output_age_split(df)
# Output type is: <class 'list'>

print(f"Output type is: type(result)")
for r in result:
  r.show()

# +---+----+------+
# |age|name|salary|
# +---+----+------+
# | 18|John| 40000|
# +---+----+------+

# +---+------+------+
# |age|  name|salary|
# +---+------+------+
# | 22|Jpseph| 60000|
# +---+------+------+

我还附上了我工作区的屏幕截图供您参考。

问题:

解决方案:

【讨论】:

嗨@KumarRohit,感谢您的解决方案,不幸的是,这是我想避免的情况,因为此解决方案不能并行运行,由于数据集。这将按顺序过滤每个年龄,我希望找到一个可以并行完成的解决方案 您可以在agerepartition 您的数据框并执行foreachPartition,这将在逻辑上拆分您的数据,您仍然可以利用 spark 的并行处理功能。 spark.apache.org/docs/latest/api/python/reference/api/…spark.apache.org/docs/latest/api/python/reference/api/… 你有一个如何使用它的例子吗? forEachPartition 不返回任何内容,我假设这里的相关函数实际上是 rdd.mapPartitions 我曾尝试如下使用它:df= df.repartition("age") age_dfs = df.rdd.mapPartitions(lambda x: x) 但问题是,它将它作为一个 Rdd 而不是拆分返回,所以它本质上什么都不做 您能否向我提供您想对这些拆分数据集进行的具体转换? 我只想在单独的数据框中返回每个分区

以上是关于PySpark 将 Dataframe 作为额外参数传递给映射的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark DataFrame 中添加汇总输出作为新行

PySpark。将 Dataframe 传递给 pandas_udf 并返回一个系列

使用 pySpark 将 DataFrame 写入 mysql 表

Pyspark RDD 到具有强制模式的 DataFrame:值错误

PySpark 将算法转换为 UDF 并将其应用于 DataFrame

pyspark - 将两个数据帧与目标中的额外列合并