Pyspark:排序/排序,然后分组和连接字符串

Posted

技术标签:

【中文标题】Pyspark:排序/排序,然后分组和连接字符串【英文标题】:Pyspark : order/sort by then group by and concat string 【发布时间】:2020-04-24 20:05:48 【问题描述】:

我有一个这样的数据框:

   usr     sec    scrpt
0  1        5     This
1  2        10      is
2  3        12       a
3  1        7    string
4  2        4      oreo

我正在尝试按用户、秒和 groupby 对用户进行排序/排序,并在那里连接字符串。所以这个表包含了每个用户在哪一秒他说了什么。所以生成的数据框应该看起来像

user   concated
1      this string
2      oreo is
3      a

我在下面的python中尝试过,效果很好

df.sort_values(by=['usr','sec'],ascending=[True, True]).groupby(['usr')['scrpt'].apply(lambda x: ','.join(x)).reset_index()

有人可以在 pyspark 中给我类似的吗?

【问题讨论】:

【参考方案1】:

来自 Spark-2.4+ 在这种情况下使用 array_joinsort_arraytransform 函数。

#sample dataframe

df=spark.createDataFrame([(1,5,"This"),(2,10,"is"),(3,12,"a"),(1,7,"string"),(2,4,"oreo")],["usr","sec","scrpt"])

df.show()
#+---+---+------+
#|usr|sec| scrpt|
#+---+---+------+
#|  1|  5|  This|
#|  2| 10|    is|
#|  3| 12|     a|
#|  1|  7|string|
#|  2|  4|  oreo|
#+---+---+------+

df.groupBy("usr").agg(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ").alias("concated")).orderBy("usr").show(10,False)

df.groupBy("usr").agg(concat_ws(" ",expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated   |
#+---+-----------+
#|1  |This string|
#|2  |oreo is    |
#|3  |a          |
#+---+-----------+

#lower case
df.groupBy("usr").agg(lower(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated   |
#+---+-----------+
#|1  |this string|
#|2  |oreo is    |
#|3  |a          |
#+---+-----------+

【讨论】:

有趣的是sort_array 能够仅基于整数值对array of structs 进行排序。很好的发现 @Shu,您将如何更改代码以独立连接两列中的字符串?【参考方案2】:

您可以使用Window 功能在 PySpark 中完成您想要的。

    import pyspark.sql.functions as sf

    # Construct a window to construct sentences
    sentence_window = Window.partitionBy('usr').orderBy(sf.col('sec').asc())

    # Construct a window to get the last sentence. The others will be sentence fragments spoken by the user.
    rank_window = Window.partitionBy('usr').orderBy(sf.col('sec').desc())

    user_sentences = spark_data_df.select('usr',
                                      sf.collect_list(sf.col('scrpt')).over(sentence_window).alias('sentence'),
                                      sf.rank().over(rank_window).alias('rank'))

     user_sentences = user_sentences.filter("rank = 1").drop('rank')
     user_sentences = user_sentences.withColumn('sentence', sf.concat_ws(' ', sf.col('sentence')))

     user_sentences.show(10, False)

【讨论】:

以上是关于Pyspark:排序/排序,然后分组和连接字符串的主要内容,如果未能解决你的问题,请参考以下文章

ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)

连接后如何在 Pyspark Dataframe 中选择和排序多个列

mysql 等值连接案例之添加筛选分组排序多表

在 Pyspark 中,如何在 partitionBy 和 orderBy 之后进行分组?

聚合和分组数据然后根据列排序

Pyspark groupby 然后在组内排序