在 PySpark 中查找两个数据帧之间的变化
Posted
技术标签:
【中文标题】在 PySpark 中查找两个数据帧之间的变化【英文标题】:Find changes beween two dataframes in PySpark 【发布时间】:2018-01-09 09:40:40 【问题描述】:我有两个数据框,比如说 dfA 和 dfB。
dfA:
IdCol | Col2 | Col3
id1 | val2 | val3
dfB:
IdCol | Col2 | Col3
id1 | val2 | val4
两个数据框在 IdCol 中连接。我想每行比较它们并保持列不同以及它们在另一个数据框中的值。例如,从上述两个数据框中,我想要一个结果:
dfChanges:
RowId | Col | dfA_value | dfB_value |
id1 | Col3 | val_3 | val_4 |
我有点坚持如何做到这一点。任何人都可以提供方向吗? 提前致谢
编辑
我的尝试是这样的。但它不是很清楚或具有良好的性能。有更好的方法吗?
dfChanges = None
#for all column excpet id
for colName in dfA.column[1:]:
#Select whole columns of id and targeted column
#from both datasets and subtract to find differences
changedRows = dfA.select(['IdCol',colName]).subtract(dfB.select(['IdCol',colName]))
#Join with dfB to take the value of targeted column from there
temp = changedRows.join(dfB.select(col('IdCol'),col(colName).alias("dfB_value")),dfA.IdCol == dfB.IdCol, 'inner'). \
drop(dfB.IdCol)
#Proper Rename columns
temp = temp.withColumnRenamed(colname,"dfA_value")
temp = temp.withColumn("Col",lit(colName))
#Append to a single dataframe
if (dfChanges is None):
dfChanges = temp
else:
dfChanges = dfChanges.union(temp)
【问题讨论】:
【参考方案1】:通过 id 加入两个数据框:
dfA = spark.createDataFrame(
[("id1", "val2", "val3")], ("Idcol1", "Col2", "Col3")
)
dfB = spark.createDataFrame(
[("id1", "val2", "val4")], ("Idcol1", "Col2", "Col3")
)
dfAB = dfA.alias("dfA").join(dfB.alias("dfB"), "idCol1")
重塑:
from pyspark.sql.functions import col, struct
ids = ["Idcol1"]
vals = [struct(
col("dfA.".format(c)).alias("dfA_value"),
col("dfB.".format(c)).alias("dfB_value")
).alias(c) for c in dfA.columns if c not in ids]
和melt
(定义为here)
(melt(dfAB.select(ids + vals), ids, [c for c in dfA.columns if c not in ids])
.where(col("value.dfA_value") != col("value.dfB_value"))
.select(ids + ["variable" , "value.dfA_value", "value.dfB_value"])
.show())
+------+--------+---------+---------+
|Idcol1|variable|dfA_value|dfB_value|
+------+--------+---------+---------+
| id1| Col3| val3| val4|
+------+--------+---------+---------+
【讨论】:
这工作得很好,但我遇到了这个问题:当我的原始数据帧在列中混合数据类型(例如,一个是整数,其他是字符串)时,融化函数崩溃,无法解析数组异常。我目前将所有变量都转换为 String 但我认为这是不必要的。有什么建议可以修改 melt 函数以处理所有数据类型?我应该在melt函数中使用除struct之外的其他数据类型吗? 如果您想在一个表中进行所有更改,则需要单一类型。 Spark 使用关系模型,不支持联合类型。因此,如果输入是混合的,那么您必须进行投射。您可以尝试在熔化前比较数据以避免不精确的问题。 非常感谢,这真的很有帮助以上是关于在 PySpark 中查找两个数据帧之间的变化的主要内容,如果未能解决你的问题,请参考以下文章
为啥在使用 pyspark 加入 Spark 数据帧时出现这些 Py4JJavaError showString 错误?
Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)
Pyspark 错误:“Py4JJavaError:调用 o655.count 时出错。”在数据帧上调用 count() 方法时