Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串
Posted
技术标签:
【中文标题】Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串【英文标题】:Spark SQL(v2.0) UDAF in Scala returns empty string 【发布时间】:2017-03-27 20:14:24 【问题描述】:当我试图为我们的一个复杂问题创建一个 UDAF 时,我决定从一个基本的 UDAF 开始,它会按原样返回列。由于我是 Spark SQL/Scala 的新手,有人可以帮助我并突出我的错误。
以下是代码:
导入 org.apache.spark.sql.expressions.MutableAggregationBuffer 导入 org.apache.spark.sql.expressions.UserDefinedAggregateFunction 导入 org.apache.spark.sql.Row 导入 org.apache.spark.sql.types._ 导入 org.apache.spark.sql.types.DataTypes
导入 scala.collection._
object MinhashUdaf 扩展 UserDefinedAggregateFunction
覆盖 def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)
覆盖 def bufferSchema: StructType = StructType( StructField("shingles", (StringType)) :: Nil)
覆盖定义数据类型:DataType = (StringType)
覆盖def确定性:布尔=真
覆盖定义初始化(缓冲区:MutableAggregationBuffer):单位= 缓冲区(0)=(“”)
覆盖定义更新(缓冲区:MutableAggregationBuffer,输入:行): 单位 = buffer.update(0, input.toString())
覆盖 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): 单位 =
覆盖 def 评估(缓冲区:行):任何 = 缓冲区(0)
为了运行上面的UDAF,代码如下:
def main(args: Array[String]) val 火花:SparkSession = SparkSession.builder .master("本地[*]") .appName("测试") .getOrCreate();
import spark.implicits._; val df = spark.read.json("people.json") df.createOrReplaceTempView("people") val sqlDF = spark.sql("Select name from people") sqlDF.show() val minhash = df.select(MinhashUdaf(col("name")).as("minhash")) minhash.printSchema() minhash.show(truncate = false)
由于在 UDAF 中我按原样返回输入,因此我应该按原样获取每一行的“名称”列的值。而在运行上述字符串时,我返回一个空字符串。
【问题讨论】:
您执行了哪个操作?什么是预期的输出?实际输出? @Yaron :我已经编辑了我的问题以包含我运行 UDAF 的代码。预期输出:- 列的值。实际输出:- 空字符串 【参考方案1】:你没有实现合并功能。
使用下面的代码,您可以根据需要打印列的值。
object MinhashUdaf extends UserDefinedAggregateFunction
override def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)
override def bufferSchema: StructType = StructType( StructField("shingles", (StringType)) :: Nil)
override def dataType: DataType = (StringType)
override def deterministic: Boolean = true
override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ("")
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = buffer.update(0, input.get(0))
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = buffer1.update(0, buffer2.get(0))
override def evaluate(buffer: Row): Any = buffer(0)
【讨论】:
以上是关于Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串的主要内容,如果未能解决你的问题,请参考以下文章
forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员
如何在 Spark 的 github 中查看 Functions.Scala 中的代码
intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St