如何在pyspark中连接具有相同名称的列的值

Posted

技术标签:

【中文标题】如何在pyspark中连接具有相同名称的列的值【英文标题】:how to concat values of columns with same name in pyspark 【发布时间】:2021-06-24 00:14:58 【问题描述】:

我们有一个功能请求,我们希望根据请求从数据库中提取一个表并对其执行一些转换。但是这些表可能有重复的列 [具有相同名称的列]。我想将这些列合并为一列

例如:

Request  for input table named ages:


+---+----+------+-----+
|age| ids | ids | ids |
+---+----+------+-----+
| 25|  1  |  2  |  3  |
+---+----+------+-----+
| 26|  4  |  5  |  6  |
+---+----+------+-----+


the output table  is :

+---+----+------+-----+
|age|       ids       |
+---+----+------+-----+
| 25| [1  ,  2  ,  3] |
+---+----+------+-----+
| 26| [4  ,  5  ,  6] |
+---+----+------+-----+


next time  we might get a request for input table names:

+---+----+------+-----+
|name| company | company| 
+---+----+------+-----+
| abc|  a      |  b     | 
+---+----+------+-----+
| xyc|  c      |  d     |  
+---+----+------+-----+

The output table should be:

+---+----+------+
|name| company  | 
+---+----+------+
| abc|  [a,b]   | 
+---+----+------+
| xyc|  [c,d]   |  
+---+----+------+

所以基本上我需要找到具有相同名称的列,然后合并其中的值。

【问题讨论】:

你可以检查这个 - ***.com/questions/68028695/… 【参考方案1】:

您可以将 spark 数据帧转换为 pandas 数据帧,执行必要的操作并将其转换回 spark 数据帧。

为了清楚起见,我添加了必要的 cmets。

使用熊猫:

import pandas as pd
from collections import Counter

pd_df = spark_df.toPandas() #converting spark dataframe to pandas dataframe

pd_df.head()

def concatDuplicateColumns(df):
    duplicate_cols = [] #to store duplicate column names
    for col in dict(Counter(df.columns)):
        if dict(Counter(df.columns))[col] >1:
            duplicate_cols.append(col)

    final_dict = 
    for cols in duplicate_cols:
        final_dict[cols] = []  #initialize dict

    for cols in duplicate_cols:
        for ind in df.index.tolist():
            final_dict[cols].append(df.loc[ind, cols].tolist())

    df.drop(duplicate_cols, axis=1, inplace=True)
    for cols in duplicate_cols:
        df[cols] = final_dict[cols]
    return df

final_df = concatDuplicateColumns(pd_df)

spark_df = spark.createDataFrame(final_df)

spark_df.show()

【讨论】:

如果我事先不知道列名怎么办?我们可以动态知道同名的列吗? 我没听懂你想说什么。请通过更新问题更清楚地解释 @emulator 好的,有你的问题。我创建了一个函数,它将df 作为输入并返回您想要的df。看看我更新的答案 感谢您的回答,但我们在 spark 中是否有任何内置功能可以完成这项工作?如果我们有大量数据输入,这将如何优雅地扩展? @emulator 每次我回答您想要的内容时更改您的要求是一种不好的做法。现在对于您的新问题,没有任何内置的 pyspark 功能可以满足您的需求。你必须为它写作。

以上是关于如何在pyspark中连接具有相同名称的列的值的主要内容,如果未能解决你的问题,请参考以下文章

如何在 pyspark 中验证 Dataframe 的架构(列的编号和名称)?

PySpark:如何将具有 SparseVector 类型的列的 Spark 数据帧写入 CSV 文件?

连接具有相同值的行的列值(不同列的)

如何使用具有多个源列的 pandas_udf 将多个列添加到 pyspark DF?

使用pyspark查找每个对应列的两个数据帧上的值差异

如何获取和比较pyspark中两个数据框中相似列的所有值的数据类型