Pyspark中组连接函数的持久循环数据帧
Posted
技术标签:
【中文标题】Pyspark中组连接函数的持久循环数据帧【英文标题】:Persisting loop dataframes for group concat functions in Pyspark 【发布时间】:2022-01-07 13:51:22 【问题描述】:我正在尝试将 spark 数据框聚合到唯一 ID,并从给定排序列的该 ID 的该列中选择第一个非空值。基本上复制了 mysql 的 group_concat 函数。
Spark SQL replacement for MySQL's GROUP_CONCAT aggregate function 此处的 SO 帖子对于为单个列复制 group_concat 非常有帮助。我需要为列的动态列表执行此操作。
我宁愿不必为每一列复制此代码(十几个 +,将来可能是动态的),所以我试图在一个循环中实现(我知道在 spark 中皱眉!)给定列名列表.循环成功运行,但即使中间 df 被缓存/持久化(re: Cacheing and Loops in (Py)Spark),之前的迭代也不会持续存在。
任何帮助、指针或更优雅的非循环解决方案将不胜感激(如果有更合适的函数式编程方法,不要害怕尝试一些 scala)!
鉴于以下df:
unique_id | row_id | first_name | last_name | middle_name | score |
---|---|---|---|---|---|
1000000 | 1000002 | Simmons | Bonnie | Darnell | 88 |
1000000 | 1000006 | Dowell | Crawford | Anne | 87 |
1000000 | 1000007 | NULL | Eric | Victor | 89 |
1000000 | 1000000 | Zachary | Fields | Narik | 86 |
1000000 | 1000003 | NULL | NULL | Warren | 92 |
1000000 | 1000008 | Paulette | Ronald | Irvin | 85 |
group_column = "unique_id"
concat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.cache()
df_final.display() #I'm using databricks
我的结果如下:
unique_id | middle_name |
---|---|
1000000 | Warren |
我想要的结果是:
unique_id | first_name | last_name | middle_name |
---|---|---|---|
1000000 | Simmons | Eric | Warren |
Second set of tables if they don't pretty print above
【问题讨论】:
您需要的所有工具都在编辑器工具栏中。也不需要 html 换行符。 @Lamanus 感谢您的编辑帮助。 【参考方案1】:我找到了自己问题的解决方案:在我加入数据框时添加.collect()
调用,而不是persist()
或cache()
;这将产生预期的数据帧。
group_column = "unique_id"
enter code hereconcat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
df_helper=df
df_helper=df_helper.groupBy(group_column)\
.agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
.withColumn("sorted_list",col("collect_list."+str(i)))\
.withColumn("first_item",slice(col("sorted_list"),1,1))\
.withColumn(i,concat_ws(",",col("first_item")))\
.drop("collect_list")\
.drop("sorted_list")\
.drop("first_item")
print(i)
df_final=df_final.join(df_helper,group_column,"inner")
df_final.collect()
df_final.display() #I'm using databricks
【讨论】:
以上是关于Pyspark中组连接函数的持久循环数据帧的主要内容,如果未能解决你的问题,请参考以下文章