具有替代方法的重载方法 foreachBatch
Posted
技术标签:
【中文标题】具有替代方法的重载方法 foreachBatch【英文标题】:Overloaded method foreachBatch with alternatives 【发布时间】:2020-07-28 15:19:48 【问题描述】:我正在尝试将 json 文件序列化为镶木地板格式。我有这个错误:
错误:(34, 25) 重载方法 foreachBatch 有替代方法: (函数:org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark .sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] (函数: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark .sql.Row] 不能应用于 ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame) askDF.writeStream.foreachBatch (askDF: DataFrame, batchId: Long) =>
这是我的代码:
package fr.fdj
import org.apache.spark.sql.DataFrame, SparkSession
import org.apache.spark.sql.types.IntegerType, StringType, StructField, StructType
object serialize
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("serialize")
.getOrCreate()
def main(args : Array[String])
spark.sparkContext.setLogLevel("ERROR")
//schema definition
val mySchema = StructType(Array(
StructField("Species", StringType, true),
StructField("Race", StringType, true),
StructField("Color", StringType, true),
StructField("Age", IntegerType, true)
))
val askDF = spark
.readStream
.format("json")
.option("header", "true")
.schema(mySchema)
.load("/src/main/scala/file.json")
askDF.writeStream.foreachBatch (askDF: DataFrame, batchId: Long) =>
askDF.persist()
askDF.write.parquet("/src/main/scala/file.json")
askDF.unpersist()
.start().awaitTermination()
【问题讨论】:
对于函数 foreachBatch() 有解决方案或替代方案吗? 这里真的需要结构化流吗? 【参考方案1】:我想你使用的是 Scala 2.12。
由于 Scala 2.12 中的一些更改,DataStreamWriter.foreachBatch 方法需要对代码进行一些更新,否则会出现这种歧义。
您可以在此处检查这两种 foreachBatch 方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html
我想您可以改用 scala 2.11,或者查看已解决问题的链接:https://docs.databricks.com/release-notes/runtime/7.0.html
在你的代码中,你可以试试这个:
def myFunc( askDF:DataFrame, batchID:Long ) : Unit =
askDF.persist()
askDF.write.parquet("/src/main/scala/file.json")
askDF.unpersist()
askDF.writeStream.foreachBatch(myFunc _).start().awaitTermination()
【讨论】:
我在从 Spark 2.4.5、Scala 2.11 迁移到 Spark 3.0.1、Scala 2.12 时遇到了这个问题。将我的.foreachBatch...
中的所有内容移到它自己的方法中 100% 解决了我的问题。谢谢!您能否详细说明“由于 Scala 2.12 的一些变化”?
发现这是一个可能的解释:issues.apache.org/jira/browse/…【参考方案2】:
通过在末尾添加 () 从 foreach
块中的函数返回单元。在内联函数的最后添加 () 会改变方法签名,从而解决函数中的歧义
askDF.writeStream.foreachBatch (askDF: DataFrame, batchId: Long) =>
askDF.persist()
askDF.write.parquet("/src/main/scala/file.json")
askDF.unpersist()
()
.start().awaitTermination()
【讨论】:
请在您的回答中提供更多详细信息。正如目前所写的那样,很难理解您的解决方案。【参考方案3】:正如@jeremy 指出的那样,存在 Spark 兼容版本问题。我能找到的最佳解决方案如下所述:
使用具有该类型的函数创建一个变量并显式返回。
val myFunc = (DataFrame, Long) => Unit =
(askDF: DataFrame, batchId: Long) =>
askDF.persist()
askDF.write.parquet("/src/main/scala/file.json")
askDF.unpersist()
在 foreachBatch 方法中使用这个新变量
askDF
.writeStream
.foreachBatch(myFunc)
.start()
.awaitTermination()
【讨论】:
以上是关于具有替代方法的重载方法 foreachBatch的主要内容,如果未能解决你的问题,请参考以下文章
Scala - 处理“方法的多个重载替代方案......定义默认参数”