Scala 编译器无法在 Spark lambda 函数中推断类型
Posted
技术标签:
【中文标题】Scala 编译器无法在 Spark lambda 函数中推断类型【英文标题】:Scala compiler failed to infer type inside Spark lambda function 【发布时间】:2020-07-10 23:50:43 【问题描述】:假设我有这个用 Scala 2.12 编写的 Spark 代码
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( partition => partition.foreach
entry: String => println(entry)
)
当我运行代码时,编译器给出了这个错误
[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error] empty.foreachPartition( partition => partition.foreach
[error] ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM
为什么编译器partition
是Object
而不是Iterator[String]
?
我必须手动添加 partition
类型才能使代码正常工作。
val dataset = spark.emptyDataset[String]
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach
entry: String => println(entry)
)
【问题讨论】:
这是由于java的重载导致的问题。你必须明确类型,没有解决方法,我怀疑 Spark 维护者会做任何事情来改进 Scala API,他们只关心 Python 和 Java 之一。 @LuisMiguelMejíaSuárez 考虑到 Scala 应该是 Spark 的一等公民,这很奇怪。 多年前确实如此。而且你不能怪他们,只是简单的营销 Scala 用户的百分比是少数。 【参考方案1】:这是因为 foreachPartition
和 Java-Scala 互操作的两个重载版本。
如果代码仅在 Scala 中(这是最少的代码并且独立于 Spark)
val dataset: Dataset[String] = ???
dataset.foreachPartition(partition => ???)
class Dataset[T]
def foreachPartition(f: Iterator[T] => Unit): Unit = ???
def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???
trait ForeachPartitionFunction[T] extends Serializable
def call(t: Iterator[T]): Unit
然后将推断partition
的类型(如scala.collection.Iterator[String]
)。
但在实际的 Spark 代码中,ForeachPartitionFunction
是 Java 接口,其方法 call
接受 java.util.Iterator[String]
。
所以两种选择
dataset.foreachPartition((
(partition: scala.collection.Iterator[String]) => ???
): Iterator[String] => Unit)
dataset.foreachPartition((
(partition: java.util.Iterator[String]) => ???
): ForeachPartitionFunction[String])
符合条件且编译器无法推断 partition
的类型。
Scala 中的推理是本地的,因此在编译器可以看到 partition => partition.foreach...
(并且 java.util.Iterator[String]
没有方法 foreach
)之后,再输入 partition
为时已晚。
【讨论】:
【参考方案2】:就像@Dmytro 所说,scala 编译器无法推断它应该应用哪个重载函数。但是,您可以使用此辅助函数来使用一个简单的解决方法:
def helper[I](f: I => Unit): I => Unit = f
现在你需要做的就是:
dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach
helper[String](entry => println(entry))
)
【讨论】:
以上是关于Scala 编译器无法在 Spark lambda 函数中推断类型的主要内容,如果未能解决你的问题,请参考以下文章