Pyspark中组连接函数的持久循环数据帧

Posted

技术标签:

【中文标题】Pyspark中组连接函数的持久循环数据帧【英文标题】:Persisting loop dataframes for group concat functions in Pyspark 【发布时间】:2022-01-07 13:51:22 【问题描述】:

我正在尝试将 spark 数据框聚合到唯一 ID,并从给定排序列的该 ID 的该列中选择第一个非空值。基本上复制了 mysql 的 group_concat 函数。

Spark SQL replacement for MySQL's GROUP_CONCAT aggregate function 此处的 SO 帖子对于为单个列复制 group_concat 非常有帮助。我需要为列的动态列表执行此操作。

我宁愿不必为每一列复制此代码(十几个 +,将来可能是动态的),所以我试图在一个循环中实现(我知道在 spark 中皱眉!)给定列名列表.循环成功运行,但即使中间 df 被缓存/持久化(re: Cacheing and Loops in (Py)Spark),之前的迭代也不会持续存在。

任何帮助、指针或更优雅的非循环解决方案将不胜感激(如果有更合适的函数式编程方法,不要害怕尝试一些 scala)!

鉴于以下df:

unique_id row_id first_name last_name middle_name score
1000000 1000002 Simmons Bonnie Darnell 88
1000000 1000006 Dowell Crawford Anne 87
1000000 1000007 NULL Eric Victor 89
1000000 1000000 Zachary Fields Narik 86
1000000 1000003 NULL NULL Warren 92
1000000 1000008 Paulette Ronald Irvin 85
group_column = "unique_id"
concat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
  df_helper=df
  df_helper=df_helper.groupBy(group_column)\
  .agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
  .withColumn("sorted_list",col("collect_list."+str(i)))\
  .withColumn("first_item",slice(col("sorted_list"),1,1))\
  .withColumn(i,concat_ws(",",col("first_item")))\
  .drop("collect_list")\
  .drop("sorted_list")\
  .drop("first_item")
  print(i)
  df_final=df_final.join(df_helper,group_column,"inner")
  df_final.cache()
df_final.display() #I'm using databricks

我的结果如下:

unique_id middle_name
1000000 Warren

我想要的结果是:

unique_id first_name last_name middle_name
1000000 Simmons Eric Warren

Second set of tables if they don't pretty print above

【问题讨论】:

您需要的所有工具都在编辑器工具栏中。也不需要 html 换行符。 @Lamanus 感谢您的编辑帮助。 【参考方案1】:

我找到了自己问题的解决方案:在我加入数据框时添加.collect() 调用,而不是persist()cache();这将产生预期的数据帧。

group_column = "unique_id"
enter code hereconcat_list = ['first_name','last_name','middle_name']
sort_column = "score"
sort_order = False
df_final=df.select(group_column).distinct()
for i in concat_list:\
  df_helper=df
  df_helper=df_helper.groupBy(group_column)\
  .agg(sort_array(collect_list(struct(sort_column,i)),sort_order).alias('collect_list'))\
  .withColumn("sorted_list",col("collect_list."+str(i)))\
  .withColumn("first_item",slice(col("sorted_list"),1,1))\
  .withColumn(i,concat_ws(",",col("first_item")))\
  .drop("collect_list")\
  .drop("sorted_list")\
  .drop("first_item")
  print(i)
  df_final=df_final.join(df_helper,group_column,"inner")
  df_final.collect()
df_final.display() #I'm using databricks

【讨论】:

以上是关于Pyspark中组连接函数的持久循环数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何:Pyspark 数据帧持久使用和回读

pyspark 行列表的 RDD 到 DataFrame

如何在pyspark的循环中合并数据帧

如何在 for 循环中附加 pyspark 数据帧?

Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列

pyspark 内连接的替代方法来比较 pyspark 中的两个数据帧