Scala 编译器无法在 Spark lambda 函数中推断类型

Posted

技术标签:

【中文标题】Scala 编译器无法在 Spark lambda 函数中推断类型【英文标题】:Scala compiler failed to infer type inside Spark lambda function 【发布时间】:2020-07-10 23:50:43 【问题描述】:

假设我有这个用 Scala 2.12 编写的 Spark 代码

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( partition => partition.foreach 
      entry: String => println(entry)
    )

当我运行代码时,编译器给出了这个错误


[info] Compiling 1 Scala source to <path>/scala-2.12/classes ...
[error] Code.scala:11:52: value foreach is not a member of Object
[error]     empty.foreachPartition( partition => partition.foreach
[error]                                                    ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 1 s, completed Jul 11, 2020 1:43:41 AM


为什么编译器partitionObject 而不是Iterator[String]

我必须手动添加 partition 类型才能使代码正常工作。

    val dataset = spark.emptyDataset[String]

    dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach 
      entry: String => println(entry)
    )

【问题讨论】:

这是由于java的重载导致的问题。你必须明确类型,没有解决方法,我怀疑 Spark 维护者会做任何事情来改进 Scala API,他们只关心 Python Java 之一。 @LuisMiguelMejíaSuárez 考虑到 Scala 应该是 Spark 的一等公民,这很奇怪。 多年前确实如此。而且你不能怪他们,只是简单的营销 Scala 用户的百分比是少数。 【参考方案1】:

这是因为 foreachPartition 和 Java-Scala 互操作的两个重载版本。

如果代码仅在 Scala 中(这是最少的代码并且独立于 Spark)

val dataset: Dataset[String] = ???

dataset.foreachPartition(partition => ???)

class Dataset[T] 
  def foreachPartition(f: Iterator[T] => Unit): Unit = ???
  def foreachPartition(func: ForeachPartitionFunction[T]): Unit = ???


trait ForeachPartitionFunction[T] extends Serializable 
  def call(t: Iterator[T]): Unit

然后将推断partition 的类型(如scala.collection.Iterator[String])。

但在实际的 Spark 代码中,ForeachPartitionFunction 是 Java 接口,其方法 call 接受 java.util.Iterator[String]

所以两种选择

dataset.foreachPartition((
  (partition: scala.collection.Iterator[String]) => ??? 
): Iterator[String] => Unit)

dataset.foreachPartition((
  (partition: java.util.Iterator[String]) => ??? 
): ForeachPartitionFunction[String])

符合条件且编译器无法推断 partition 的类型。

Scala 中的推理是本地的,因此在编译器可以看到 partition =&gt; partition.foreach...(并且 java.util.Iterator[String] 没有方法 foreach)之后,再输入 partition 为时已晚。

【讨论】:

【参考方案2】:

就像@Dmytro 所说,scala 编译器无法推断它应该应用哪个重载函数。但是,您可以使用此辅助函数来使用一个简单的解决方法:

  def helper[I](f: I => Unit): I => Unit = f

现在你需要做的就是:

dataset.foreachPartition( (partition:Iterator[String]) => partition.foreach 
      helper[String](entry => println(entry))
    )

【讨论】:

以上是关于Scala 编译器无法在 Spark lambda 函数中推断类型的主要内容,如果未能解决你的问题,请参考以下文章

Lambda表达式用法大比较: Scala和Java 8

Lambda表达式用法大比较: Scala和Java 8

Spark平台下,scala比java更有优势么

Spark2.1.0编译

无法在 Spark-2.2.0 - Scala-2.11.8 上运行单元测试(scalatest)

Scala里跟Java的ArrayList,有add方法的类是哪个