Pyspark:排序/排序,然后分组和连接字符串
Posted
技术标签:
【中文标题】Pyspark:排序/排序,然后分组和连接字符串【英文标题】:Pyspark : order/sort by then group by and concat string 【发布时间】:2020-04-24 20:05:48 【问题描述】:我有一个这样的数据框:
usr sec scrpt
0 1 5 This
1 2 10 is
2 3 12 a
3 1 7 string
4 2 4 oreo
我正在尝试按用户、秒和 groupby 对用户进行排序/排序,并在那里连接字符串。所以这个表包含了每个用户在哪一秒他说了什么。所以生成的数据框应该看起来像
user concated
1 this string
2 oreo is
3 a
我在下面的python中尝试过,效果很好
df.sort_values(by=['usr','sec'],ascending=[True, True]).groupby(['usr')['scrpt'].apply(lambda x: ','.join(x)).reset_index()
有人可以在 pyspark 中给我类似的吗?
【问题讨论】:
【参考方案1】:来自 Spark-2.4+
在这种情况下使用 array_join
、sort_array
、transform
函数。
#sample dataframe
df=spark.createDataFrame([(1,5,"This"),(2,10,"is"),(3,12,"a"),(1,7,"string"),(2,4,"oreo")],["usr","sec","scrpt"])
df.show()
#+---+---+------+
#|usr|sec| scrpt|
#+---+---+------+
#| 1| 5| This|
#| 2| 10| is|
#| 3| 12| a|
#| 1| 7|string|
#| 2| 4| oreo|
#+---+---+------+
df.groupBy("usr").agg(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ").alias("concated")).orderBy("usr").show(10,False)
df.groupBy("usr").agg(concat_ws(" ",expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated |
#+---+-----------+
#|1 |This string|
#|2 |oreo is |
#|3 |a |
#+---+-----------+
#lower case
df.groupBy("usr").agg(lower(array_join(expr("""transform(sort_array(collect_list(struct(sec,scrpt)),True), x -> x.scrpt)""")," ")).alias("concated")).orderBy("usr").show(10,False)
#+---+-----------+
#|usr|concated |
#+---+-----------+
#|1 |this string|
#|2 |oreo is |
#|3 |a |
#+---+-----------+
【讨论】:
有趣的是sort_array
能够仅基于整数值对array of structs
进行排序。很好的发现
@Shu,您将如何更改代码以独立连接两列中的字符串?【参考方案2】:
您可以使用Window
功能在 PySpark 中完成您想要的。
import pyspark.sql.functions as sf
# Construct a window to construct sentences
sentence_window = Window.partitionBy('usr').orderBy(sf.col('sec').asc())
# Construct a window to get the last sentence. The others will be sentence fragments spoken by the user.
rank_window = Window.partitionBy('usr').orderBy(sf.col('sec').desc())
user_sentences = spark_data_df.select('usr',
sf.collect_list(sf.col('scrpt')).over(sentence_window).alias('sentence'),
sf.rank().over(rank_window).alias('rank'))
user_sentences = user_sentences.filter("rank = 1").drop('rank')
user_sentences = user_sentences.withColumn('sentence', sf.concat_ws(' ', sf.col('sentence')))
user_sentences.show(10, False)
【讨论】:
以上是关于Pyspark:排序/排序,然后分组和连接字符串的主要内容,如果未能解决你的问题,请参考以下文章
ODPS Spark PySpark分组排序打序号并自关联(包含中文乱码问题解决)
连接后如何在 Pyspark Dataframe 中选择和排序多个列