MongoDB Spark连接器中的withPipeline函数在哪里

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MongoDB Spark连接器中的withPipeline函数在哪里相关的知识,希望对你有一定的参考价值。

我正在尝试将MongoDB中的一些数据加载到Spark中。我已经定义了一个ReadConfig来指定数据库和集合。我还想应用一个过滤器,以避免丢失所有的集合。我跟随https://docs.mongodb.com/spark-connector/master/scala/aggregation/的例子如下:

val rc = ReadConfig(Map(“database” - >“myDB”,“collection” - >“myCol”),Some(ReadConfig(spark)))

val rdd = MongoSpark.load(spark,rc)

但是rdd没有任何名为withPipeline的函数(似乎它生成了一个regualr DataFrame而不是MongoRDD)我是否错过了导入的东西?我已经进口了

import com.mongodb.spark._

import spark.implicits._

答案

我猜你在使用Spark.sparkContext时使用spark 2.0使用MongoSpark.load

val collectionDf = MongoSpark.load(spark.sparkContext, readConfig)
val aggregatedRdd = collectionDf.withPipeline(Seq(Document.parse("{ $match: { _id: 'value' } }")))

以上是关于MongoDB Spark连接器中的withPipeline函数在哪里的主要内容,如果未能解决你的问题,请参考以下文章

Spark与MongoDB连接

spark连接MongoDB

MongoDB 遇见 spark(进行整合)

MongoDB 遇见 spark(进行整合)

如何使用适用于 Spark 的 Mongo-Hadoop 连接器删除文档(记录)

无法从使用 mongo spark 连接器读取的 spark DF 中显示/写入。