Spark Scala动态创建可序列化对象
Posted
技术标签:
【中文标题】Spark Scala动态创建可序列化对象【英文标题】:Spark Scala Dynamic creation of Serializable object 【发布时间】:2017-09-03 17:32:42 【问题描述】:我需要为 Scala Spark 过滤器使用测试器,测试器实现 java 的 Predicate 接口并通过参数接收特定的类名。 我正在做这样的事情
val tester = Class.forName(qualifiedName).newInstance().asInstanceOf[Predicate[T]]
var filtered = rdd.filter(elem => tester.test(elem))
问题是在运行时我有一个 Spark“TaskNotSerializable Exception”,因为我的特定 Predicate 类不是 Serializable。
如果我这样做了
val tester = Class.forName(qualifiedName).newInstance()
.asInstanceOf[Predicate[T] with Serializable]
var filtered = rdd.filter(elem => tester.test(elem))
我得到同样的错误。 如果我在 rdd.filter 中创建测试器调用它可以工作:
var filtered = rdd.filter elem =>
val tester = Class.forName(qualifiedName).newInstance()
.asInstanceOf[Predicate[T] with Serializable]
tester.test(elem)
但我会创建一个对象(也许是广播)进行测试。我该如何解决?
【问题讨论】:
【参考方案1】:您只需要要求类实现Serializable
。请注意,asInstanceOf[Predicate[T] with Serializable]
转换是一个谎言:它实际上并没有检查值是Serializable
,这就是为什么第二种情况在转换期间不会立即产生错误,而最后一种情况“成功”。
但我会创建一个对象(也许要广播)用于测试。
你不能。广播与否,反序列化将在工作节点上创建 new 对象。但是您只能在每个分区上创建一个实例:
var filtered = rdd.mapPartitions iter =>
val tester = Class.forName(qualifiedName).newInstance()
.asInstanceOf[Predicate[T]]
iter.filter(tester.test)
它实际上比序列化tester
、发送它和反序列化它的性能更好,因为它的工作量非常少。
【讨论】:
哦,谢谢你的建议!但我得到两个错误:No ClassTag available for T Error occurred in an application involving default arguments.
not enough arguments for method mapPartitions: (implicit evidence$6: scala.reflect.ClassTag[T])org.apache.spark.rdd.RDD[T]. Unspecified value parameter evidence$6. Error occurred in an application involving default arguments.
我添加了preservePartitions布尔值(为假),但我不明白在问什么(API中没有证据)
一些 Spark 操作需要隐式的 ClassTag
参数。有关ClassTag
s 的描述,请参阅docs.scala-lang.org/overviews/reflection/…,或仅搜索更多信息。在这里,假设T
是您的方法/类的类型参数,您只需向其添加: ClassTag
(即,如果此代码在def foo[T](...)
方法中,则将其替换为def foo[T: ClassTag]
)。编译和更新任何类似编译错误的调用者。
mmh 什么函数应该是 foo[T]..?我使用 java.util.function.Predicate 中的 test(T) 方法进行过滤,过滤后的元素也是一个 java 类,所以我不知道在哪里放置 classTag 标记
T
在asInstanceOf[Predicate[T]]
代码中的任何位置声明。以上是关于Spark Scala动态创建可序列化对象的主要内容,如果未能解决你的问题,请参考以下文章
通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧
spark sql - 如何在 spark sql 中编写动态查询