将 scala/spark 信息写入 MongoDB

Posted

技术标签:

【中文标题】将 scala/spark 信息写入 MongoDB【英文标题】:Writing scala/spark info into MongoDB 【发布时间】:2018-01-27 16:04:51 【问题描述】:

我有一个日志数据,其中包含类似

的数据
Started by timer
...
...
Finished: SUCCESS

对于每条日志数据,需要将日志状态(此处为“Finished”)和状态(“SUCCESS”)写入MongoDB。

case class Logs(status:String,statusCode:String)

def main(args:Array[String])

     val sc= new SparkContext("local[*]","MongoDB")
     val lines=sc.textFile("log1.txt")
     val pairs=lines.filter(value=>value.startsWith("Finished")).
     map(lines=>lines.split(": ")).
     map(lines=>(lines(0).toString(),lines(1).toString())).

    for(keyAndValue<-pairs)
      println("key: "+va._1+" has val: "+va._2)
    

我能够正确获取值。

现在我想将它们作为键和值存储在 MongoDB 中,例如 ("Finished","SUCCESS"),存储在特定数据库中的集合中。

我创建了如下的 mongodb 配置:

 val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.test")
.config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test")
.getOrCreate()

如何使用我创建的类来存储值?

我是否缺少任何其他配置,或者需要将数据以不同的方式放入数据库中。

我尝试使用下面的 sn-p 将信息保存到 Mongo 中,但是,它表明“写入不是 RDD 的成员”。

pairs.foreachRDD( rdd =>
import spark.implicits._
val matching = rdd.map( case (status: String, statusCode: String)
=> Logs(status, statusCode) )
matching.write.mode("append").mongo()
)

任何帮助将不胜感激。

【问题讨论】:

看看这个链接:docs.mongodb.com/spark-connector/master/scala/write-to-mongodb它可以帮助你。 【参考方案1】:

根据mongodb docs 中的说明,您必须将您的RDD 转换为BSON 文档。

也不需要创建SparkSession(来自 SparkSQL)和SparkContext,因为上下文是会话的一部分。

我在您的代码之上构建了一个简单的示例。由于Logs类必须转换成Documentanyway,所以我省略了这一步:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession
import org.bson.Document

object MongoDBTest 

  def main(args: Array[String]) 

    val spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.test")
      .config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test")
      .getOrCreate()

    val sc = spark.sparkContext
    val lines = sc.textFile("log1.txt")
    val pairs = lines.filter(value => value.startsWith("Finished"))
       .map(lines => lines.split(": "))
       .map(line => new Document((line(0)), line(1)))
    for (va <- pairs) 
      println(va)
    

    MongoSpark.save(pairs)
  

必要的依赖关系

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"

在 mongodb 中快速检查会显示结果哦,一个最小的日志文件:

>db.test.find()
 "_id" : ObjectId("5a70a04737331a6da0a6ef96"), "Finished" : "SUCCESS" 
 "_id" : ObjectId("5a70a04737331a6da0a6ef97"), "Finished" : "FAILURE" 

【讨论】:

我们需要在哪里添加依赖? 我将它们添加到 build.sbt 但我想有几种方法可以处理 scala 中的依赖关系。您在代码中使用了 SparkContext 和 SparkSession,因此在您的设置中应该有一种处理外部依赖的现有方法。 Document 对象的第一个值 line(0) 是否包含“status”,而 Logs 的第二个参数“statusCode”是否包含在 line(1) 中? 是的,对于每一行,都会创建一个带有一个键/值对的新文档,其中键是状态,值是状态代码【参考方案2】:

DataFrameWriterDataset 的属性。

如果Logs 是一个案例类,比如:

case class Logs(status: String, statusCode: String)

只是替换

val matching = rdd.map( 
  case (status: String, statusCode: String) => Logs(status, statusCode)
)

val matching = rdd.map( 
  case (status: String, statusCode: String) => Logs(status, statusCode)
).toDF

【讨论】:

转换为数据框没有帮助。错误显示在matching.write.mode("append").mongo()行中,w.r.t写操作。

以上是关于将 scala/spark 信息写入 MongoDB的主要内容,如果未能解决你的问题,请参考以下文章

由于 Databricks 不公开支持 spark-redshift lib,使用 Scala spark 从 Redshift 读取/写入 Redshift 的最佳方法是啥

使用 Scala/Spark 列出目录中的文件(包括文件信息)

清空正在写入的日志文件(qbit)

如何将 scala spark.sql.dataFrame 转换为 Pandas 数据框

如何将 Scala Spark Dataframe 转换为 LinkedHashMap[String, String]

Scala Spark 循环加入数据框