Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢
Posted
技术标签:
【中文标题】Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢【英文标题】:Spark - MongoDb - slow dataframe.limit(2) in java when compared with the pyspark version 【发布时间】:2018-06-20 21:23:11 【问题描述】:有以下 python 脚本:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('Test') \
.config("spark.driver.extraJavaOptions", "-Xss1G") \
.master('local[*]') \
.getOrCreate()
dataframe = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database",
"finance")\
.option("collection", "finished13").option(
"uri", "mongodb://localhost:27017/test") \
.option("pipeline",
"[ '$sort': 'prop1':-1 , '$limit': 700]").load()
dataframe = dataframe.limit(2)
dataframe.show()
Spark 最初是通过对 Mongodb 结构进行采样开始的,因此 load() 调用需要一段时间。 Dataframe.limit(2) 调用是即时的(如预期的那样),因为 spark 是一个惰性执行器。另一方面,我用 Java 编写了完全相同的代码,如下所示:
SparkSession session = SparkSession
.builder()
.master("local[*]")
.appName("Java Spark SQL basic example")
.config("spark.driver.extraJavaOptions", "-Xss1G")
.getOrCreate();
Dataset<Row> dataset = session.read().format("com.mongodb.spark.sql.DefaultSource").option("collection", "finished13")
.option("uri", "mongodb://localhost:27017/test").option("pipeline",
"[ '$sort': 'prop1':-1 , '$limit': 700]").load();
dataset = dataset.limit(2);
Object rows = dataset.collect();
在 Java 版本中,dataset.limit(2) 大约需要 9 分钟才能完成。它是在同一个集合中采样的。集合包含大约 30k 个文档。每个文档具有相同的结构并包含大约 27k 属性(第一级,无嵌套)。平均文档大小为 1.5MB。知道为什么 Java 版本需要永恒才能完成吗?
【问题讨论】:
深入挖掘后,发现很少:关于以下问题-***.com/questions/46832394/…,***.com/questions/35869884/…,issues.apache.org/jira/browse/SPARK-15689-看起来Java/Scala中没有直接提供下推spark-mongodb 驱动程序(也可在 Scala 中重现)。 我仍然不明白为什么它急切地等待命令,而不是仅在收集 DF 时执行 所以在我将属性数量减少到 cca 3k 之后,转换再次开始变得足够快。看起来数据集操作与属性的数量密切相关。 如果其他人偶然发现同样的问题,只是为了未来的人 - 使用 mongo-java-driver-3.7.0,mongo-spark-connector_2.11-2.2.2 驱动程序可以这样工作,当添加一些操作,例如限制、选择等,它会遍历所有列。我已经有 30k 了,因此需要很长时间。我的解决方法是通过本地 Java 驱动程序创建新集合,其中仅存储聚合文档(字段更少)并通过 spark API 访问此集合。 【参考方案1】:如果其他人偶然发现同样的问题,只是为了未来的人们 - 使用 mongo-java-driver-3.7.0、mongo-spark-connector_2.11-2.2.2 驱动程序可以在添加少量操作时工作,例如限制,选择等它遍历所有列。我已经有 30k 了,因此需要很长时间。我的解决方法是通过本地 Java 驱动程序创建新集合,其中仅存储聚合文档(具有更少的字段)并通过 spark API 访问此集合。
【讨论】:
以上是关于Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢的主要内容,如果未能解决你的问题,请参考以下文章
YMatrix + PLPython替代Spark实现车联网算法
使用 Jupyter Notebook 为 PySpark 内核设置 spark.app.name