如何在pyspark中查找Dataframe列是一对一或一对多映射?

Posted

技术标签:

【中文标题】如何在pyspark中查找Dataframe列是一对一或一对多映射?【英文标题】:How to find Dataframe columns are one to one or one to many mapping in pyspark? 【发布时间】:2018-11-04 09:33:24 【问题描述】:

我有一个如下的数据框:

df0 = sc.parallelize([
    (1, 3),
    (2, 3),
    (1, 2)
   ]).toDF(["id",'t'])

当我执行表演时:

df0.show()
+---+---+
| id|  t|
+---+---+
|  1|  3|
|  2|  3|
|  1|  2|
+---+---+

我想确定列 id、t 之间的关系。 在id 列和t 之间给出df0 关系是一对多的,因为id 列1 和t 列3 即(1,3)和下一个(1,2)。一对多

我的预期输出如下:

+---+---+---+
|idt| id|  t|
+---+---+---+
| id| OO| OM|
|  t| OM| OO|
+---+---+---+

【问题讨论】:

您是否尝试过使用join 【参考方案1】:

您可以通过分组和计数来做到这一点。

from pyspark.sql import functions as F
from pyspark.sql.functions import when
from pyspark.sql.types import *

def relation_type(df, fromCol, toCol):
    df2 = df.groupBy(fromCol)\
    .agg(F.countDistinct(toCol).alias('val_count'))\
    .agg(F.max('val_count').alias('max_rel_count'))

    return df2.withColumn('mapping', when(df2['max_rel_count'] > 1, 'OM')\
                   .otherwise('OO'))\
                    .drop('max_rel_count')

def relation_types(df, cols):
    schemaArr = [StructField('ColName', StringType(), True)]
    for i in cols:
        schemaArr.append(StructField(i, StringType(), True))
    schema = StructType(schemaArr)
    result = sqlContext.createDataFrame(sc.emptyRDD(), schema)
    for i in cols:
        rowDict = []
        rowDict.append(i)
        for j in cols:
            val = relation_type(df, i, j).collect()[0]
            rowDict.append(val['mapping'])
        row = sqlContext.createDataFrame([rowDict])
        result = result.union(row)
    return result

然后用你想要的列调用它

relation_types(df, ['id', 't']).show()

结果

+-------+---+---+
|ColName| id|  t|
+-------+---+---+
|     id| OO| OM|
|      t| OM| OO|
+-------+---+---+

【讨论】:

您好 Lev Denisov,非常感谢您的帮助,它按预期工作,但我们返回的数据帧不同。但我想要一个包含所有结果的数据框,请帮助我。 你为什么想要它?你想解决的问题是什么?我无法想象你为什么需要你想要的输出。告诉我更多关于这个案例的信息,也许它有一个更简单的解决方案。 @RkC 我更新了代码,所以现在输出是你想要的格式

以上是关于如何在pyspark中查找Dataframe列是一对一或一对多映射?的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

PySpark - 如何将列表传递给用户定义函数?

如何在 PySpark 中与爆炸相反?

在 PySpark Dataframe 中计数零次出现

Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)

如何快速检查 PySpark Dataframe 中是不是存在行?