如何在 PySpark 中与爆炸相反?

Posted

技术标签:

【中文标题】如何在 PySpark 中与爆炸相反?【英文标题】:How to do opposite of explode in PySpark? 【发布时间】:2017-04-11 23:17:54 【问题描述】:

假设我有一个DataFrame,其中有一列是用户,另一列是他们写的词:

Row(user='Bob', word='hello')
Row(user='Bob', word='world')
Row(user='Mary', word='Have')
Row(user='Mary', word='a')
Row(user='Mary', word='nice')
Row(user='Mary', word='day')

我想将word 列聚合成一个向量:

Row(user='Bob', words=['hello','world'])
Row(user='Mary', words=['Have','a','nice','day'])

看来我不能使用任何 Sparks 分组函数,因为它们需要后续的聚合步骤。我的用例是我想将这些数据输入Word2Vec,而不是使用其他 Spark 聚合。

【问题讨论】:

【参考方案1】:

感谢@titipat 提供 RDD 解决方案。在我的帖子发布后不久,我确实意识到 实际上是一个使用 collect_set(或 collect_list)的 DataFrame 解决方案:

from pyspark.sql import Row
from pyspark.sql.functions import collect_set
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)
group_user = df.groupBy('user').agg(collect_set('word').alias('words'))
print(group_user.collect())

>[Row(user='Mary', words=['Have', 'nice', 'day', 'a']), Row(user='Bob', words=['world', 'hello'])]

【讨论】:

很好的解决方案埃文!我也打算发布 pyspark 数据框解决方案,但你已经想到了 :) collect_list 是否保留订单? @Evan 我知道用 collet_list 做 oderby 不会保留顺序。 @Evan 这是另一种情况。不遵守 orderby 顺序。我知道这是因为它咬了我,但我永远无法弄清楚 collet_list 是否保留了原始顺序。如果列表来自跨分区的数据会发生什么?该行为没有得到很好的记录。 在我的情况下,词袋顺序并不重要,但我确信它在某些应用程序中可能很重要。我想作为一项规则,除非文档明确说明,否则我不会假设保留顺序。【参考方案2】:
from pyspark.sql import functions as F

df.groupby("user").agg(F.collect_list("word"))

【讨论】:

@lfvv collect_set 删除重复项。【参考方案3】:

这是使用rdd 的解决方案。

from pyspark.sql import Row
rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
group_user = rdd.groupBy(lambda x: x.user)
group_agg = group_user.map(lambda x: Row(**'user': x[0], 'word': [t.word for t in x[1]]))

输出来自group_agg.collect()

[Row(user='Bob', word=['hello', 'world']),
Row(user='Mary', word=['Have', 'a', 'nice', 'day'])]

【讨论】:

【参考方案4】:

从 spark 2.3 版本开始,我们现在有了 Pandas UDF(又名 Vectorized UDF)。下面的函数将完成 OP 的任务......使用这个函数的一个好处是保证订单被保留。在许多情况下,例如时间序列分析,顺序是必不可少的。

import pandas as pd
import findspark

findspark.init()
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, ArrayType

spark = SparkSession.builder.appName('test_collect_array_grouped').getOrCreate()

def collect_array_grouped(df, groupbyCols, aggregateCol, outputCol):
    """
    Aggregate function: returns a new :class:`DataFrame` such that for a given column, aggregateCol,
    in a DataFrame, df, collect into an array the elements for each grouping defined by the groupbyCols list.
    The new DataFrame will have, for each row, the grouping columns and an array of the grouped
    values from aggregateCol in the outputCol.

    :param groupbyCols: list of columns to group by.
            Each element should be a column name (string) or an expression (:class:`Column`).
    :param aggregateCol: the column name of the column of values to aggregate into an array
            for each grouping.
    :param outputCol: the column name of the column to output the aggregeted array to.
    """
    groupbyCols = [] if groupbyCols is None else groupbyCols
    df = df.select(groupbyCols + [aggregateCol])
    schema = df.select(groupbyCols).schema
    aggSchema = df.select(aggregateCol).schema
    arrayField = StructField(name=outputCol, dataType=ArrayType(aggSchema[0].dataType, False))
    schema = schema.add(arrayField)
    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def _get_array(pd_df):
        vals = pd_df[groupbyCols].iloc[0].tolist()
        vals.append(pd_df[aggregateCol].values)
        return pd.DataFrame([vals])
    return df.groupby(groupbyCols).apply(_get_array)

rdd = spark.sparkContext.parallelize([Row(user='Bob', word='hello'),
                                      Row(user='Bob', word='world'),
                                      Row(user='Mary', word='Have'),
                                      Row(user='Mary', word='a'),
                                      Row(user='Mary', word='nice'),
                                      Row(user='Mary', word='day')])
df = spark.createDataFrame(rdd)

collect_array_grouped(df, ['user'], 'word', 'users_words').show()

+----+--------------------+
|user|         users_words|
+----+--------------------+
|Mary|[Have, a, nice, day]|
| Bob|      [hello, world]|
+----+--------------------+

【讨论】:

【参考方案5】:

你有一个本机聚合函数,collect_set(文档here)。

那么,你可以使用:

from pyspark.sql import functions as F
df.groupby("user").agg(F.collect_set("word"))

【讨论】:

以上是关于如何在 PySpark 中与爆炸相反?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 2.2 爆炸删除空行(如何实现explode_outer)? [复制]

如何在 pyspark 中创建新列,其中条件取决于列的后续值?

在 Pyspark 中爆炸不是数组的结构列

如何在 Scala Spark 项目中使用 PySpark UDF?

PYSPARK:-在数据框中爆炸数组而不丢失空值:'DataFrame'对象没有属性'_get_object_id'

pyspark中的条件爆炸