Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)

Posted

技术标签:

【中文标题】Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)【英文标题】:Pyspark DataFrame: find difference between two DataFrames (values and column names) 【发布时间】:2018-03-19 12:35:02 【问题描述】:

我在数据框中总共有 100 多列。 我正在尝试比较两个数据框并找到与列名不匹配的记录。 我得到了一个输出波纹管代码,但是当我运行 100 多列的代码时,作业中止了。

我这样做是为了查找 SCD 类型 2 delta 进程错误。

from pyspark.sql.types import *
from pyspark.sql.functions import *

d2 = sc.parallelize([("A1", 500,1005) ,("A2", 700,10007)])
dataFrame1 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

d2 = sc.parallelize([("A1", 600,1005),("A2", 700,10007)])
dataFrame2 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

key_id_col_name="ID"
key_id_value="A1"
dataFrame1.select("ID","VALUE1").subtract(dataFrame2.select("ID",col("VALUE1").alias("value"))).show()

def unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value):
    chk_fst=True
    dataFrame1 = dataFrame1.where(dataFrame1[key_id_col_name] == key_id_value)
    dataFrame2 = dataFrame2.where(dataFrame2[key_id_col_name] == key_id_value)
    col_names = list(set(dataFrame1.columns).intersection(dataFrame2.columns))
    col_names.remove(key_id_col_name)
    for col_name in col_names:
        if chk_fst == True:
            df_tmp = dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name))
            chk_fst = False
        else:
            df_tmp = df_tmp.unionAll(dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name)))
    return df_tmp

res_df = unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value)

res_df.show() 

   >>> dataFrame1.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   500|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> dataFrame2.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   600|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> res_df.show()
    +------+-----+--------+
    |KEY_ID|VALUE|COL_NAME|
    +------+-----+--------+
    |    A1|  500|  VALUE1|
    +------+-----+--------+

请建议任何其他方式。

【问题讨论】:

【参考方案1】:

这是另一种方法:

使用ID 列加入两个DataFrame。 然后为每一行创建一个新列,其中包含存在差异的列。 使用pyspark.sql.functions.create_map()将此新列创建为键值对映射。1 地图的键是列名。 使用pyspark.sql.functions.when(),将值设置为dataFrame1 中的相应值(因为这似乎是您想要的示例)如果两个数据帧之间存在差异.否则,我们将值设置为None。 在映射列上使用pyspark.sql.functions.explode(),并使用pyspark.sql.functions.isnull() 过滤掉差异不为空的任何行。 选择您想要的列并使用alias()重命名。

例子:

import pyspark.sql.functions as f
columns = [c for c in dataFrame1.columns if c != 'ID']
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
    .withColumn(
        'diffs',
        f.create_map(
            *reduce(
                list.__add__,
                [
                    [
                        f.lit(c),
                        f.when(
                            f.col('r.'+c) != f.col('l.'+c),
                            f.col('r.'+c)
                        ).otherwise(None)
                    ] 
                 for c in columns
                ]
            )
        )
    )\
    .select([f.col('ID'), f.explode('diffs')])\
    .where(~f.isnull(f.col('value')))\
    .select(
        f.col('ID').alias('KEY_ID'),
        f.col('value').alias('VALUE'),
        f.col('key').alias('COL_NAME')
    )\
    .show(truncate=False)
#+------+-----+--------+
#|KEY_ID|VALUE|COL_NAME|
#+------+-----+--------+
#|A1    |500  |VALUE1  |
#+------+-----+--------+

备注

1 语法*reduce(list.__add__, [[f.lit(c), ...] for c in columns]) 作为create_map() 的参数是一些有助于动态创建地图的python-fu。

create_map() 需要偶数个参数 - 它假定每对中的第一个参数是键,第二个是值。为了按该顺序放置参数,列表推导为每次迭代生成一个列表。我们使用 list.__add__ 将此列表缩减为平面列表。

最后使用* 操作符对列表进行解包。

这里是中间输出,可能逻辑更清晰:

dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
    .withColumn(
        'diffs',
        f.create_map(
            *reduce(
                list.__add__,
                [
                    [
                        f.lit(c),
                        f.when(
                            f.col('r.'+c) != f.col('l.'+c),
                            f.col('r.'+c)
                        ).otherwise(None)
                     ] 
                     for c in columns
                ]
            )
        )
    )\
    .select('ID', 'diffs').show(truncate=False)
#+---+-----------------------------------+
#|ID |diffs                              |
#+---+-----------------------------------+
#|A2 |Map(VALUE1 -> null, VALUE2 -> null)|
#|A1 |Map(VALUE1 -> 500, VALUE2 -> null) |
#+---+-----------------------------------+

【讨论】:

以上是关于Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?

pyspark - 使用 RDD 进行聚合比 DataFrame 快得多

如何在pyspark中查找Dataframe列是一对一或一对多映射?

pyspark案例系列4-dataframe输出到单个文件夹的解决方案

在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe

通过列 [PySpark] 连接两个 DataFrame