让 Spark、Python 和 MongoDB 协同工作
Posted
技术标签:
【中文标题】让 Spark、Python 和 MongoDB 协同工作【英文标题】:Getting Spark, Python, and MongoDB to work together 【发布时间】:2016-01-28 06:33:12 【问题描述】:我很难将这些组件正确地结合在一起。我已经安装了 Spark 并成功运行,我可以在本地、独立以及通过 YARN 运行作业。我已按照建议的步骤(据我所知)here 和 here
我正在开发 Ubuntu,我拥有的各种组件版本是
Spark spark-1.5.1-bin-hadoop2.6 Hadoop hadoop-2.6.1 蒙古 2.6.10 Mongo-Hadoop 连接器 克隆自 https://github.com/mongodb/mongo-hadoop.git Python 2.7.10我在执行各个步骤时遇到了一些困难,例如将哪些罐子添加到哪个路径,所以我添加的是
in/usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce
我已添加 mongo-hadoop-core-1.5.0-SNAPSHOT.jar
以下环境变量
export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
export PATH=$PATH:$HADOOP_HOME/bin
export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
export PATH=$PATH:$SPARK_HOME/bin
我的 Python 程序是基本的
from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
rdd = sc.mongoRDD(
'mongodb://username:password@localhost:27017/mydb.mycollection')
if __name__ == '__main__':
main()
我正在使用命令运行它
$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py
结果我得到以下输出
Traceback (most recent call last):
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
main()
File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
return self.mongoPairRDD(connection_string, config).values()
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
_ensure_pickles(self)
File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
orig_tb)
py4j.protocol.Py4JError
根据here
Java 客户端发生异常时引发此异常 代码。例如,如果您尝试从空堆栈中弹出一个元素。 抛出的 Java 异常的实例存储在 java_exception 成员。
查看pymongo_spark.py
的源代码和抛出错误的行,它说
"与 JVM 通信时出错。MongoDB Spark jar 是否打开 Spark 的类路径? :“
因此,作为回应,我试图确保传递了正确的罐子,但我可能做错了,见下文
$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py
我已将 pymongo
导入到同一个 python 程序中,以验证我至少可以使用它访问 MongoDB,而且我可以。
我知道这里有很多活动部件,所以如果我能提供更多有用的信息,请告诉我。
【问题讨论】:
【参考方案1】:更新:
2016-07-04
自上次更新以来MongoDB Spark Connector 成熟了很多。它提供up-to-date binaries 和基于数据源的API,但它使用SparkConf
配置,因此主观上不如Stratio/Spark-MongoDB 灵活。
2016-03-30
从最初的答案开始,我发现了两种从 Spark 连接到 MongoDB 的不同方法:
mongodb/mongo-spark Stratio/Spark-MongoDB虽然前者似乎相对不成熟,但后者看起来比 Mongo-Hadoop 连接器更好,并提供 Spark SQL API。
# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
.format("com.stratio.datasource.mongodb")
.options(host="mongo:27017", database="foo", collection="bar")
.load())
df.show()
## +---+----+--------------------+
## | x| y| _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+
它似乎比mongo-hadoop-spark
稳定得多,支持谓词下推,无需静态配置,简单易用。
原答案:
确实,这里有很多活动部件。我试图通过构建一个与描述的配置大致匹配的简单 Docker 映像来使其更易于管理(但为了简洁起见,我省略了 Hadoop 库)。你可以找到complete source on GitHub
(DOI 10.5281/zenodo.47882) 并从头开始构建它:
git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .
或者下载一张我pushed to Docker Hub的图片,这样你就可以简单地docker pull zero323/mongo-spark
):
开始图片:
docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash
通过--jars
和--driver-class-path
启动PySpark shell:
pyspark --jars $JARS --driver-class-path $SPARK_DRIVER_EXTRA_CLASSPATH
最后看看它是如何工作的:
import pymongo
import pymongo_spark
mongo_url = 'mongodb://mongo:27017/'
client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
"x": 1.0, "y": -1.0, "x": 0.0, "y": 4.0])
client.close()
pymongo_spark.activate()
rdd = (sc.mongoRDD('0foo.bar'.format(mongo_url))
.map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()
## [(1.0, -1.0), (0.0, 4.0)]
请注意,mongo-hadoop 似乎在第一次操作后关闭了连接。所以在collect之后调用例如rdd.count()
会抛出异常。
基于我在创建此图像时遇到的不同问题,我倾向于相信 传递 mongo-hadoop-1.5.0-SNAPSHOT.jar
和 mongo-hadoop-spark-1.5.0-SNAPSHOT.jar
同时传递给两者 --jars
和 --driver-class-path
是唯一的硬性要求。
注意事项:
这张图片大致基于jaceklaskowski/docker-spark ,所以如果有帮助,请务必向@jacek-laskowski发送一些好的因果报应。 如果不需要包含new API 的开发版本,那么使用--packages
很可能是更好的选择。
【讨论】:
快速问题(如果我应该将此移至一个新问题,请说)。如何使用 pymongo_spark 对 mongo 进行身份验证,我尝试了标准的 mongo 连接 uri 方法mongo_rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/db.collection')
,但它不起作用
说实话我不知道。传入 uri should work。您可以尝试在配置字典 sc.mongoRDD(uri, some_config)
中传递它们,但这只是猜测。
原来是this is my problem。我现在正在解决这个问题。再次感谢
如果您在给出的读取示例之外添加一个写入示例,将会很有帮助。
MongoDB 用户应该有权对集合执行 splitvector 命令。请参阅docs.mongodb.com/manual/reference/built-in-roles 中的 clusterManager 角色。而且,要将 rdd 写入 mongodb,请使用 new_rdd.saveToMongoDB(out_path)
【参考方案2】:
您可以尝试在 spark-submit 命令中使用--package
选项而不是--jars ...
:
spark-submit --packages org.mongodb.mongo-hadoop:mongo-hadoop-core:1.3.1,org.mongodb:mongo-java-driver:3.1.0 [REST OF YOUR OPTIONS]
其中一些 jar 文件不是 Uber jar,需要下载更多依赖项才能开始工作。
【讨论】:
我收到以下错误,指的是下载失败:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS Exception in thread "main" java.lang.RuntimeException: [download failed: com.google.guava#guava;11.0.2!guava.jar, ......
可能是网络的暂时性问题。你能再试一次吗?潜在的异常是什么?
您的建议确实对我有用,但我觉得@zero323 给出了更有用的答案,因此将其标记为正确。【参考方案3】:
我昨天也遇到了同样的问题。能够通过将mongo-java-driver.jar
放入$HADOOP_HOME/lib
和mongo-hadoop-core.jar
和mongo-hadoop-spark.jar
放入$HADOOP_HOME/spark/classpath/emr
(或$SPARK_CLASSPATH
中的任何其他文件夹)来修复它。
如果有帮助,请告诉我。
【讨论】:
它似乎没有帮助,在 $HADOOP_HOME/lib 我有 mongo-java-driver-3.0.4.jar 然后在 /usr/local/share/mongo-hadoop/spark/build/libs/ 我有 mongo-hadoop-core-1.4.1.jar 和 mongo -hadoop-spark-1.5.0-SNAPSHOT.jar。我在运行作业时传递了这个目录 $SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py【参考方案4】:祝你好运!
@见https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
from pyspark import SparkContext, SparkConf
import pymongo_spark
# Important: activate pymongo_spark.
pymongo_spark.activate()
def main():
conf = SparkConf().setAppName("pyspark test")
sc = SparkContext(conf=conf)
# Create an RDD backed by the MongoDB collection.
# This RDD *does not* contain key/value pairs, just documents.
# If you want key/value pairs, use the mongoPairRDD method instead.
rdd = sc.mongoRDD('mongodb://localhost:27017/db.collection')
# Save this RDD back to MongoDB as a different collection.
rdd.saveToMongoDB('mongodb://localhost:27017/db.other.collection')
# You can also read and write BSON:
bson_rdd = sc.BSONFileRDD('/path/to/file.bson')
bson_rdd.saveToBSON('/path/to/bson/output')
if __name__ == '__main__':
main()
【讨论】:
以上是关于让 Spark、Python 和 MongoDB 协同工作的主要内容,如果未能解决你的问题,请参考以下文章
无法序列化类 org.apache.hadoop.io.DoubleWritable - MongoDB Hadoop 连接器 + Spark + Python
Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢
大数据 | MongoDB + Spark: 完整的大数据解决方案
MongoDB & Spark:mongo-hadoop 和 mongo-spark 的区别
spark 读取mongodb失败,报executor time out 和GC overhead limit exceeded 异常