在 PySpark 中查找两个数据帧之间的变化

Posted

技术标签:

【中文标题】在 PySpark 中查找两个数据帧之间的变化【英文标题】:Find changes beween two dataframes in PySpark 【发布时间】:2018-01-09 09:40:40 【问题描述】:

我有两个数据框,比如说 dfA 和 dfB。

dfA:
IdCol | Col2  | Col3
id1   | val2  | val3

dfB:
IdCol | Col2  | Col3
id1   | val2  | val4

两个数据框在 IdCol 中连接。我想每行比较它们并保持列不同以及它们在另一个数据框中的值。例如,从上述两个数据框中,我想要一个结果:

dfChanges:
RowId | Col  | dfA_value | dfB_value |
id1   | Col3 | val_3     | val_4     |

我有点坚持如何做到这一点。任何人都可以提供方向吗? 提前致谢

编辑

我的尝试是这样的。但它不是很清楚或具有良好的性能。有更好的方法吗?

dfChanges = None

#for all column excpet id
for colName in dfA.column[1:]:

    #Select whole columns of id and targeted column 
    #from both datasets and subtract to find differences
    changedRows = dfA.select(['IdCol',colName]).subtract(dfB.select(['IdCol',colName]))

    #Join with dfB to take the value of targeted column from there
    temp = changedRows.join(dfB.select(col('IdCol'),col(colName).alias("dfB_value")),dfA.IdCol == dfB.IdCol, 'inner'). \
    drop(dfB.IdCol)

    #Proper Rename columns
    temp = temp.withColumnRenamed(colname,"dfA_value")
    temp = temp.withColumn("Col",lit(colName))

    #Append to a single dataframe
    if (dfChanges is None):
        dfChanges = temp
    else:
        dfChanges = dfChanges.union(temp)

【问题讨论】:

【参考方案1】:

通过 id 加入两个数据框:

dfA = spark.createDataFrame(
    [("id1", "val2", "val3")], ("Idcol1", "Col2", "Col3")
)

dfB = spark.createDataFrame(
    [("id1", "val2", "val4")], ("Idcol1", "Col2", "Col3")
)

dfAB = dfA.alias("dfA").join(dfB.alias("dfB"), "idCol1")

重塑:

from pyspark.sql.functions import col, struct

ids = ["Idcol1"]

vals = [struct(
    col("dfA.".format(c)).alias("dfA_value"),
    col("dfB.".format(c)).alias("dfB_value")
).alias(c) for c in dfA.columns if c not in ids]

melt(定义为here)

(melt(dfAB.select(ids + vals), ids, [c for c in dfA.columns if c not in ids])
    .where(col("value.dfA_value") != col("value.dfB_value"))
    .select(ids + ["variable" , "value.dfA_value", "value.dfB_value"])
    .show())

+------+--------+---------+---------+                                           
|Idcol1|variable|dfA_value|dfB_value|
+------+--------+---------+---------+
|   id1|    Col3|     val3|     val4|
+------+--------+---------+---------+

【讨论】:

这工作得很好,但我遇到了这个问题:当我的原始数据帧在列中混合数据类型(例如,一个是整数,其他是字符串)时,融化函数崩溃,无法解析数组异常。我目前将所有变量都转换为 String 但我认为这是不必要的。有什么建议可以修改 melt 函数以处理所有数据类型?我应该在melt函数中使用除struct之外的其他数据类型吗? 如果您想在一个表中进行所有更改,则需要单一类型。 Spark 使用关系模型,不支持联合类型。因此,如果输入是混合的,那么您必须进行投射。您可以尝试在熔化前比较数据以避免不精确的问题。 非常感谢,这真的很有帮助

以上是关于在 PySpark 中查找两个数据帧之间的变化的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据框比较以根据关键字段查找列差异

为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?

Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)

Pyspark 在查找前一行时按组迭代数据帧

Pyspark 错误:“Py4JJavaError:调用 o655.count 时出错。”在数据帧上调用 count() 方法时

x00 出现在 Pyspark 数据帧中的每个字符之间