Spark Scala动态创建可序列化对象

Posted

技术标签:

【中文标题】Spark Scala动态创建可序列化对象【英文标题】:Spark Scala Dynamic creation of Serializable object 【发布时间】:2017-09-03 17:32:42 【问题描述】:

我需要为 Scala Spark 过滤器使用测试器,测试器实现 java 的 Predicate 接口并通过参数接收特定的类名。 我正在做这样的事情

val tester = Class.forName(qualifiedName).newInstance().asInstanceOf[Predicate[T]]
var filtered = rdd.filter(elem => tester.test(elem))

问题是在运行时我有一个 Spark“TaskNotSerializable Exception”,因为我的特定 Predicate 类不是 Serializable。

如果我这样做了

val tester = Class.forName(qualifiedName).newInstance()
             .asInstanceOf[Predicate[T] with Serializable]
var filtered = rdd.filter(elem => tester.test(elem))

我得到同样的错误。 如果我在 rdd.filter 中创建测试器调用它可以工作:

var filtered = rdd.filter  elem => 
    val tester = Class.forName(qualifiedName).newInstance()
             .asInstanceOf[Predicate[T] with Serializable]
    tester.test(elem)

但我会创建一个对象(也许是广播)进行测试。我该如何解决?

【问题讨论】:

【参考方案1】:

您只需要要求类实现Serializable。请注意,asInstanceOf[Predicate[T] with Serializable] 转换是一个谎言:它实际上并没有检查值是Serializable,这就是为什么第二种情况在转换期间不会立即产生错误,而最后一种情况“成功”。

但我会创建一个对象(也许要广播)用于测试。

你不能。广播与否,反序列化将在工作节点上创建 new 对象。但是您只能在每个分区上创建一个实例:

var filtered = rdd.mapPartitions  iter => 
    val tester = Class.forName(qualifiedName).newInstance()
             .asInstanceOf[Predicate[T]]
    iter.filter(tester.test)

它实际上比序列化tester、发送它和反序列化它的性能更好,因为它的工作量非常少。

【讨论】:

哦,谢谢你的建议!但我得到两个错误:No ClassTag available for T Error occurred in an application involving default arguments.not enough arguments for method mapPartitions: (implicit evidence$6: scala.reflect.ClassTag[T])org.apache.spark.rdd.RDD[T]. Unspecified value parameter evidence$6. Error occurred in an application involving default arguments.我添加了preservePartitions布尔值(为假),但我不明白在问什么(API中没有证据) 一些 Spark 操作需要隐式的 ClassTag 参数。有关ClassTags 的描述,请参阅docs.scala-lang.org/overviews/reflection/…,或仅搜索更多信息。在这里,假设T 是您的方法/类的类型参数,您只需向其添加: ClassTag(即,如果此代码在def foo[T](...) 方法中,则将其替换为def foo[T: ClassTag])。编译和更新任何类似编译错误的调用者。 mmh 什么函数应该是 foo[T]..?我使用 java.util.function.Predicate 中的 test(T) 方法进行过滤,过滤后的元素也是一个 java 类,所以我不知道在哪里放置 classTag 标记 TasInstanceOf[Predicate[T]] 代码中的任何位置声明。

以上是关于Spark Scala动态创建可序列化对象的主要内容,如果未能解决你的问题,请参考以下文章

我们可以编写 Scala/Spark 通用动态编写的代码吗

通过读取具有不同数据类型的 Scala 序列来创建 Spark 数据帧

spark sql - 如何在 spark sql 中编写动态查询

Spark:scala中数据集的动态过滤器

动态和可配置地更改几种 Spark DataFrame 列类型

Spark:在scala中的数据帧上使用动态过滤器进行聚合