Spark在MongoDB上的读写操作

Posted Hadoop技术学习

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark在MongoDB上的读写操作相关的知识,希望对你有一定的参考价值。


1、首先Spark连接MongoDB库需要事先安装两个相应的jar包,这里下载的是mongo-spark-connector_2.11-2.2.5.jar包和mongo-java-driver-3.6.3.jar包,然后将jar包放在Spark安装目录的jars目录下(<Spark_home>/jars/)。每个节点都一样。


2、配置Saprk的配置文件,默认是<Spark_home>/conf目录下的spark-defaults.conf文件。

3、spark在MongoDB库的写操作

当需要将RDD数据集写入MongoDB时,需要先将RDD数据集转换成BSON文档。

例子1,创建一个1到10的RDD数据并写入MongoDB集合。

import com.mongodb.spark._     

import org.bson.Document       

val document = sc.parallelize((1 to 10).map(i => Document.parse(s”{test:$i}”)))

MongoSpark.save(document)


由于此前已经在spark的配置文件中配置了MongoDB的默认输出库spark,默认输出集合是mongo_spark_out,所以上面代码执行结果可以直接查看mongo_spark_out集合。

Spark在MongoDB上的读写操作

 

如果用户想写入MongoDB另外的集合而不是在spark配置文件中默认的mongo_spark_out集合。则可以利用com.mongodb.spark.config包下的WriteConfig方法。

例子2:将上面例子,写入到MongoDB的address集合。

import com.mongodb.spark._

import com.mongodb.spark.config._

import org.bson.Document

val writeConfig = WriteConfig(Map(“collection” -> “address”,”writeConcern.w” -> “majority”),Some(WriteConfig(sc)))

val sparkDocuments = sc.parallelize((1 to 10).map(i => Document.parse(s”{spark: $i}”)))

MongoSpark.save(sparkDocuments,writeConfig)

Spark在MongoDB上的读写操作

Spark在MongoDB上的读写操作

RDD有个隐式方法saveToMongoDB(),可以不带任何参数将文档保存到WriteConfig对象指定的集合中。

例子3:上面的例子2使用saveToMongoDB()方法可以用下面方式实现。

import com.mongodb.spark._

import com.mongodb.spark.config._

import org.bson.Document

val documents = sc.parallelize((1 to 10).map(i => Document.parse(s"{spark: $i}")))

documents.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://192.168.1.106:27017/spark.mongo_20181114")))

Spark在MongoDB上的读写操作

Spark在MongoDB上的读写操作

 

4、spark在MongoDB上的读操作

通过前面了解spark可以通过MongoSpark.save方法和RDD的saveToMongoDB()实现对MongoDB数据库的写操作。而读操作则可以使用MongoSpark.load()方法和SparkContext对象的loadFromMongoDB()方法从特定的集合读取数据。

例子5:读取默认MongoDB集合mongo_spark中的第一个文档

import com.mongodb.spark._

val rdd = MongoSpark.load(sc)

println(rdd.first.toJson)

Spark在MongoDB上的读写操作

Spark在MongoDB上的读写操作

当需要读取MongoDB库上的特定数据库总的集合时,则可以使用ReadConfig()方法,用法和WriteConfig()方法相似。

例子6:用loadFromMongoDB()方法读取spark库下的mongo_20181115集合的数据。

import com.mongodb.spark.config._

val readConfig = ReadConfig(Map("collection" -> "mongo_20181115", "readPreference.name" -> "secondaryPreferred"), Some(ReadConfig(sc)))

val rdd = MongoSpark.load(sc,readConfig)

println(rdd.first.toJson)

Spark在MongoDB上的读写操作

 

同样的,读操作有个loadFromMongoDB()方法可以读取uri指定的MongoDB库中集合。这个方法和saveToMongoDB()方法相似。不同的是loadFromMongoDB()是SparkContext对象的方法,而saveToMongoDB()方法是RDD对象的方法。

例子7:用loadFromMongoDB()方法实现例子6的读操作

import com.mongodb.spark.config._

import com.mongodb.spark._

val rdd = sc.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://192.168.1.106:27017/spark.mongo_20181115")))

println(rdd.first.toJson)

以上是关于Spark在MongoDB上的读写操作的主要内容,如果未能解决你的问题,请参考以下文章

pyspark对Mysql数据库进行读写

Spark学习笔记——读写Hbase

Spark SQL读写方法

MongoDB主从复制+集群

Mongodb副本集实现及读写分离

Spark读写HBase实践