Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引
Posted
技术标签:
【中文标题】Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引【英文标题】:Spark RDD to new MongoDB collection with index in Scala 【发布时间】:2017-10-31 22:38:36 【问题描述】:在 spark-submit 作业(用 Scala 编写的 .JAR)中,我需要访问现有的 MongoDB,在数据库中创建新集合,添加索引,从分布在 1000 多个执行程序的 RDD 写入数据收藏。
我找不到一个图书馆可以做到这一切。现在,我正在使用 mongo-spark-connector 从 RDD 写入,然后我使用 casbah 创建索引。
mongo spark 连接器(scaladoc 在哪里?)- https://docs.mongodb.com/spark-connector/current/scala-api/
casbah - http://mongodb.github.io/casbah/3.1/scaladoc/#package
流程是这样的……
创建 RDD 从 RDD 写入新集合(使用 mongo spark 连接器) 写入后在集合上创建索引(使用 casbah)这种方法会加快速度吗?任何想法如何完成它?
创建空集合 创建索引 构建 RDD 并写入此集合 使用一个库来完成这就是我现在的做法,但我怀疑还有更好的方法。
进口
// casbah - used to create index after new collection is created
import com.mongodb.casbah.Imports.MongoClient,MongoCollection,MongoClientURI
// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.WriteConfig,ReadConfig
import org.bson.Document
连接信息
object MyConnect
// mongodb connect
val host = "128.128.128.128"
val port = 12345
val db = "db"
val collection = "collection"
val user = "user"
val password = "password"
// casbah - to create index
val casbah_db_uri = MongoClientURI(
s"mongodb://$user:$password@$host:$port/$db"
)
// mongodb spark connector - to write from RDD
val collection_uri = s"mongodb://$user:$password@$host:$port/$db.$collection"
val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))
做事
object sparkSubmit
def main(args: Array[String]): Unit =
// dummy dataset - RDD[(id, cnt)]
val rdd_dummy: RDD[(String, Int)] = ???
// data as Mongo docs - as per mongo spark connector
val rdd_bson: RDD[Document] =
rdd_dummy
.map(tup => s""""hex":"$tup._1", "cnt":$tup._2""")
.map(str => Document.parse(str))
// save to mongo / create new collection in process - using mongo spark connector
MongoSpark.save(rdd_bson, MyConnect.writeConfig)
// create index on new collection - using casbah
val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
new_table.createIndex("hex")
【问题讨论】:
【参考方案1】:这种方法会加快速度吗?
通常对于任何数据库(包括 MongoDB),索引构建操作都会有成本。如果在空集合上创建索引,则在(每个)插入操作期间将产生索引构建操作成本。如果在所有插入之后创建索引,那么之后也会产生索引构建成本,这可能会锁定集合直到索引构建完成。
您可以根据您的用例进行选择,即如果您想在集合完成后立即访问它,请在空集合上创建索引。
请注意,MongoDB 有两种索引构建操作类型:前台和后台。请参阅MongoDB: Index Creation 了解更多信息。
scaladoc 在哪里?
它没有 scaladoc,但是有一个 javadoc:https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.2.1
这是因为 MongoDB Spark 连接器使用了下面的 MongoDB Java 驱动程序 jar。
您应该尝试使用官方的MongoDB Scala driver,而不是使用传统的 Scala 驱动程序 Casbah 创建索引。例如Create An Index。
collection.createIndex(ascending("i"))
【讨论】:
Hi Wan,你能回答this question吗?以上是关于Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引的主要内容,如果未能解决你的问题,请参考以下文章