找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们

Posted

技术标签:

【中文标题】找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们【英文标题】:Find out if 2 tables (`tbl_spark`) are equal without collecting them using sparklyr找出 2 个表 (`tbl_spark`) 是否相等而不使用 sparklyr 收集它们 【发布时间】:2019-01-03 05:03:16 【问题描述】:

考虑在 spark 中有 2 个表或表引用要比较,例如以确保您的备份正常工作。有没有可能在火花中做那个遥控器?因为使用collect()将所有数据复制到R没有用。

library(sparklyr)
library(dplyr)
library(DBI)

##### create spark connection here
# sc <- spark_connect(<yourcodehere>)
spark_connection(sc)
spark_context(sc)

trees1_tbl <- sdf_copy_to(sc, trees, "trees1")
trees2_tbl <- sdf_copy_to(sc, trees, "trees2")
identical(trees1_tbl, trees2_tbl) # FALSE
identical(collect(trees1_tbl), collect(trees2_tbl)) # TRUE
setequal(trees1_tbl, trees2_tbl) # FALSE
setequal(collect(trees1_tbl), (trees2_tbl)) # TRUE

spark_disconnect(sc)

如果dplyr::setequal()可以直接使用就好了。

【问题讨论】:

【参考方案1】:

这是行不通的。这里要记住的要点是 Spark DataFrames* 不是数据容器。一旦执行管道,就会有转换的描述,这些转换将应用于数据。这意味着,每次评估数据时,结果都可能不同。您可以在这里问的唯一有意义的问题是,DataFrames 是否都描述了相同的执行计划,这显然对您的情况没有用处。

那么如何比较数据呢?这里真的没有万能的答案。

测试

如果它是单元测试的一部分,收集数据并比较本地对象是可行的方法(尽管请记住,使用集合可能会遗漏一些微妙但常见的问题)。

生产

外部单元测试你可以尝试检查是否

大小 A 等于 B 大小 A 除外 B 为 ∅,B 除外 A 为 ∅

然而,这非常昂贵,如果可行的话,可能会显着增加该过程的成本。因此,在实践中,您可能更喜欢不提供严格保证但具有更好性能的方法。这些将根据输入和输出源以及故障模型而有所不同(例如,基于文件的源比使用数据库或消息队列的源更可靠)。

在最简单的情况下,您可以使用 Spark Web UI 手动检查基本不变量,例如读取和写入的行数。对于更高级的监控,您可以实现自己的 Spark 侦听器(例如 Spark: how to get the number of written rows?)、查询侦听器或累加器,但所有这些组件都不会在 sparklyr 中公开,并且需要编写本机(Scala 或 Java)代码。


* 我在这里指的是 Spark,但使用 dplyr 和数据库后端并没有什么不同。

【讨论】:

【参考方案2】:

我写了一个我认为你可以做到的例子。基本上,您只需合并两个表,然后将 distinct() 应用于合并的结果。在 distinct() 之后,只需将结果数据帧的行数与初始行数进行比较。

>>> rdd = spark.sparkContext.parallelize([("test","test1")])
>>> rdd.collect()
[('test', 'test1')]
>>> df1 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df1.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df2 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df2.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df3 = df1.union(df2)
>>> df3.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
|test|test1|
+----+-----+

>>> df3.distinct().show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+

>>> df1.count()
1
>>> df3.distinct().count()
1

【讨论】:

【参考方案3】:

感谢@Cosmin 的提示!

首先使用setdiff(),其中有dplyr提供的tbl_lazy对象的方法(与setequal不同),统计行数并与0比较。

trees1_tbl %>% setdiff(trees2_tbl) %>% sdf_nrow() == 0
## TRUE

如果来自trees1_tbl 的所有数据都包含在trees2_tbl 中,则将导致TRUE。 如果它们不同,可以省略== 0 以获取trees2_tbl 中缺失的行数。

【讨论】:

以上是关于找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们的主要内容,如果未能解决你的问题,请参考以下文章

找出oracle中具有不同列号的2个表之间的区别

如何找出 Hive 数据库的总大小

Laravel 8 如何连接 2 个表并检查值是不是存在

检查值是不是存在于 2 个表之间

组合 3 个表,其中 2 列的组合不是唯一的

c#,判断2个dataTable是不是一样的问题。。