Spark rdd.count() 产生不一致的结果

Posted

技术标签:

【中文标题】Spark rdd.count() 产生不一致的结果【英文标题】:Spark rdd.count() yields inconsistent results 【发布时间】:2017-06-10 18:36:43 【问题描述】:

我有点困惑。

一个简单的 rdd.count() 在多次运行时会给出不同的结果。

这是我运行的代码:

val inputRdd = sc.newAPIHadoopRDD(inputConfig,
classOf[com.mongodb.hadoop.MongoInputFormat],
classOf[Long],
classOf[org.bson.BSONObject])

println(inputRdd.count())

它打开一个到 MondoDb 服务器的连接并简单地计算对象。 对我来说似乎很简单

根据 MongoDb 有 3,349,495 个条目

这是我的 spark 输出,都运行同一个 jar:

spark1 :    3.257.048  
spark2 :    3.303.272  
spark3 :    3.303.272  
spark4 :    3.303.272  
spark5 :    3.303.271   
spark6 :    3.303.271  
spark7 :    3.303.272  
spark8 :    3.303.272  
spark9 :    3.306.300  
spark10:    3.303.272  
spark11:    3.303.271  

Spark 和 MongoDb 在同一个集群上运行。 我们正在运行:

Spark version 1.5.0-cdh5.6.1  
Scala version 2.10.4  
MongoDb version 2.6.12  

很遗憾我们无法更新这些

Spark 是非确定性的吗? 有没有大神可以赐教?

提前致谢

编辑/更多信息 我刚刚注意到我们的 mongod.log 中有一个错误。 这个错误会导致不一致的行为吗?

[rsBackgroundSync] replSet not trying to sync from hadoop04:27017, it is vetoed for 333 more seconds
[rsBackgroundSync] replSet syncing to: hadoop05:27017
[rsBackgroundSync] replSet not trying to sync from hadoop05:27017, it is vetoed for 600 more seconds
[rsBackgroundSync] replSet not trying to sync from hadoop04:27017, it is vetoed for 333 more seconds
[rsBackgroundSync] replSet not trying to sync from hadoop05:27017, it is vetoed for 600 more seconds
[rsBackgroundSync] replSet not trying to sync from hadoop04:27017, it is vetoed for 333 more seconds
[rsBackgroundSync] replSet error RS102 too stale to catch up, at least from hadoop05:27017
[rsBackgroundSync] replSet our last optime : Jul  2 10:19:44 57777920:111
[rsBackgroundSync] replSet oldest at hadoop05:27017 : Jul  5 15:17:58 577bb386:59
[rsBackgroundSync] replSet See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember
[rsBackgroundSync] replSet error RS102 too stale to catch up

【问题讨论】:

您是否多次检查 MongoDb 中的条目数(与运行 spark count() 并行)? MongoDb 中的条目数在运行时没有改变。并感谢您重新格式化:) a) 您的 MongoDB 部署拓扑是什么? (副本集或分片集群?)也许 spark 工作人员根据 MongoDB 成员返回不同的答案,即一些成员尚未复制数据。 b) MongoDB v2.6 已于 2016 年 10 月结束生命周期,请尽可能升级。 【参考方案1】:

count 返回估计计数。因此,即使文档数量没有变化,返回的值也会发生变化。

countDocuments 被添加到 MongoDB 4.0 以提供准确的计数(也适用于多文档事务)。

【讨论】:

【参考方案2】:

正如您已经发现的那样,问题似乎不在于 spark(或 scala),而在于 MongoDB。

因此,关于差异的问题似乎已经解决。

您仍然希望解决实际的 MongoDB 错误,提供的链接可能是一个很好的起点:http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember

【讨论】:

以上是关于Spark rdd.count() 产生不一致的结果的主要内容,如果未能解决你的问题,请参考以下文章

为啥 dataset.count() 比 rdd.count() 快?

RDD缓存学习

在spark中遇到了奇怪的错误,找到了奇怪的解决方法

XGBoost:softprob 和 softmax 产生不一致的结果

执行顺序和缓存需求

运行之间的 Spark DataFrame 行数不一致