用 Pandas 或 Pyspark 用两列表示的关系展平“树”

Posted

技术标签:

【中文标题】用 Pandas 或 Pyspark 用两列表示的关系展平“树”【英文标题】:Flatten 'trees' with relationships expressed in two columns with Pandas or Pyspark 【发布时间】:2021-04-16 14:20:56 【问题描述】:

我收集了类似于以下格式的家谱:

    A
   / \
  B   C
 / \ / \
D  E F  G
  / \
 .. ..

表示为以下两列(其中包含多棵树):

child parent
A
B A
C A
D B
... ...

将其展平以便在新列中获得最高父级的最有效方法是什么?

即B = A,D = A...?

child parent highest_parent
A A
B A A
C A A
D B A
... ... ...

理想情况下,我想在 Spark 中执行此操作(考虑到数据集的大小),但也可以尝试 Pandas?

如果每个级别没有一些非常密集的递归函数(即使我的树最多有 3 级深),我目前无法高效地完成这项工作。

【问题讨论】:

在 pyspark 中并且知道你只有 3 个级别的深度,我认为使用两个 join 将是最容易实现并且可能足够快 【参考方案1】:

在 pandas 中,您可以通过 networkx 进行检查

df=df.dropna()
import networkx as nx
G=nx.from_pandas_edgelist(df, 'parent', 'child',create_using=nx.DiGraph())
def find_root(G,node):
    if len(list(G.predecessors(node)))>0:
        root = find_root(G,list(G.predecessors(node))[0])
    else:
        root = node
    return root

df['child'].apply(lambda x : find_root(G,x))

Out[109]: 
1    A
2    A
3    A

【讨论】:

BENY...不错的一个。 +1【参考方案2】:

这是 pyspark 中的一种方式,只有 3 个级别。请注意,示例中的最后一行有 4 个级别并且失败,希望不是您的情况,而是看到它

import pandas as pd
import pyspark.sql.functions as F

# create toy data
pdf = pd.DataFrame('child':list('ABCDEFGHIJKLM'), 
                    'parent':['','A','A','B','B','C','C','E', '', 'I','J','K','L'])
# convert to spark dataframe
df = spark.createDataFrame(pdf)

# coalesce the column parent
df = df.withColumn('parent', F.when(F.col('parent')!='', F.col('parent'))
                              .otherwise(F.col('child')))

# do self join using alias to direct to the right columns
res = (
    df.alias('df1')
      .join(df.alias('df2'), F.col('df1.parent') == F.col('df2.child'))
      .join(df.alias('df3'), F.col('df2.parent') == F.col('df3.child'))
      .select(['df1.child', 'df1.parent',F.col('df3.parent').alias('highest_parent')])
)

你得到

res.orderBy('child').show()
+-----+------+--------------+
|child|parent|highest_parent|
+-----+------+--------------+
|    A|     A|             A|
|    B|     A|             A|
|    C|     A|             A|
|    D|     B|             A|
|    E|     B|             A|
|    F|     C|             A|
|    G|     C|             A|
|    H|     E|             A|
|    I|     I|             I|
|    J|     I|             I|
|    K|     J|             I|
|    L|     K|             I|
|    M|     L|             J| <--this one 4 levels so fail, could add a join if needed
+-----+------+--------------+

【讨论】:

【参考方案3】:

import pandas as pd
import numpy as np
df = pd.DataFrame(
    "child": ['A','B','C','D','E','F','G','H','I','A1','B1','C1','D1','E1','F1','G1','H1','I1'],
    "parent": [np.NaN,'A','A','B','B','C','C','G','G',np.NaN,'A1','A1','B1','B1','C1','C1','G1','G1']
)
upper_parent_list = list(df[df['parent'].isna()]['child'])
['A', 'A1']
df['upper_parent'] = df['parent'].fillna(df['child'])

   child parent upper_parent
0      A    NaN            A
1      B      A            A
2      C      A            A
3      D      B            B
4      E      B            B
5      F      C            C
6      G      C            C
7      H      G            G
8      I      G            G
9     A1    NaN           A1
10    B1     A1           A1
11    C1     A1           A1
12    D1     B1           B1
13    E1     B1           B1
14    F1     C1           C1
15    G1     C1           C1
16    H1     G1           G1
17    I1     G1           G1
while df['upper_parent'].isin(upper_parent_list).sum()!=df.shape[0]:
    for up_par in upper_parent_list:
        child_list = list(df[df['upper_parent'].isin([up_par])]['child'])
        df['upper_parent'] = np.where(df['parent'].isin(child_list), up_par, df['upper_parent'])
print(df)

   child parent upper_parent
0      A    NaN            A
1      B      A            A
2      C      A            A
3      D      B            A
4      E      B            A
5      F      C            A
6      G      C            A
7      H      G            A
8      I      G            A
9     A1    NaN           A1
10    B1     A1           A1
11    C1     A1           A1
12    D1     B1           A1
13    E1     B1           A1
14    F1     C1           A1
15    G1     C1           A1
16    H1     G1           A1
17    I1     G1           A1

【讨论】:

以上是关于用 Pandas 或 Pyspark 用两列表示的关系展平“树”的主要内容,如果未能解决你的问题,请参考以下文章

将 pandas 转换为 pyspark 表达式

Pandas-分组函数和分层索引的展开

pyspark数据框中两列之间的时间差

Pandas to PySpark给出OOM错误而不是溢出到磁盘[重复]

如何在 PySpark 或 Pandas 中将一列的中间行大写

用两列垂直对齐高图中的图例