如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作

Posted

技术标签:

【中文标题】如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作【英文标题】:How to do map operations on each subset of rows in a big Spark data frame in PySpark 【发布时间】:2017-10-30 02:26:16 【问题描述】:

我正在使用 PySpark,我想做的是:

一个大的 Spark 数据框 df 包含所有记录。我想对这个df中除以'id'列的每个记录子集进行并行计算。我目前能想到的方式如下:(我会用一个简单的例子来说明)

dicts = [
    'id': 1,  'name': 'a',  'score':  100,
    'id': 1,  'name': 'b',  'score':  150,
    'id': 2,  'name': 'c',  'score':  200,
    'id': 2,  'name': 'd',  'score':  300,
]
df = spark.createDataFrame(dicts)

from pyspark.sql.functions import (
    collect_list, 
    struct
)

# df_agg will have the following schema:   id,  a list of structs 
df_agg = df.groupBy('id').agg(
    collect_list(struct(df.columns)).alias('records')
)

但是,当我尝试这样做时

 df_agg.rdd.map(my_func)

其中“my_func”是一些主要进行Spark数据帧计算的函数,我遇到了一些问题,不知道如何进行。 my_func 对一行进行操作,其中 row['records'] 现在保存结构列表。如何将此结构列表转换回 Spark DataFrame?

toDF() 不起作用。我尝试了 spark.createDataFrame(list, schema) ,我什至输入了原始 DF 使用的模式,但它仍然无法正常工作。

我对这些 PySpark 操作比较陌生,如果您能告诉我处理这种情况的正确方法是什么,我将不胜感激。

谢谢!

【问题讨论】:

什么是my_func?错误是什么? 任何想要将上面'records'中的“list_of_struct”转换回Spark数据帧并继续在这个df上运行的函数。一个简单的例子就是写成rdd_new = df_agg.rdd.map(lambda r: spark.createDataFrame(r.records)),当我们执行rdd_new.collect()时,会出现如下错误: Py4JError: 调用 o25.__getnewargs__ 时出错。跟踪:py4j.Py4JException:方法 __getnewargs__([]) 在 py4j.Gateway 的 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) 的 py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) 中不存在。在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 处调用(Gateway.java:272) 在 py4j.GatewayConnection.run(GatewayConnection.java:214) 处 py4j.commands.CallCommand.execute(CallCommand.java:79) ) 在 java.lang.Thread.run(Thread.java:745) ,不要在 cmets 中发布错误消息或长代码 sn-ps - 请编辑和更新帖子! 【参考方案1】:

无法评论您在尝试df_agg.rdd.map(my_func) 时遇到的错误(如果您提供my_func 的示例,我可以试一试)。但是,您提到您无法转换为 DataFrame,因此这是该部分的解决方案:

from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType

schema=StructType(
               [StructField("id", IntegerType(), True), \
                StructField("records", 
                    ArrayType(StructType([StructField("id", IntegerType(), True),\
                        StructField("name", StringType(), True),\
                        StructField("score", IntegerType(), True)])))
               ])

df_agg.rdd.toDF(schema=schema).show(2)

【讨论】:

以上是关于如何在 PySpark 中的大型 Spark 数据框中对行的每个子集进行映射操作的主要内容,如果未能解决你的问题,请参考以下文章

如何避免pyspark中加入操作中的过度洗牌?

我如何让 pandas 使用 spark 集群

如何在 pyspark 中对 spark 数据框中的多列求和?

如何在 pyspark 中对 spark 数据框中的多列求和?

如何删除 Spark 表列中的空格(Pyspark)

如何使用 spark.read.jdbc 读取不同 Pyspark 数据帧中的多个文件