Spark scala 模拟 spark.implicits 用于单元测试

Posted

技术标签:

【中文标题】Spark scala 模拟 spark.implicits 用于单元测试【英文标题】:Spark scala mocking spark.implicits for unit testing 【发布时间】:2020-10-26 14:46:52 【问题描述】:

在尝试使用 Spark 和 Scala 简化单元测试时,我使用的是 scala-test 和 mockito-scala(以及 mockito 糖)。这只是让你做这样的事情:

val sparkSessionMock = mock[SparkSession]

然后,您通常可以使用“何时”和“验证”来完成所有魔法。

但是如果你有一些实现必须导入

import spark.implicits._

在它的代码中,单元测试的简单性似乎消失了(或者至少我还没有找到解决这个问题的最合适的方法)。

我最终得到了这个错误:

org.mockito.exceptions.verification.SmartNullPointerException: 
You have a NullPointerException here:
-> at ...
because this method call was *not* stubbed correctly:
-> at scala.Option.orElse(Option.scala:289)
sparkSession.implicits();

由于打字问题,简单地模拟 SparkSession 内“隐式”对象的调用将无济于事:

val implicitsMock = mock[SQLImplicits]
when(sparkSessionMock.implicits).thenReturn(implicitsMock)

不会让你通过,因为它说它需要你的模拟对象的类型:

require: sparkSessionMock.implicits.type
found: implicitsMock.type

请不要告诉我我应该做 SparkSession.builder.getOrCreate()... 因为那不再是单元测试而是更重量级的集成测试。

(编辑):这是一个完整的可重现示例:

import org.apache.spark.sql._
import org.mockito.Mockito.when
import org.scalatest. FlatSpec, Matchers 
import org.scalatestplus.mockito.MockitoSugar

case class MyData(key: String, value: String)

class ClassToTest()(implicit spark: SparkSession) 
    import spark.implicits._

    def read(path: String): Dataset[MyData] = 
         spark.read.parquet(path).as[MyData]


class SparkMock extends FlatSpec with Matchers with MockitoSugar 

     it should "be able to mock spark.implicits" in 
         implicit val sparkMock: SparkSession = mock[SparkSession]
         val implicitsMock = mock[SQLImplicits]
         when(sparkMock.implicits).thenReturn(implicitsMock)
         val readerMock = mock[DataFrameReader]
         when(sparkMock.read).thenReturn(readerMock)
         val dataFrameMock = mock[DataFrame]
         when(readerMock.parquet("/some/path")).thenReturn(dataFrameMock)
         val dataSetMock = mock[Dataset[MyData]]
         implicit val testEncoder: Encoder[MyData] = Encoders.product[MyData]
         when(dataFrameMock.as[MyData]).thenReturn(dataSetMock)

         new ClassToTest().read("/some/path/") shouldBe dataSetMock
    
 

【问题讨论】:

我不确定我是否完全理解你在做什么,但when[SQLImplicits](sparkSessionMock.implicits).thenReturn(implicitsMock) 似乎可以编译。你有可重现的例子吗? 请查看article,其中SparkSession 被嘲笑(链接来自question)。 @DmytroMitin 感谢本文的提示,这是一个非常好的示例,非常有帮助。我之前已经找到了这个,正在寻找解决这个问题的方法。这个例子的不幸之处在于它没有模拟“import spark.implicits._” @DmytroMitin 我在原始帖子中添加了一个完整的示例。我还尝试了您建议将类型添加到“when[...]...”的更改,但是在运行此程序时,您会收到另一个错误,因为它与“sparkMock.implicits.type”不同: 【参考方案1】:

你不能模拟隐式。隐式在编译时解决,而模拟发生在运行时(运行时反射,字节码操作通过 Byte Buddy)。您不能在编译时导入仅在运行时模拟的隐式。您必须手动解析隐式(原则上,如果您在运行时再次启动编译器,您可以在运行时解析隐式,但这会更难1234)。

试试

class ClassToTest()(implicit spark: SparkSession, encoder: Encoder[MyData]) 
  def read(path: String): Dataset[MyData] = 
    spark.read.parquet(path).as[MyData]


class SparkMock extends AnyFlatSpec with Matchers with MockitoSugar 

  it should "be able to mock spark.implicits" in 
    implicit val sparkMock: SparkSession = mock[SparkSession]
    val readerMock = mock[DataFrameReader]
    when(sparkMock.read).thenReturn(readerMock)
    val dataFrameMock = mock[DataFrame]
    when(readerMock.parquet("/some/path")).thenReturn(dataFrameMock)
    val dataSetMock = mock[Dataset[MyData]]
    implicit val testEncoder: Encoder[MyData] = Encoders.product[MyData]
    when(dataFrameMock.as[MyData]).thenReturn(dataSetMock)

    new ClassToTest().read("/some/path") shouldBe dataSetMock
  


//[info] SparkMock:
//[info] - should be able to mock spark.implicits
//[info] Run completed in 2 seconds, 727 milliseconds.
//[info] Total number of tests run: 1
//[info] Suites: completed 1, aborted 0
//[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
//[info] All tests passed.

请注意"/some/path" 在两个地方应该是相同的。在您的代码 sn-p 中,两个字符串不同。

【讨论】:

感谢您的回答。这也是我最终如何重新编写代码的方式。关于这一点的问题部分仍然是,通常人们倾向于简单地使用大量导入隐式。所以,如果你有一个更大的代码库,你会发现很多。它需要到处更换。这就是我想知道是否有机会也简单地嘲笑它的原因。 :)

以上是关于Spark scala 模拟 spark.implicits 用于单元测试的主要内容,如果未能解决你的问题,请参考以下文章

原创用Scala和Spark实现机票Shopping系统

代做kafka storm spark2 mysql oracle hive3 java python scala代做

Spark安装

如何在CDH5上运行Spark应用

Spark采用Scala,是因为Scala支持函数式编程吗?

spark哪个版本支持scala2.11