简单的 Spark 应用程序在 3 GB 数据上运行缓慢

Posted

技术标签:

【中文标题】简单的 Spark 应用程序在 3 GB 数据上运行缓慢【英文标题】:Simple Spark app running slow on 3 gb of data 【发布时间】:2018-03-12 20:18:05 【问题描述】:

Spark 运行缓慢。 我总共有 3 GB 的数据(六个 .csv 文件)。 3000 万条记录。 我尝试在 Elastic Map Reduce AWS 集群 (EMR) 上运行简单的 Spark 应用程序。 但是性能很慢。

我需要对这 3000 万条记录进行一些操作: - 将它们过滤成 4 组。 - 对于每个组计算一些指标,请参阅 getTotalOps() 方法

我测量了时间,我花了大约 15 分钟来运行这部分代码。我不计入这 15 分钟的时间来从文件中读取数据并从中创建 JavaRDD。

 // Read data from 6 files and create one JavaRDD from those files:

   JavaRDD<PurchasesDataObject> allRecords = context.textFile(inputFilePath).map (
                    data -> 
                        String[] fields = data.split(",");
                        return new PurchasesDataObject(fields[0], fields[1], fields[2], fields[3],fields[4]);                       
                    );


    // Filter all records into 4 groups:
JavaRDD<PurchasesDataObject> collectionControl = allRecords.filter(record -> "C".equals(record.getTreatmentName()));

JavaRDD<PurchasesDataObject> collectionTreatment = allRecords.filter(record -> "T1".equals(record.getTreatmentName()));

JavaRDD<PurchasesDataObject> collectionControlNW = collectionControl.filter(record -> return webLabSessionVideoWatchedList.get(record.getSessionId()) == null ? true : false; );

JavaRDD<PurchasesDataObject> collectionControlW = collectionControl.filter(record -> return webLabSessionVideoWatchedList.get(record.getSessionId()) != null ? true : false;);

// Calculate metrics:
    double totalOpsControl = getTotalOps(collectionControl);
    double totalOpsTreatment = getTotalOps(collectionTreatment);
    double totalOpsControlNW = getTotalOps(collectionControlNW);
    double totalOpsControlW = getTotalOps(collectionControlW);

// .... Output results





private double getTotalOps(JavaRDD<PurchasesDataObject> dataCollection) 
        return dataCollection
                .mapToDouble(data -> data.getPrice() * data.getQuantity())
                .sum();

【问题讨论】:

【参考方案1】:
allRecords.cache

在对同一数据执行不同操作之前执行缓存。所以它不会从 Disk 或 S3 重复读取它。

【讨论】:

【参考方案2】:

.cache() 会有所帮助,但 Spark 很慢,因此 15 分钟处理 3000 万行对于 Spark 来说是一个不错的性能。

【讨论】:

感谢您建议使用 cache()。你能解释一下为什么它很慢吗?我被推荐在我的用例中使用 Spark。但现在似乎使用纯 Java 和 Java8 在没有 Spark 的情况下执行所有计算会更快。我的用例是在数据集(3-5 GB 数据)上计算 10-15 个指标(就像您在下面的代码中看到的那样)。如果一个指标的时间是 15 分钟,您认为可以等待 15*10 = 150 分钟来获得 10 个指标,就像我上面计算的那样? [指标,如 avrg、sum、avg with group by 等] Spark 是一个用 Java 和 Scala 编写的复杂机器。它的主要目的是使用 HDFS 和并行处理来处理大量数据。如果您可以将数据放入内存,则没有理由使用 Spark。

以上是关于简单的 Spark 应用程序在 3 GB 数据上运行缓慢的主要内容,如果未能解决你的问题,请参考以下文章

根据大小(mb/gb)读取 spark 数据帧

Spark 12 GB 数据加载与 Window 函数性能问题

Apache Spark 如何处理不适合内存的数据?

spark如何加载大于集群磁盘大小的输入文件?

spark运行任务报错:Container [...] is running beyond physical memory limits. Current usage: 3.0 GB of 3 GB

在 Spark 中高效地连接一个大表(1TB)和另一个小表(250GB)