PySpark collect_set 中仅针对字符串列不保留顺序
Posted
技术标签:
【中文标题】PySpark collect_set 中仅针对字符串列不保留顺序【英文标题】:Order is not preserved in PySpark collect_set only for string column 【发布时间】:2019-10-28 16:34:20 【问题描述】:我在 DataFrame 上使用 collect_set 方法并添加 3 列。
我的df如下:
id acc_no acc_name cust_id
1 111 ABC 88
1 222 XYZ 99
下面是sn-p的代码:
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy('acc_no')
df1 = df.withColumn(
'cust_id_new',
F.collect_set(cust_id).over(w)
).withColumn(
'acc_no_new',
F.collect_set(acc_no).over(w)
).withColumn(
'acc_name_new',
F.collect_set(acc_name).over(w)
).drop('cust_id').drop('acc_no').drop('acc_name')
在这种情况下,我的输出如下:
id acc_no acc_name cust_id
1 [111,222] [XYZ,ABC] [88,99]
所以这里 acc_no 和 cust_id 是正确的,但是 acc_name 的顺序是不正确的。 acc_no 111 对应 acc_name ABC,但我们得到的是 XYZ。
谁能告诉我为什么会发生这种情况以及解决方案是什么?
我怀疑这个问题只发生在字符串列,但我可能错了。 请帮忙...
这类似于下面的线程,但我仍然收到错误。
How to maintain sort order in PySpark collect_list and collect multiple lists
【问题讨论】:
set
本质上是无序的
什么版本的pyspark?在 2.4+ 中,您可能可以使用 collect_list
和 array_distinct
。或者在排序之前压缩数组。
我使用的是 spark 2.3。
我想在这里提到的一件事是,出于特定原因,我已将所有列类型转换为字符串。
【参考方案1】:
我们可以在 id 列中使用 row_number 函数,并使用 collect_list 和 sorted_array 来保留顺序。
from pyspark.sql import Window
import pyspark.sql.functions as F
w = Window.partitionBy('id').orderBy('cust_id')
df1 = df.withColumn(
'rn',
row_number(cust_id).over(w)
).groupBy("id").agg(sort_array(collect_list(struct('rn','acc_no','acc_name','cust_id'))).alias('data')) \
.withColumn('grp_acc_no',col('data.acc_no')) \
.withColumn('grp_acc_name',col('data.acc_name')) \
.withColumn('grp_cust_id',col('data.cust_id')) \
.drop('data','acc_no','acc_name','cust_id').show(truncate=False) `````
【讨论】:
以上是关于PySpark collect_set 中仅针对字符串列不保留顺序的主要内容,如果未能解决你的问题,请参考以下文章
基于pyspark中仅一列的两个DataFrame之间的差异[重复]
如何从 pyspark 中的数据框中仅选择 70% 的重新编码?
如何解决 Snowflake 中仅针对任务运行出现的 Stream Not Found 错误?