运行Spark测试时出现ClassCastException
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了运行Spark测试时出现ClassCastException相关的知识,希望对你有一定的参考价值。
我已将mockito(与Guice)添加到我们的测试环境中,如下所示:
class SparkModuleWithMocks extends AbstractModule with JsonFormats {
override def configure(): Unit = {
//bind(classOf[TrafficFilterRules]).toInstance(trafficFilterRulesMock)
bind(classOf[TrafficFilterRules]).toProvider(new Provider[TrafficFilterRules]{
override def get(): TrafficFilterRules = {
val trafficFilterRulesMock: TrafficFilterRules = mock[TrafficFilterRules](withSettings().serializable())
val stream = getClass.getResourceAsStream("/trafficFilterRules.json")
val lines = scala.io.Source.fromInputStream( stream ).getLines.mkString
val array = parse(lines).extract[List[FilterRules]].toArray
when(trafficFilterRulesMock.trafficFilterRulesTable).thenReturn(array)
trafficFilterRulesMock
}
})
bind(classOf[SiteTable]).toProvider(new Provider[SiteTable]{
override def get(): SiteTable = {
val siteTableMock: SiteTable = mock[SiteTable](withSettings().serializable())
val stream = getClass.getResourceAsStream("/siteDomains.json")
val lines = scala.io.Source.fromInputStream( stream ).getLines.mkString
val array = parse(lines).extract[List[SiteDomain]].toArray
when(siteTableMock.siteDomains).thenReturn(array)
siteTableMock
}
})
bind(classOf[SparkSession]).toProvider(classOf[SparkSessionProvider])
}
}
val injectorWithMocks: Injector = Guice.createInjector(new SparkModuleWithMocks)
SparkSessionProvider是我们自己的类,它为guice重写get()并构建sparkSession。使用injectorWithMocks,我注入sparkSession和我们测试的服务如下:
val sparkSession = injector.instance[SparkSession]
val clickoutService = injectorWithMocks.instance[ClickoutEnrichmentService]
当我从Intellij运行测试时,一切正常,但是当我从sbt命令行运行它时,例如:
sbt "testOnly *ClickoutEnrichmentServiceTest"
我收到以下错误:
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 49.0 failed 1 times, most recent failure: Lost task 0.0 in
stage 49.0 (TID 68, localhost, executor driver): java.lang.ClassCastException:
cannot assign instance of scala.collection.immutable.List$SerializationProxy
to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of
type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
我已经阅读了几个关于这个问题的门票,但它们都与运行spark集群有关,而不是本地或处于测试模式。
有人可以解释这个错误的原因是什么,解决它的好方向是什么?
谢谢你
我也碰到了这个。我最终从@K.Chen发现了这个问题Mockito's mock throw ClassNotFoundException in Spark application
他为他的问题提供了一个解决方案,即在你的代码中使用mock[SiteTable](withSettings().serializable(SerializableMode.ACROSS_CLASSLOADERS))
。也就是说,通过额外的param SerializableMode.ACROSS_CLASSLOADERS
不幸的是,正如他的问题所指出的,他不确定为什么这会解决问题。
在Mockito 2.13.1中SerializableMode
的源代码中,我们发现以下评论:
/**
* Useful if the mock is deserialized in a different classloader / vm.
*/
@Incubating
ACROSS_CLASSLOADERS
source
也许执行者与主人的类加载器不同?
以上是关于运行Spark测试时出现ClassCastException的主要内容,如果未能解决你的问题,请参考以下文章
从 apache Spark 运行 java 程序时出现 ClassNotFound 异常
本地运行 spark 作业时出现“Scheme 没有文件系统:gs”
在 Zeppelin 0.7.1 中运行 Spark 代码时出现 NullPointerException
尝试使用 python 3 运行 Spark 时出现几个错误