让 Spark、Python 和 MongoDB 协同工作

Posted

技术标签:

【中文标题】让 Spark、Python 和 MongoDB 协同工作【英文标题】:Getting Spark, Python, and MongoDB to work together 【发布时间】:2016-01-28 06:33:12 【问题描述】:

我很难将这些组件正确地结合在一起。我已经安装了 Spark 并成功运行,我可以在本地、独立以及通过 YARN 运行作业。我已按照建议的步骤(据我所知)here 和 here

我正在开发 Ubuntu,我拥有的各种组件版本是

Spark spark-1.5.1-bin-hadoop2.6 Hadoop hadoop-2.6.1 蒙古 2.6.10 Mongo-Hadoop 连接器 克隆自 https://github.com/mongodb/mongo-hadoop.git Python 2.7.10

我在执行各个步骤时遇到了一些困难,例如将哪些罐子添加到哪个路径,所以我添加的是

in /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce 我已添加 mongo-hadoop-core-1.5.0-SNAPSHOT.jar 以下环境变量 export HADOOP_HOME="/usr/local/share/hadoop-2.6.1" export PATH=$PATH:$HADOOP_HOME/bin export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6" export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python" export PATH=$PATH:$SPARK_HOME/bin

我的 Python 程序是基本的

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

我正在使用命令运行它

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

结果我得到以下输出

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

根据here

Java 客户端发生异常时引发此异常 代码。例如,如果您尝试从空堆栈中弹出一个元素。 抛出的 Java 异常的实例存储在 java_exception 成员。

查看pymongo_spark.py 的源代码和抛出错误的行,它说

"与 JVM 通信时出错。MongoDB Spark jar 是否打开 Spark 的类路径? :“

因此,作为回应,我试图确保传递了正确的罐子,但我可能做错了,见下文

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

我已将 pymongo 导入到同一个 python 程序中,以验证我至少可以使用它访问 MongoDB,而且我可以。

我知道这里有很多活动部件,所以如果我能提供更多有用的信息,请告诉我。

【问题讨论】:

【参考方案1】:

更新

2016-07-04

自上次更新以来MongoDB Spark Connector 成熟了很多。它提供up-to-date binaries 和基于数据源的API,但它使用SparkConf 配置,因此主观上不如Stratio/Spark-MongoDB 灵活。

2016-03-30

从最初的答案开始,我发现了两种从 Spark 连接到 MongoDB 的不同方法:

mongodb/mongo-spark Stratio/Spark-MongoDB

虽然前者似乎相对不成熟,但后者看起来比 Mongo-Hadoop 连接器更好,并提供 Spark SQL API。

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

它似乎比mongo-hadoop-spark稳定得多,支持谓词下推,无需静态配置,简单易用。

原答案

确实,这里有很多活动部件。我试图通过构建一个与描述的配置大致匹配的简单 Docker 映像来使其更易于管理(但为了简洁起见,我省略了 Hadoop 库)。你可以找到complete source on GitHub (DOI 10.5281/zenodo.47882) 并从头开始构建它:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

或者下载一张我pushed to Docker Hub的图片,这样你就可以简单地docker pull zero323/mongo-spark):

开始图片:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

通过--jars--driver-class-path 启动PySpark shell:

pyspark --jars $JARS --driver-class-path $SPARK_DRIVER_EXTRA_CLASSPATH

最后看看它是如何工作的:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    "x": 1.0, "y": -1.0, "x": 0.0, "y": 4.0])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('0foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

请注意,mongo-hadoop 似乎在第一次操作后关闭了连接。所以在collect之后调用例如rdd.count()会抛出异常。

基于我在创建此图像时遇到的不同问题,我倾向于相信 传递 mongo-hadoop-1.5.0-SNAPSHOT.jarmongo-hadoop-spark-1.5.0-SNAPSHOT.jar 同时传递给两者 --jars--driver-class-path 是唯一的硬性要求

注意事项

这张图片大致基于jaceklaskowski/docker-spark ,所以如果有帮助,请务必向@jacek-laskowski发送一些好的因果报应。 如果不需要包含new API 的开发版本,那么使用--packages 很可能是更好的选择。

【讨论】:

快速问题(如果我应该将此移至一个新问题,请说)。如何使用 pymongo_spark 对 mongo 进行身份验证,我尝试了标准的 mongo 连接 uri 方法mongo_rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/db.collection'),但它不起作用 说实话我不知道。传入 uri should work。您可以尝试在配置字典 sc.mongoRDD(uri, some_config) 中传递它们,但这只是猜测。 原来是this is my problem。我现在正在解决这个问题。再次感谢 如果您在给出的读取示例之外添加一个写入示例,将会很有帮助。 MongoDB 用户应该有权对集合执行 splitvector 命令。请参阅docs.mongodb.com/manual/reference/built-in-roles 中的 clusterManager 角色。而且,要将 rdd 写入 mongodb,请使用 new_rdd.saveToMongoDB(out_path)【参考方案2】:

您可以尝试在 spark-submit 命令中使用--package 选项而不是--jars ...

spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]

其中一些 jar 文件不是 Uber jar,需要下载更多依赖项才能开始工作。

【讨论】:

我收到以下错误,指的是下载失败:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS Exception in thread "main" java.lang.RuntimeException: [download failed: com.google.guava#guava;11.0.2!guava.jar, ...... 可能是网络的暂时性问题。你能再试一次吗?潜在的异常是什么? 您的建议确实对我有用,但我觉得@zero323 给出了更有用的答案,因此将其标记为正确。【参考方案3】:

我昨天也遇到了同样的问题。能够通过将mongo-java-driver.jar 放入$HADOOP_HOME/libmongo-hadoop-core.jarmongo-hadoop-spark.jar 放入$HADOOP_HOME/spark/classpath/emr(或$SPARK_CLASSPATH 中的任何其他文件夹)来修复它。

如果有帮助,请告诉我。

【讨论】:

它似乎没有帮助,在 $HADOOP_HOME/lib 我有 mongo-java-driver-3.0.4.jar 然后在 /usr/local/share/mongo-hadoop/spark/build/libs/ 我有 mongo-hadoop-core-1.4.1.jarmongo -hadoop-spark-1.5.0-SNAPSHOT.jar。我在运行作业时传递了这个目录 $SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py【参考方案4】:

祝你好运!

@见https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

from pyspark import SparkContext, SparkConf

import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()


def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)

    # Create an RDD backed by the MongoDB collection.
    # This RDD *does not* contain key/value pairs, just documents.
    # If you want key/value pairs, use the mongoPairRDD method instead.
    rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')

    # Save this RDD back to MongoDB as a different collection.
    rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')

    # You can also read and write BSON:
    bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
    bson_rdd.saveToBSON('/path/to/bson/output')

if __name__ == '__main__':
    main()

【讨论】:

以上是关于让 Spark、Python 和 MongoDB 协同工作的主要内容,如果未能解决你的问题,请参考以下文章

无法序列化类 org.apache.hadoop.io.DoubleWritable - MongoDB Hadoop 连接器 + Spark + Python

MongoDB + Spark: 完整的大数据解决方案

Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢

大数据 | MongoDB + Spark: 完整的大数据解决方案

MongoDB & Spark:mongo-hadoop 和 mongo-spark 的区别

spark 读取mongodb失败,报executor time out 和GC overhead limit exceeded 异常