Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)
Posted
技术标签:
【中文标题】Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)【英文标题】:Pyspark DataFrame: find difference between two DataFrames (values and column names) 【发布时间】:2018-03-19 12:35:02 【问题描述】:我在数据框中总共有 100 多列。 我正在尝试比较两个数据框并找到与列名不匹配的记录。 我得到了一个输出波纹管代码,但是当我运行 100 多列的代码时,作业中止了。
我这样做是为了查找 SCD 类型 2 delta 进程错误。
from pyspark.sql.types import *
from pyspark.sql.functions import *
d2 = sc.parallelize([("A1", 500,1005) ,("A2", 700,10007)])
dataFrame1 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])
d2 = sc.parallelize([("A1", 600,1005),("A2", 700,10007)])
dataFrame2 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])
key_id_col_name="ID"
key_id_value="A1"
dataFrame1.select("ID","VALUE1").subtract(dataFrame2.select("ID",col("VALUE1").alias("value"))).show()
def unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value):
chk_fst=True
dataFrame1 = dataFrame1.where(dataFrame1[key_id_col_name] == key_id_value)
dataFrame2 = dataFrame2.where(dataFrame2[key_id_col_name] == key_id_value)
col_names = list(set(dataFrame1.columns).intersection(dataFrame2.columns))
col_names.remove(key_id_col_name)
for col_name in col_names:
if chk_fst == True:
df_tmp = dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name))
chk_fst = False
else:
df_tmp = df_tmp.unionAll(dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name)))
return df_tmp
res_df = unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value)
res_df.show()
>>> dataFrame1.show()
+---+------+------+
| ID|VALUE1|VALUE2|
+---+------+------+
| A1| 500| 1005|
| A2| 700| 10007|
+---+------+------+
>>> dataFrame2.show()
+---+------+------+
| ID|VALUE1|VALUE2|
+---+------+------+
| A1| 600| 1005|
| A2| 700| 10007|
+---+------+------+
>>> res_df.show()
+------+-----+--------+
|KEY_ID|VALUE|COL_NAME|
+------+-----+--------+
| A1| 500| VALUE1|
+------+-----+--------+
请建议任何其他方式。
【问题讨论】:
【参考方案1】:这是另一种方法:
使用ID
列加入两个DataFrame。
然后为每一行创建一个新列,其中包含存在差异的列。
使用pyspark.sql.functions.create_map()
将此新列创建为键值对映射。1
地图的键是列名。
使用pyspark.sql.functions.when()
,将值设置为dataFrame1
中的相应值(因为这似乎是您想要的示例)如果两个数据帧之间存在差异.否则,我们将值设置为None
。
在映射列上使用pyspark.sql.functions.explode()
,并使用pyspark.sql.functions.isnull()
过滤掉差异不为空的任何行。
选择您想要的列并使用alias()
重命名。
例子:
import pyspark.sql.functions as f
columns = [c for c in dataFrame1.columns if c != 'ID']
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
.withColumn(
'diffs',
f.create_map(
*reduce(
list.__add__,
[
[
f.lit(c),
f.when(
f.col('r.'+c) != f.col('l.'+c),
f.col('r.'+c)
).otherwise(None)
]
for c in columns
]
)
)
)\
.select([f.col('ID'), f.explode('diffs')])\
.where(~f.isnull(f.col('value')))\
.select(
f.col('ID').alias('KEY_ID'),
f.col('value').alias('VALUE'),
f.col('key').alias('COL_NAME')
)\
.show(truncate=False)
#+------+-----+--------+
#|KEY_ID|VALUE|COL_NAME|
#+------+-----+--------+
#|A1 |500 |VALUE1 |
#+------+-----+--------+
备注
1 语法*reduce(list.__add__, [[f.lit(c), ...] for c in columns])
作为create_map()
的参数是一些有助于动态创建地图的python-fu。
create_map()
需要偶数个参数 - 它假定每对中的第一个参数是键,第二个是值。为了按该顺序放置参数,列表推导为每次迭代生成一个列表。我们使用 list.__add__
将此列表缩减为平面列表。
最后使用*
操作符对列表进行解包。
这里是中间输出,可能逻辑更清晰:
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
.withColumn(
'diffs',
f.create_map(
*reduce(
list.__add__,
[
[
f.lit(c),
f.when(
f.col('r.'+c) != f.col('l.'+c),
f.col('r.'+c)
).otherwise(None)
]
for c in columns
]
)
)
)\
.select('ID', 'diffs').show(truncate=False)
#+---+-----------------------------------+
#|ID |diffs |
#+---+-----------------------------------+
#|A2 |Map(VALUE1 -> null, VALUE2 -> null)|
#|A1 |Map(VALUE1 -> 500, VALUE2 -> null) |
#+---+-----------------------------------+
【讨论】:
以上是关于Pyspark DataFrame:查找两个 DataFrame 之间的差异(值和列名)的主要内容,如果未能解决你的问题,请参考以下文章
在 PySpark 中,如何根据另一个 DataFrame 中的查找来填充新列?
pyspark - 使用 RDD 进行聚合比 DataFrame 快得多
如何在pyspark中查找Dataframe列是一对一或一对多映射?
pyspark案例系列4-dataframe输出到单个文件夹的解决方案
在 PySpark 的两个不同 pyspark.sql.dataframes 中的两列中创建一个 pyspark.sql.dataframe