具有替代方法的重载方法 foreachBatch

Posted

技术标签:

【中文标题】具有替代方法的重载方法 foreachBatch【英文标题】:Overloaded method foreachBatch with alternatives 【发布时间】:2020-07-28 15:19:48 【问题描述】:

我正在尝试将 json 文件序列化为镶木地板格式。我有这个错误:

错误:(34, 25) 重载方法 foreachBatch 有替代方法: (函数:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark .sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] (函数: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark .sql.Row] 不能应用于 ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame) askDF.writeStream.foreachBatch (askDF: DataFrame, batchId: Long) =>

这是我的代码:

package fr.fdj
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType


object serialize 

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("serialize")
    .getOrCreate()


  def main(args : Array[String]) 

  spark.sparkContext.setLogLevel("ERROR")

  //schema definition
  val mySchema = StructType(Array(
    StructField("Species", StringType, true),
    StructField("Race", StringType, true),
    StructField("Color", StringType, true),
    StructField("Age", IntegerType, true)
  ))

  val askDF = spark
  .readStream
  .format("json")
  .option("header", "true")
  .schema(mySchema)
  .load("/src/main/scala/file.json")

  askDF.writeStream.foreachBatch  (askDF: DataFrame, batchId: Long) =>
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
  .start().awaitTermination()


  

【问题讨论】:

对于函数 foreachBatch() 有解决方案或替代方案吗? 这里真的需要结构化流吗? 【参考方案1】:

我想你使用的是 Scala 2.12。

由于 Scala 2.12 中的一些更改,DataStreamWriter.foreachBatch 方法需要对代码进行一些更新,否则会出现这种歧义。

您可以在此处检查这两种 foreachBatch 方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html

我想您可以改用 scala 2.11,或者查看已解决问题的链接:https://docs.databricks.com/release-notes/runtime/7.0.html

在你的代码中,你可以试试这个:

def myFunc( askDF:DataFrame, batchID:Long ) : Unit = 
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()

askDF.writeStream.foreachBatch(myFunc _).start().awaitTermination()

【讨论】:

我在从 Spark 2.4.5、Scala 2.11 迁移到 Spark 3.0.1、Scala 2.12 时遇到了这个问题。将我的 .foreachBatch... 中的所有内容移到它自己的方法中 100% 解决了我的问题。谢谢!您能否详细说明“由于 Scala 2.12 的一些变化”? 发现这是一个可能的解释:issues.apache.org/jira/browse/…【参考方案2】:

通过在末尾添加 () 从 foreach 块中的函数返回单元。在内联函数的最后添加 () 会改变方法签名,从而解决函数中的歧义

 askDF.writeStream.foreachBatch  (askDF: DataFrame, batchId: Long) =>
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
    ()
  .start().awaitTermination()

【讨论】:

请在您的回答中提供更多详细信息。正如目前所写的那样,很难理解您的解决方案。【参考方案3】:

正如@jeremy 指出的那样,存在 Spark 兼容版本问题。我能找到的最佳解决方案如下所述:

使用具有该类型的函数创建一个变量并显式返回。

val myFunc = (DataFrame, Long) => Unit = 
  (askDF: DataFrame, batchId: Long) => 
    askDF.persist()
    askDF.write.parquet("/src/main/scala/file.json")
    askDF.unpersist()
  

在 foreachBatch 方法中使用这个新变量

askDF
 .writeStream
 .foreachBatch(myFunc)
 .start()
 .awaitTermination()

【讨论】:

以上是关于具有替代方法的重载方法 foreachBatch的主要内容,如果未能解决你的问题,请参考以下文章

带有替代方法的重载方法值选择

Scala:方法的多个重载替代方案

Scala - 处理“方法的多个重载替代方案......定义默认参数”

如何使用替代方法解决重载方法值寄存器,UDF Spark scala

错误重载方法值 regexp_replace 与替代品

PHP 是不是有“方法重载”的规定,就像 java 和其他一些编程语言一样?如果没有,是不是有替代方案? [复制]