Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引

Posted

技术标签:

【中文标题】Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引【英文标题】:Spark RDD to new MongoDB collection with index in Scala 【发布时间】:2017-10-31 22:38:36 【问题描述】:

在 spark-submit 作业(用 Scala 编写的 .JAR)中,我需要访问现有的 MongoDB,在数据库中创建新集合,添加索引,从分布在 1000 多个执行程序的 RDD 写入数据收藏。

我找不到一个图书馆可以做到这一切。现在,我正在使用 mongo-spark-connector 从 RDD 写入,然后我使用 casbah 创建索引。

mongo spark 连接器(scaladoc 在哪里?)- https://docs.mongodb.com/spark-connector/current/scala-api/

casbah - http://mongodb.github.io/casbah/3.1/scaladoc/#package

流程是这样的……

创建 RDD 从 RDD 写入新集合(使用 mongo spark 连接器) 写入后在集合上创建索引(使用 casbah)

这种方法会加快速度吗?任何想法如何完成它?

创建空集合 创建索引 构建 RDD 并写入此集合 使用一个库来完成

这就是我现在的做法,但我怀疑还有更好的方法。

进口

// casbah - used to create index after new collection is created 
import com.mongodb.casbah.Imports.MongoClient,MongoCollection,MongoClientURI

// mongo-spark-connector used to write to Mongo from Spark cluster (and create new collection in process)
import com.mongodb.spark.MongoSpark 
import com.mongodb.spark.config.WriteConfig,ReadConfig
import org.bson.Document 

连接信息

object MyConnect 
  // mongodb connect
  val host       = "128.128.128.128"
  val port       = 12345
  val db         = "db"
  val collection = "collection"
  val user       = "user"
  val password   = "password"

  // casbah - to create index 
  val casbah_db_uri = MongoClientURI(
    s"mongodb://$user:$password@$host:$port/$db"
  )

  // mongodb spark connector - to write from RDD 
  val collection_uri = s"mongodb://$user:$password@$host:$port/$db.$collection"
  val writeConfig: WriteConfig = WriteConfig(Map("uri"->collection_uri))

做事

object sparkSubmit 

  def main(args: Array[String]): Unit = 

    // dummy dataset - RDD[(id, cnt)]
    val rdd_dummy: RDD[(String, Int)] = ???

    // data as Mongo docs - as per mongo spark connector
    val rdd_bson: RDD[Document] = 
      rdd_dummy
      .map(tup => s""""hex":"$tup._1", "cnt":$tup._2""")
      .map(str => Document.parse(str))
    

    // save to mongo / create new collection in process - using mongo spark connector
    MongoSpark.save(rdd_bson, MyConnect.writeConfig)

    // create index on new collection - using casbah
    val new_table: MongoCollection = MongoClient(MyConnect.casbah_db_uri)(MyConnect.db)(MyConnect.collection)
    new_table.createIndex("hex")
  

【问题讨论】:

【参考方案1】:

这种方法会加快速度吗?

通常对于任何数据库(包括 MongoDB),索引构建操作都会有成本。如果在空集合上创建索引,则在(每个)插入操作期间将产生索引构建操作成本。如果在所有插入之后创建索引,那么之后也会产生索引构建成本,这可能会锁定集合直到索引构建完成。

您可以根据您的用例进行选择,即如果您想在集合完成后立即访问它,请在空集合上创建索引。

请注意,MongoDB 有两种索引构建操作类型:前台和后台。请参阅MongoDB: Index Creation 了解更多信息。

scaladoc 在哪里?

它没有 scaladoc,但是有一个 javadoc:https://www.javadoc.io/doc/org.mongodb.spark/mongo-spark-connector_2.11/2.2.1

这是因为 MongoDB Spark 连接器使用了下面的 MongoDB Java 驱动程序 jar。

您应该尝试使用官方的MongoDB Scala driver,而不是使用传统的 Scala 驱动程序 Casbah 创建索引。例如Create An Index。

collection.createIndex(ascending("i"))

【讨论】:

Hi Wan,你能回答this question吗?

以上是关于Spark RDD 到新的 MongoDB 集合,在 Scala 中具有索引的主要内容,如果未能解决你的问题,请参考以下文章

MongoDB聚合结果输出到新的集合方法与案例实践

MongoDB聚合结果输出到新的集合方法与案例实践

Spark入门02

Spark面试题——说下对RDD的理解?RDD特点算子?

Spark核心-RDD

Spark-RDD 模型 以及运行原理