如何加快 Spark SQL 单元测试?
Posted
技术标签:
【中文标题】如何加快 Spark SQL 单元测试?【英文标题】:How to speed up Spark SQL unit tests? 【发布时间】:2015-11-29 14:57:15 【问题描述】:我正在评估 Spark SQL 以实现一个简单的报告模块(很少对已经存储在 HDFS 上的 Avro 数据进行简单聚合)。我毫不怀疑 Spark SQL 可以很好地满足我的功能性和非功能性需求。
但是,除了生产要求之外,我还想确保该模块是可测试的。我们采用 BDD 方法,场景非常集中,这意味着该模块将需要对一些非常简单的数据(1..10 条记录)运行数十/数百个 SQL 查询。
为了大致了解本地模式下 Spark SQL 的性能,我快速构建了一些测试原型:
select count(*) from myTable
select key, count(*) from myTable group by key
第一个测试平均需要 100 毫秒,但第二个测试需要 500 毫秒。这样的性能是不可接受的,这会使测试套件变得太慢。
为了比较,我可以使用 Crunch 及其 MemPipeline 在 10 毫秒内运行相同的测试(在本地模式下使用 MRPipeline 为 1500 毫秒),在嵌入式模式下使用 Hive 也可以在 1500 毫秒内运行。因此,Spark SQL 在本地模式下比 MR 快一点,但在构建良好的测试套件方面仍然很慢。
是否可以在本地模式下加速 Spark SQL?
是否有更好/更快的方法来测试 Spark SQL 模块?
(我还没有分析执行,但是由于 RDD 上的 groupBy().countByKey()
平均需要 40 毫秒,我希望发现罪魁祸首是查询优化器)
我的快速而肮脏的测试代码如下:
SparkConf sparkConf = new SparkConf()
.setMaster("local[4]")
.setAppName("poc-sparksql");
try (JavaSparkContext ctx = new JavaSparkContext(sparkConf))
SQLContext sqlCtx = new SQLContext(ctx);
for (int i = 0; i < ITERATIONS; i++)
Stopwatch testCaseSw = new Stopwatch().start();
DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
df.registerTempTable("myTable");
DataFrame result = sqlCtx.sql("select count(*) from myTable");
System.out.println("Results: " + result.collectAsList());
System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
for (int i = 0; i < ITERATIONS; i++)
Stopwatch testCaseSw = new Stopwatch().start();
DataFrame df = sqlCtx.load("/tmp/test.avro", "com.databricks.spark.avro");
df.registerTempTable("myTable");
DataFrame result = sqlCtx.sql("select a, count(*) from myTable group by a ");
System.out.println("Results: " + result.collectAsList());
System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
【问题讨论】:
你考虑过缓存吗? 如果你在同一数据上测试不同的查询,加载数据一次..然后查询.. 根据我的测试,缓存没有帮助(sql 调用很慢)。我更多地考虑的是能够禁用一些优化。我不认为缓存是一种解决方案,因为 1-“好”测试的输入旨在使给定行为易于理解,因此每个测试都有不同的输入。我提供的草率代码并没有试图模仿测试套件会做什么(小黄瓜表的自动 Avro 序列化等) 2-如果输入始终相同,则 SQL 查询是确定性的,然后我缓存了收集的输出数据而不是输入 已经一年了,但您是否真的找到了使用 .sql() 的慢速测试的解决方案?如果是这样,你能和我们分享一下吗?在我的本地环境中,spark 的初始初始化已经花费了大约 20 秒,但它的测试一直在花费(3 分钟)。 Apache Crunch 仍然非常适合我们所做的大多数事情,并决定坚持下去。编写一些额外的代码似乎比了解如何使用 Spark 及其“仅廉价集成测试”设计编写好的测试套件更好。不过,我仍然对这个话题感兴趣。 【参考方案1】:当数据量非常小时,启动太多任务不是一个好选择。 在您的第二个选项中,group by
将使用200 tasks
创建另一个stage
,因为您没有设置 shuffle partitions 属性,默认为200
,大部分为空。
它可能对单个测试没有影响,但当您有数千个带有随机操作的测试时,它可能会产生重大影响。
在 spark conf 中将 "spark.sql.shuffle.partitions"
设置为 x (where x is local[x]
)。
实际上你不需要4 executors
来处理少于10条记录,所以最好将执行者的数量减少到1
,并将shuffle.paritions
设置为1
。
【讨论】:
spark.sql.shuffle.partitions 还是参数吗?我在 2.4.5 文档中找不到它。我打算检查以验证,但我认为随机分区甚至不会在本地模式下使用。 我认为你可以做几个连接(或任何随机操作)并检查输出帧的分区(或 UI 上的任务)。请在spark.apache.org/docs/latest/sql-performance-tuning.html 上找到该物业【参考方案2】:如果您正在研究 ms 级别的优化,则有各种指针。
-
一次读取您的数据并缓存和多次对其进行 SQL 查询。在循环加载中意味着“在everyIteartion 中生成新任务”
DataFrame df = sqlCtx.load("/tmp/test.avro","com.databricks.spark.avro"); df.registerTempTable("myTable"); df.cache() for (int i = 0; i < ITERATIONS; i++) Stopwatch testCaseSw = new Stopwatch().start(); DataFrame result = sqlCtx.sql("select count(*) from myTable"); // Dont do printLn inside the loop , save the output in some hashMap and print it later once the loop is complete System.out.println("Results: " + result.collectAsList()); System.out.println("Elapsed: " + testCaseSw.elapsedMillis());
-
在循环外提取 System.out.println 会消耗一些时间。
请看一下: http://bytepadding.com/big-data/spark/understanding-spark-through-map-reduce/
【讨论】:
【参考方案3】:我使用由 Holden Karau 开发的 spark-testing-base
库在 Spark 中进行单元测试。
在相关的README.md
s 中,您可以找到更多信息以调整资源以分配给单元测试。
【讨论】:
以上是关于如何加快 Spark SQL 单元测试?的主要内容,如果未能解决你的问题,请参考以下文章
无法在 Spark-2.2.0 - Scala-2.11.8 上运行单元测试(scalatest)