使用pyspark查找每个对应列的两个数据帧上的值差异

Posted

技术标签:

【中文标题】使用pyspark查找每个对应列的两个数据帧上的值差异【英文标题】:Find difference of values on two dataframe for each corresponding columns using pyspark 【发布时间】:2020-05-16 18:03:54 【问题描述】:

我想找出使用​​内连接连接时两个数据框的列值的差异。

df1 有 10 列,即。 key1,key2 & col1, col2 依此类推。 (列可以更多,名称可以不同) 同样 df2 有 10 列,即 key1,key2 & col1, col2 依此类推。

df3 = df1.join(df2, 'df1.key1 == df2.key1 and df1.key2 == df2.key2', 'inner')

现在我想比较两个数据框 df1 和 df2 的对应列,它们已经存在于连接的 df3 中。

现在我为 zip(df1.columns,df2.columns) 中的每个 x,y 循环它并存储在一个列表中 unmatchList.append((df3.select(df1.x,df2.y).filter(df1.x <> df2.y)))

我可以避免这个循环,因为这在这里广泛使用内存。我正在做其他计算,但这是我提供的小代码 sn-p。这背后的想法是找出对应列中的不同值以匹配两个数据帧的行。 exceptAll 不适用于此要求,因为它会根据列的位置找到差异。只有当两个数据帧的键匹配时,我才需要找到差异。

df1

key1 key2 col1 col2 col3 col4 col5

k11  k21   1    1    1    1    1

k12  k22   2    2    2    2    2

df2

key1 key2 col1 col2 col3 col4 col5

k11  k21   1    1    2    1    1

k12  k22   2    3    2    3    4

我想要的最终输出是

key1 key2 col  val1 val2

k11  k21  col3 1    2

k12  k22  col2 2    3

k12  k22  col4 2    3

k12  k22  col5 2    4

val1是从df1获取,val2是从df2获取

【问题讨论】:

【参考方案1】:

这里的问题是,如果 DataFrame 中的列数不高,则循环的性能会降低。它进一步导致输出内存结果。

我们可以使用数据框并将每次迭代的结果存储(附加或插入)到某个 hdfs 位置或 hive 表中,而不是将结果存储在列表中。

for x,y in zip(df1.columns,df2.columns)
    outputDF=joinedDF.filter(col(x) <> col(y))
                 .withColumns('key1',lit(key1))
                 .withColumns('key2',lit(key2))
                 .withColumns('col',lit(x))
                 .withColumns('val1',col(x))
                 .withColumns('val2',col(y))

    outputDF.partitionBy(x).coalesce(1).write.mode('append').format('hive').saveAsTable('DB.Table')````

#Another approach can be if no of columns are less (10-15):#
    outputDF=outputDF.union(joinedDF.filter(col(x) <> col(y))
                 .withColumns('key1',lit(key1))
                 .withColumns('key2',lit(key2))
                 .withColumns('col',lit(x))
                 .withColumns('val1',col(x))
                 .withColumns('val2',col(y)))

outputDF.partitionBy(x).coalesce(1).write.mode('append').format('hive').saveAsTable('DB.Table')


【讨论】:

以上是关于使用pyspark查找每个对应列的两个数据帧上的值差异的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据帧上的向量操作

如何使用 pySpark 使多个 json 处理更快?

PySpark:转换DataFrame中给定列的值

Pyspark数据帧:根据另一列的值提取列

具有大量列的数据帧上的 Spark 窗口函数

pyspark:比较给定列的值时从数据框中获取公共数据