将 scala/spark 信息写入 MongoDB
Posted
技术标签:
【中文标题】将 scala/spark 信息写入 MongoDB【英文标题】:Writing scala/spark info into MongoDB 【发布时间】:2018-01-27 16:04:51 【问题描述】:我有一个日志数据,其中包含类似
的数据Started by timer
...
...
Finished: SUCCESS
对于每条日志数据,需要将日志状态(此处为“Finished”)和状态(“SUCCESS”)写入MongoDB。
case class Logs(status:String,statusCode:String)
def main(args:Array[String])
val sc= new SparkContext("local[*]","MongoDB")
val lines=sc.textFile("log1.txt")
val pairs=lines.filter(value=>value.startsWith("Finished")).
map(lines=>lines.split(": ")).
map(lines=>(lines(0).toString(),lines(1).toString())).
for(keyAndValue<-pairs)
println("key: "+va._1+" has val: "+va._2)
我能够正确获取值。
现在我想将它们作为键和值存储在 MongoDB 中,例如 ("Finished","SUCCESS"),存储在特定数据库中的集合中。
我创建了如下的 mongodb 配置:
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.test")
.config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test")
.getOrCreate()
如何使用我创建的类来存储值?
我是否缺少任何其他配置,或者需要将数据以不同的方式放入数据库中。
我尝试使用下面的 sn-p 将信息保存到 Mongo 中,但是,它表明“写入不是 RDD 的成员”。
pairs.foreachRDD( rdd =>
import spark.implicits._
val matching = rdd.map( case (status: String, statusCode: String)
=> Logs(status, statusCode) )
matching.write.mode("append").mongo()
)
任何帮助将不胜感激。
【问题讨论】:
看看这个链接:docs.mongodb.com/spark-connector/master/scala/write-to-mongodb它可以帮助你。 【参考方案1】:根据mongodb docs 中的说明,您必须将您的RDD 转换为BSON 文档。
也不需要创建SparkSession
(来自 SparkSQL)和SparkContext
,因为上下文是会话的一部分。
我在您的代码之上构建了一个简单的示例。由于Logs
类必须转换成Document
anyway,所以我省略了这一步:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession
import org.bson.Document
object MongoDBTest
def main(args: Array[String])
val spark = SparkSession.builder()
.master("local")
.appName("MongoSparkConnectorIntro")
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/test.test")
.config("spark.mongodb.input.readPreference.name", "secondaryPreferred")
.config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.test")
.getOrCreate()
val sc = spark.sparkContext
val lines = sc.textFile("log1.txt")
val pairs = lines.filter(value => value.startsWith("Finished"))
.map(lines => lines.split(": "))
.map(line => new Document((line(0)), line(1)))
for (va <- pairs)
println(va)
MongoSpark.save(pairs)
必要的依赖关系
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.0"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
在 mongodb 中快速检查会显示结果哦,一个最小的日志文件:
>db.test.find()
"_id" : ObjectId("5a70a04737331a6da0a6ef96"), "Finished" : "SUCCESS"
"_id" : ObjectId("5a70a04737331a6da0a6ef97"), "Finished" : "FAILURE"
【讨论】:
我们需要在哪里添加依赖? 我将它们添加到 build.sbt 但我想有几种方法可以处理 scala 中的依赖关系。您在代码中使用了 SparkContext 和 SparkSession,因此在您的设置中应该有一种处理外部依赖的现有方法。 Document 对象的第一个值 line(0) 是否包含“status”,而 Logs 的第二个参数“statusCode”是否包含在 line(1) 中? 是的,对于每一行,都会创建一个带有一个键/值对的新文档,其中键是状态,值是状态代码【参考方案2】:DataFrameWriter
是 Dataset
的属性。
如果Logs
是一个案例类,比如:
case class Logs(status: String, statusCode: String)
只是替换
val matching = rdd.map(
case (status: String, statusCode: String) => Logs(status, statusCode)
)
与
val matching = rdd.map(
case (status: String, statusCode: String) => Logs(status, statusCode)
).toDF
【讨论】:
转换为数据框没有帮助。错误显示在matching.write.mode("append").mongo()行中,w.r.t写操作。以上是关于将 scala/spark 信息写入 MongoDB的主要内容,如果未能解决你的问题,请参考以下文章
由于 Databricks 不公开支持 spark-redshift lib,使用 Scala spark 从 Redshift 读取/写入 Redshift 的最佳方法是啥
使用 Scala/Spark 列出目录中的文件(包括文件信息)
如何将 scala spark.sql.dataFrame 转换为 Pandas 数据框