如何测试 Spark RDD

Posted

技术标签:

【中文标题】如何测试 Spark RDD【英文标题】:How to Test Spark RDD 【发布时间】:2017-04-20 00:00:00 【问题描述】:

我不确定我们是否可以在 Spark 中测试 RDD。

我看到一篇文章说模拟 RDD 不是一个好主意。是否有任何其他方法或任何测试 RDD 的最佳实践

【问题讨论】:

你看过霍顿的spark-test-base了吗? 【参考方案1】:

感谢您提出这个悬而未决的问题。出于某种原因,当谈到 Spark 时,每个人都沉迷于分析,以至于忘记了过去 15 年左右出现的伟大的软件工程实践。这就是为什么我们在课程中重点讨论测试和持续集成(以及 DevOps 等其他内容)。

术语简介

在我继续之前,我必须对@himanshuIIITian 引用的 KnolX 演示文稿表示一点不同意。 true 单元测试意味着您可以完全控制测试中的每个组件。不能与数据库、REST 调用、文件系统甚至系统时钟进行交互;正如 Gerard Mezaros 在xUnit Test Patterns 中所说的那样,一切都必须“加倍”(例如模拟、存根等)。我知道这看起来像是语义,但它确实很重要。未能理解这一点是您在持续集成中看到间歇性测试失败的主要原因之一。

我们仍然可以进行单元测试

因此,鉴于这种理解,对RDD 进行单元测试是不可能的。但是,在开发分析时仍然有单元测试的地方。

(注意:我将使用 Scala 作为示例,但概念超越了语言和框架。)

考虑一个简单的操作:

rdd.map(foo).map(bar)

这里的foobar 是简单的函数。这些可以以正常方式进行单元测试,并且它们应该包含尽可能多的极端案例。毕竟,他们为什么关心他们从哪里获得输入,无论是测试夹具还是RDD

别忘了 Spark Shell

这不是测试本身,但在这些早期阶段,您还应该在 Spark shell 中进行试验,以找出您的转换,尤其是您的方法的后果。例如,您可以使用toDebugStringexplainglomshowprintSchema 等许多不同的函数检查物理和逻辑查询计划、分区策略和保存以及数据状态在。我会让你探索这些。

您还可以在 Spark shell 和测试中将您的 master 设置为 local[2],以识别只有在您开始分发工作时才可能出现的任何问题。

使用 Spark 进行集成测试

现在是有趣的东西。

为了在您对辅助函数和RDD/DataFrame 转换逻辑的质量有信心之后进行集成测试 Spark,做一些事情至关重要(无论构建工具和测试框架):

增加 JVM 内存。 启用分叉但禁用并行执行。 使用您的测试框架将您的 Spark 集成测试累积到套件中,并在所有测试之前初始化 SparkContext,并在所有测试之后停止它。

有几种方法可以做到这一点。一个可从@Pushkr 引用的spark-testing-base 和@himanshuIIITian 链接的KnolX 演示文稿中获得。

贷款模式

另一种方法是使用Loan Pattern。

例如(使用 ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup 
  "My analytics" should 
    "calculate the right thing" in withSparkContext  (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    
  


trait SparkContextSetup 
  def withSparkContext(testMethod: (SparkContext) => Any) 
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try 
      testMethod(sparkContext)
    
    finally sparkContext.stop()
  
 

如您所见,贷款模式利用高阶函数将SparkContext“贷款”给测试,然后在完成后将其丢弃。

面向痛苦的编程(谢谢,Nathan)

这完全是一个偏好问题,但在引入另一个框架之前,我更喜欢使用贷款模式并尽可能地自行连接。除了试图保持轻量级之外,框架有时会添加很多“魔法”,使调试测试失败难以推理。所以我采用Suffering-Oriented Programming 方法——我避免添加一个新框架,直到没有它的痛苦无法承受。但同样,这取决于您。

现在 spark-testing-base 真正大放异彩的一个地方是使用基于 Hadoop 的帮助程序,例如 HDFSClusterLikeYARNClusterLike。将这些特征混合在一起确实可以为您节省很多设置痛苦。它发光的另一个地方是Scalacheck-like 属性和生成器。但同样,我个人会推迟使用它,直到我的分析和测试达到那种复杂程度。

使用 Spark Streaming 进行集成测试

最后,我想介绍一下带有内存值的 SparkStreaming 集成测试设置的外观:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来简单。它实际上只是将数据序列转换为队列以提供给DStream。其中大部分实际上只是适用于 Spark API 的样板设置。

这可能是我有史以来最长的帖子,所以我会留在这里。我希望其他人能加入其他想法,通过改进所有其他应用程序开发的相同敏捷软件工程实践来帮助提高我们的分析质量。

对于无耻的插件表示歉意,您可以查看我们的课程 Analytics with Apache Spark,我们在其中讨论了很多这些想法等等。我们希望尽快有一个在线版本。

【讨论】:

【参考方案2】:

有 2 种测试 Spark RDD/应用程序的方法。它们如下:

例如

要测试的单元

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount  
  def get(url: String, sc: SparkContext): RDD[(String, Int)] =  
    val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) 
   

现在方法一测试如下:

import org.scalatest. BeforeAndAfterAll, FunSuite 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll  
  private var sparkConf: SparkConf = _ 
  private var sc: SparkContext = _ 

  override def beforeAll()  
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
   

  private val wordCount = new WordCount 

  test("get word count rdd")  
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10)
    

  override def afterAll()  
    sc.stop() 
   

在方法 1 中,我们不是在模拟 RDD。我们只是在检查 WordCount 类的行为。但是这里我们必须自己管理 SparkContext 的创建和销毁。因此,如果您不想为此编写额外的代码,则可以使用spark-testing-base,如下所示:

方法二

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext  
  private val wordCount = new WordCount 

  test("get word count rdd")  
    val result = wordCount.get("file.txt", sc)
    assert(result.take(10).length === 10) 
   

或者

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext with RDDComparisons  
  private val wordCount = new WordCount 

  test("get word count rdd with comparison")  
    val expected = sc.textFile("file.txt")
                     .flatMap(_.split(" "))
                     .map((_, 1))
                     .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc)

    assert(compareRDD(expected, result).isEmpty)
    

有关 Spark RDD 测试的更多详细信息,请参阅此 - KnolX: Unit Testing of Spark Applications

【讨论】:

我的只是一个小程序,所以我正在尝试使用 Method:1 ,但是您(himanshu)在方法 1 中显示的并不是比较 RDD。您正在对该 RDD 执行操作,然后您尝试将其等同于整数值。我想要的是比较 2 个 RDD... 让我们说 RDD[myClass] === RDD[myClass] 为了比较RDD,你应该使用RDDComparisons,在方法2中提到。 但这是通过使用 Some 开发的自定义库,该库仍在开发中,而不是像 Apache 这样的大伞。它可能还没有准备好生产。

以上是关于如何测试 Spark RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark程序进行单元测试-使用scala

Spark核心编程---创建RDD

Spark 调优之RDD持久化级别及kryo序列化性能测试

spark记录spark Core之RDD

基准测试 Spark 代码中有趣的性能变化

有啥关于 Spark 的书推荐?