Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢

Posted

技术标签:

【中文标题】Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢【英文标题】:Spark - MongoDb - slow dataframe.limit(2) in java when compared with the pyspark version 【发布时间】:2018-06-20 21:23:11 【问题描述】:

有以下 python 脚本:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('Test') \
    .config("spark.driver.extraJavaOptions", "-Xss1G") \
    .master('local[*]') \
    .getOrCreate()

dataframe = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database",
                                                                                    "finance")\
    .option("collection", "finished13").option(
            "uri", "mongodb://localhost:27017/test") \
    .option("pipeline",
            "[  '$sort': 'prop1':-1 , '$limit': 700]").load()

dataframe = dataframe.limit(2)

dataframe.show()

Spark 最初是通过对 Mongodb 结构进行采样开始的,因此 load() 调用需要一段时间。 Dataframe.limit(2) 调用是即时的(如预期的那样),因为 spark 是一个惰性执行器。另一方面,我用 Java 编写了完全相同的代码,如下所示:

SparkSession session = SparkSession
        .builder()
        .master("local[*]")
        .appName("Java Spark SQL basic example")
        .config("spark.driver.extraJavaOptions", "-Xss1G")
        .getOrCreate();
Dataset<Row> dataset = session.read().format("com.mongodb.spark.sql.DefaultSource").option("collection", "finished13")
        .option("uri", "mongodb://localhost:27017/test").option("pipeline",
                "[  '$sort': 'prop1':-1 , '$limit': 700]").load();


dataset = dataset.limit(2);

Object rows = dataset.collect();

在 Java 版本中,dataset.limit(2) 大约需要 9 分钟才能完成。它是在同一个集合中采样的。集合包含大约 30k 个文档。每个文档具有相同的结构并包含大约 27k 属性(第一级,无嵌套)。平均文档大小为 1.5MB。知道为什么 Java 版本需要永恒才能完成吗?

【问题讨论】:

深入挖掘后,发现很少:关于以下问题-***.com/questions/46832394/…,***.com/questions/35869884/…,issues.apache.org/jira/browse/SPARK-15689-看起来Java/Scala中没有直接提供下推spark-mongodb 驱动程序(也可在 Scala 中重现)。 我仍然不明白为什么它急切地等待命令,而不是仅在收集 DF 时执行 所以在我将属性数量减少到 cca 3k 之后,转换再次开始变得足够快。看起来数据集操作与属性的数量密切相关。 如果其他人偶然发现同样的问题,只是为了未来的人 - 使用 mongo-java-driver-3.7.0,mongo-spark-connector_2.11-2.2.2 驱动程序可以这样工作,当添加一些操作,例如限制、选择等,它会遍历所有列。我已经有 30k 了,因此需要很长时间。我的解决方法是通过本地 Java 驱动程序创建新集合,其中仅存储聚合文档(字段更少)并通过 spark API 访问此集合。 【参考方案1】:

如果其他人偶然发现同样的问题,只是为了未来的人们 - 使用 mongo-java-driver-3.7.0、mongo-spark-connector_2.11-2.2.2 驱动程序可以在添加少量操作时工作,例如限制,选择等它遍历所有列。我已经有 30k 了,因此需要很长时间。我的解决方法是通过本地 Java 驱动程序创建新集合,其中仅存储聚合文档(具有更少的字段)并通过 spark API 访问此集合。

【讨论】:

以上是关于Spark - MongoDb - 与 pyspark 版本相比,java 中的 dataframe.limit(2) 慢的主要内容,如果未能解决你的问题,请参考以下文章

YMatrix + PLPython替代Spark实现车联网算法

使用 Jupyter Notebook 为 PySpark 内核设置 spark.app.name

Spark教程Spark连接MongoDB

当MongoDB遇见Spark

Pyspark - 配置 Amazon Redshift JDBC jar

MongoDB 遇见 spark(进行整合)