将 pyspark 数据帧与另一个数据帧进行比较
Posted
技术标签:
【中文标题】将 pyspark 数据帧与另一个数据帧进行比较【英文标题】:Compare a pyspark dataframe to another dataframe 【发布时间】:2018-08-16 13:04:28 【问题描述】:我有 2 个数据框要比较,它们的列数相同,比较结果应该包含不匹配的字段和值以及 ID。
数据框一
+-----+---+--------+
| name| id| City|
+-----+---+--------+
| Sam| 3| Toronto|
| BALU| 11| YYY|
|CLAIR| 7|Montreal|
|HELEN| 10| London|
|HELEN| 16| Ottawa|
+-----+---+--------+
数据框二
+-------------+-----------+-------------+
|Expected_name|Expected_id|Expected_City|
+-------------+-----------+-------------+
| SAM| 3| Toronto|
| BALU| 11| YYY|
| CLARE| 7| Montreal|
| HELEN| 10| Londn|
| HELEN| 15| Ottawa|
+-------------+-----------+-------------+
预期输出
+---+------------+--------------+-----+
| ID|Actual_value|Expected_value|Field|
+---+------------+--------------+-----+
| 7| CLAIR| CLARE| name|
| 3| Sam| SAM| name|
| 10| London| Londn| City|
+---+------------+--------------+-----+
代码
创建示例数据
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
sc = SparkContext()
sql_context = SQLContext(sc)
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR") # log only on fails
df_Actual = sql_context.createDataFrame(
[("Sam", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLAIR", 7,'Montreal'),
("HELEN", 10,'London'), ("HELEN", 16,'Ottawa')],
["name", "id","City"]
)
df_Expected = sql_context.createDataFrame(
[("SAM", 3,'Toronto'), ("BALU", 11,'YYY'), ("CLARE", 7,'Montreal'),
("HELEN", 10,'Londn'), ("HELEN", 15,'Ottawa')],
["Expected_name", "Expected_id","Expected_City"]
)
为结果创建空数据框
field = [
StructField("ID",StringType(), True),
StructField("Actual_value", StringType(), True),
StructField("Expected_value", StringType(), True),
StructField("Field", StringType(), True)
]
schema = StructType(field)
Df_Result = sql_context.createDataFrame(sc.emptyRDD(), schema)
在 id 上加入预期和实际
df_cobined = df_Actual.join(df_Expected, (df_Actual.id == df_Expected.Expected_id))
col_names=df_Actual.schema.names
遍历每一列以查找不匹配项
for col_name in col_names:
#Filter for column values not matching
df_comp= df_cobined.filter(col(col_name)!=col("Expected_"+col_name ))\
.select(col('id'),col(col_name),col("Expected_"+col_name ))
#Add not matching column name
df_comp = df_comp.withColumn("Field", lit(col_name))
#Add to final result
Df_Result = Df_Result.union(df_comp)
Df_Result.show()
此代码按预期工作。但是,在实际情况下,我有更多的列和数百万行要比较。使用此代码,完成比较需要更多时间。有没有更好的方法来提高性能并获得相同的结果?
【问题讨论】:
【参考方案1】:避免使用union
的一种方法如下:
to_compare
接下来选择id
列并使用pyspark.sql.functions.when
比较这些列。对于那些不匹配的,构建一个包含 3 个字段的结构数组:(Actual_value, Expected_value, Field)
为 to_compare
中的每一列
分解临时数组列并删除空值
最后选择id
并使用col.*
将结构中的值展开为列。
代码:
StructType
存储不匹配的字段。
import pyspark.sql.functions as f
# these are the fields you want to compare
to_compare = [c for c in df_Actual.columns if c != "id"]
df_new = df_cobined.select(
"id",
f.array([
f.when(
f.col(c) != f.col("Expected_"+c),
f.struct(
f.col(c).alias("Actual_value"),
f.col("Expected_"+c).alias("Expected_value"),
f.lit(c).alias("Field")
)
).alias(c)
for c in to_compare
]).alias("temp")
)\
.select("id", f.explode("temp"))\
.dropna()\
.select("id", "col.*")
df_new.show()
#+---+------------+--------------+-----+
#| id|Actual_value|Expected_value|Field|
#+---+------------+--------------+-----+
#| 7| CLAIR| CLARE| name|
#| 10| London| Londn| City|
#| 3| Sam| SAM| name|
#+---+------------+--------------+-----+
【讨论】:
谢谢@pault,我会试试这个方法让你知道【参考方案2】:仅加入那些预期 id 等于实际并且在任何其他列中不匹配的记录:
df1.join(df2, df1.id=df2.id and (df1.name != df2.name or df1.age != df2.age...))
这意味着您将只对不匹配的行执行 for 循环,而不是整个数据集。
【讨论】:
【参考方案3】:对于这个正在寻找答案的人,我转置了数据框,然后做了对比。
from pyspark.sql.functions import array, col, explode, struct, lit
def Transposedf(df, by,colheader):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([ struct(lit(c).alias("Field"), col(c).alias(colheader)) for c in cols ])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.Field", "kvs."+colheader])
那么对比看起来是这样的
def Compare_df(df_Expected,df_Actual):
df_combined = (df_Actual
.join(df_Expected, ((df_Actual.id == df_Expected.id)
& (df_Actual.Field == df_Expected.Field)
& (df_Actual.Actual_value != df_Expected.Expected_value)))
.select([df_Actual.account_unique_id,df_Actual.Field,df_Actual.Actual_value,df_Expected.Expected_value])
)
return df_combined
我把这两个函数称为
df_Actual=Transposedf(df_Actual, ["id"],'Actual_value')
df_Expected=Transposedf(df_Expected, ["id"],'Expected_value')
#Compare the expected and actual
df_result=Compare_df(df_Expected,df_Actual)
【讨论】:
以上是关于将 pyspark 数据帧与另一个数据帧进行比较的主要内容,如果未能解决你的问题,请参考以下文章
有没有办法将数据帧的一列中的所有行与另一个数据帧的另一列(火花)中的所有行进行比较?