找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们
Posted
技术标签:
【中文标题】找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们【英文标题】:Find out if 2 tables (`tbl_spark`) are equal without collecting them using sparklyr找出 2 个表 (`tbl_spark`) 是否相等而不使用 sparklyr 收集它们 【发布时间】:2019-01-03 05:03:16 【问题描述】:考虑在 spark 中有 2 个表或表引用要比较,例如以确保您的备份正常工作。有没有可能在火花中做那个遥控器?因为使用collect()
将所有数据复制到R没有用。
library(sparklyr)
library(dplyr)
library(DBI)
##### create spark connection here
# sc <- spark_connect(<yourcodehere>)
spark_connection(sc)
spark_context(sc)
trees1_tbl <- sdf_copy_to(sc, trees, "trees1")
trees2_tbl <- sdf_copy_to(sc, trees, "trees2")
identical(trees1_tbl, trees2_tbl) # FALSE
identical(collect(trees1_tbl), collect(trees2_tbl)) # TRUE
setequal(trees1_tbl, trees2_tbl) # FALSE
setequal(collect(trees1_tbl), (trees2_tbl)) # TRUE
spark_disconnect(sc)
如果dplyr::setequal()
可以直接使用就好了。
【问题讨论】:
【参考方案1】:这是行不通的。这里要记住的要点是 Spark DataFrames
* 不是数据容器。一旦执行管道,就会有转换的描述,这些转换将应用于数据。这意味着,每次评估数据时,结果都可能不同。您可以在这里问的唯一有意义的问题是,DataFrames
是否都描述了相同的执行计划,这显然对您的情况没有用处。
那么如何比较数据呢?这里真的没有万能的答案。
测试
如果它是单元测试的一部分,收集数据并比较本地对象是可行的方法(尽管请记住,使用集合可能会遗漏一些微妙但常见的问题)。
生产
外部单元测试你可以尝试检查是否
大小 A 等于 B 大小 A 除外 B 为 ∅,B 除外 A 为 ∅然而,这非常昂贵,如果可行的话,可能会显着增加该过程的成本。因此,在实践中,您可能更喜欢不提供严格保证但具有更好性能的方法。这些将根据输入和输出源以及故障模型而有所不同(例如,基于文件的源比使用数据库或消息队列的源更可靠)。
在最简单的情况下,您可以使用 Spark Web UI 手动检查基本不变量,例如读取和写入的行数。对于更高级的监控,您可以实现自己的 Spark 侦听器(例如 Spark: how to get the number of written rows?)、查询侦听器或累加器,但所有这些组件都不会在 sparklyr
中公开,并且需要编写本机(Scala 或 Java)代码。
* 我在这里指的是 Spark,但使用 dplyr
和数据库后端并没有什么不同。
【讨论】:
【参考方案2】:我写了一个我认为你可以做到的例子。基本上,您只需合并两个表,然后将 distinct() 应用于合并的结果。在 distinct() 之后,只需将结果数据帧的行数与初始行数进行比较。
>>> rdd = spark.sparkContext.parallelize([("test","test1")])
>>> rdd.collect()
[('test', 'test1')]
>>> df1 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df1.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df2 = spark.createDataFrame(rdd).toDF("col1","col2")
>>> df2.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df3 = df1.union(df2)
>>> df3.show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
|test|test1|
+----+-----+
>>> df3.distinct().show()
+----+-----+
|col1| col2|
+----+-----+
|test|test1|
+----+-----+
>>> df1.count()
1
>>> df3.distinct().count()
1
【讨论】:
【参考方案3】:感谢@Cosmin 的提示!
首先使用setdiff()
,其中有dplyr
提供的tbl_lazy
对象的方法(与setequal
不同),统计行数并与0比较。
trees1_tbl %>% setdiff(trees2_tbl) %>% sdf_nrow() == 0
## TRUE
如果来自trees1_tbl
的所有数据都包含在trees2_tbl
中,则将导致TRUE
。
如果它们不同,可以省略== 0
以获取trees2_tbl
中缺失的行数。
【讨论】:
以上是关于找出 2 个表 (`tbl_spark`) 是不是相等而不使用 sparklyr 收集它们的主要内容,如果未能解决你的问题,请参考以下文章