无法通过 Spark 连接到 Mongo DB

Posted

技术标签:

【中文标题】无法通过 Spark 连接到 Mongo DB【英文标题】:Can't connect to Mongo DB via Spark 【发布时间】:2017-07-17 11:06:31 【问题描述】:

我正在尝试通过 Apache Spark 主机从 Mongo DB 读取数据。

我为此使用了 3 台机器:

M1 - 上面有一个 Mongo 数据库实例 M2 - 带有 Spark Master,带有 Mongo 连接器,在其上运行 M3 - 带有连接到 M2 的 Spark master 的 python 应用程序

应用程序 (M3) 正在像这样连接到 spark master:

_sparkSession = SparkSession.builder.master(masterPath).appName(appName)\
.config("spark.mongodb.input.uri", "mongodb://10.0.3.150/db1.data.coll")\
.config("spark.mongodb.output.uri", "mongodb://10.0.3.150/db1.data.coll").getOrCreate()

应用程序(M3)正在尝试从数据库中读取数据:

sqlContext = SQLContext(_sparkSession.sparkContext)
        df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://user:pass@10.0.3.150/db1.data?readPreference=primaryPreferred").load()

但因以下异常而失败:

    py4j.protocol.Py4JJavaError: An error occurred while calling o56.load.
: java.lang.ClassNotFoundException: Failed to find data source: com.mongodb.spark.sql.DefaultSource. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:594)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:86)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.mongodb.spark.sql.DefaultSource.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25$$anonfun$apply$13.apply(DataSource.scala:579)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$25.apply(DataSource.scala:579)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:579)
        ... 16 more

【问题讨论】:

查看这个解决方案,在 Jupyter Notebooks 中实现:***.com/a/53997287/7331010 【参考方案1】:

Spark 找不到 com.mongodb.spark.sql.DefaultSource 包,因此出现错误消息。

一切,其他看起来不错只需要包含 Mongo Spark 包:

> $SPARK_HOME/bin/pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0

或者确保jar文件在正确的路径上。

确保检查您的 Spark 版本所需的 Mongo-Spark 包版本:https://spark-packages.org/package/mongodb/mongo-spark

【讨论】:

感谢您的回答。我指定我通过远程 Python 应用程序运行应用程序,而不是通过 PySpark shell。所以,作为一个新手 python 开发人员,我再次问,如何使用连接器包运行我的应用程序。还是我需要使用包运行 spark master? 请更新问题,提供有关如何提交 Spark 作业的更多信息,我会期待更新我的答案。 我改变了使用 spark master 的方式。我启动 Spark master 和它的 slave。之后,我使用 mongo-spark-connector 包和 python 脚本运行 spark-submit。猜猜这是推荐的方式。谢谢大家 @Ross 我有同样的问题,似乎无法解决。有什么想法吗? 当我遇到这个问题时,我的 $SPARK_HOME/jars 中没有 mongodb-spark-connector_2.11-2.2.3.jar(例如 /usr/local/spark-2.2.jar)。 2-bin-hadoop2.7/jars)。【参考方案2】:

我是 pyspark 用户,这是我的代码的样子,并且可以正常工作:

pyspark中的MongoDB连接配置

# For spark version < 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .master('local')\
    .config('spark.mongodb.input.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.data.coll')\
    .config('spark.mongodb.output.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.data.coll')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
    .getOrCreate()
# For spark version >= 3.0
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .master('local')\
    .config('spark.mongodb.input.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.coll')\
    .config('spark.mongodb.output.uri', 'mongodb://user:password@ip.x.x.x:27017/database01.coll')\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
    .getOrCreate()

从 MongoDB 读取:

df01 = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("database","database01")\
    .option("collection", "collection01")\
    .load()

写入 MongoDB:

df01.write.format("com.mongodb.spark.sql.DefaultSource")\
    .mode("overwrite")\
    .option("database","database01")\
    .option("collection", "collection02")\
    .save()

【讨论】:

这应该是公认的答案。 spark.jars.packages 选项记录在 spark.apache.org/docs/2.1.0/…【参考方案3】:

我在配置与 CosmosDB (API MongoDB) 的 Spark 连接时遇到了相当困难,因此我决定发布对我有用的代码作为贡献。

我通过 Databricks 笔记本使用 Spark 2.4.0。

from pyspark.sql import SparkSession

# Connect to CosmosDB to write on the collection
userName = "userName"
primaryKey = "myReadAndWritePrimaryKey"
host = "ipAddress"
port = "10255"
database = "dbName"
collection = "collectionName"

# Structure the connection
connectionString = "mongodb://0:1@2:3/4.5?ssl=true&replicaSet=globaldb".format(userName, primaryKey, host, port, database, collection)

spark = SparkSession\
    .builder\
    .config('spark.mongodb.input.uri', connectionString)\
    .config('spark.mongodb.output.uri', connectionString)\
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')\
    .getOrCreate()

# Reading from CosmosDB
df = spark.read\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", connectionString)\
    .option("database", database)\
    .option("collection", collection)\
    .load()

# Writing on CosmosDB (Appending new information without replacing documents)
dfToAppendOnCosmosDB.write.format("com.mongodb.spark.sql.DefaultSource")\
    .mode("append")\
    .option("uri", connectionString)\
    .option("replaceDocument", False)\
    .option("maxBatchSize", 100)\
    .option("database", database)\
    .option("collection", collection)\
    .save()

我在link 找到了配置连接器的选项。

【讨论】:

以上是关于无法通过 Spark 连接到 Mongo DB的主要内容,如果未能解决你的问题,请参考以下文章

通过 R 连接时无法在 mongo DB 中查看集合

无法使用 Grails 3.1.1 连接到 mongo DB

python连接Mongo数据库

无法使用 Apache spark 2.1.0 连接到 hive 数据库

连接到 postgresql:dbserver db 通过 JDBC 连接到 Databricks 时连接被拒绝

Nodejs,通过回调关闭mongo db连接