Pyspark groupby 然后在组内排序
Posted
技术标签:
【中文标题】Pyspark groupby 然后在组内排序【英文标题】:Pyspark groupby then sort within group 【发布时间】:2017-08-16 14:33:27 【问题描述】:我有一个包含 id、偏移量、文本的表。假设输入:
id offset text
1 1 hello
1 7 world
2 1 foo
我想要这样的输出:
id text
1 hello world
2 foo
我正在使用:
df.groupby(id).agg(concat_ws("",collect_list(text))
但我不知道如何确保文本中的顺序。我在groupby
数据之前做了sort
,但我听说groupby
可能会洗牌数据。有没有办法在groupby
数据之后在组内做sort
?
【问题讨论】:
【参考方案1】:这将创建一个所需的df:
df1 = sqlContext.createDataFrame([("1", "1","hello"), ("1", "7","world"), ("2", "1","foo")], ("id", "offset" ,"text" ))
display(df1)
那么你可以使用下面的代码,可以进一步优化:
@udf
def sort_by_offset(col):
result =""
text_list = col.split("-")
for i in range(len(text_list)):
text_list[i] = text_list[i].split(" ")
text_list[i][0]=int(text_list[i][0])
text_list = sorted(text_list, key=lambda x: x[0], reverse=False)
for i in range(len(text_list)):
result = result+ " " +text_list[i][1]
return result.lstrip()
df2 = df1.withColumn("offset_text",concat(col("offset"),lit(" "),col("text")))
df3 = df2.groupby(col("id")).agg(concat_ws("-",collect_list(col("offset_text"))).alias("offset_text"))
df4 = df3.withColumn("text",sort_by_offset(col("offset_text")))
display(df4)
最终输出:
【讨论】:
【参考方案2】:添加sort_array
:
from pyspark.sql.functions import sort_array
df.groupby(id).agg(concat_ws("", sort_array(collect_list(text))))
【讨论】:
这不行,我的排序键是偏移的,而不是字符串本身。以上是关于Pyspark groupby 然后在组内排序的主要内容,如果未能解决你的问题,请参考以下文章
ROWNUMBER() OVER( PARTITION BY COL1 ORDER BY COL2)用法,先分组,然后在组内排名,分组计算等