Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串

Posted

技术标签:

【中文标题】Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串【英文标题】:Spark SQL(v2.0) UDAF in Scala returns empty string 【发布时间】:2017-03-27 20:14:24 【问题描述】:

当我试图为我们的一个复杂问题创建一个 UDAF 时,我决定从一个基本的 UDAF 开始,它会按原样返回列。由于我是 Spark SQL/Scala 的新手,有人可以帮助我并突出我的错误。

以下是代码:

导入 org.apache.spark.sql.expressions.MutableAggregationBuffer 导入 org.apache.spark.sql.expressions.UserDefinedAggregateFunction 导入 org.apache.spark.sql.Row 导入 o​​rg.apache.spark.sql.types._ 导入 org.apache.spark.sql.types.DataTypes

导入 scala.collection._

object MinhashUdaf 扩展 UserDefinedAggregateFunction

覆盖 def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)

覆盖 def bufferSchema: StructType = StructType( StructField("shingles", (StringType)) :: Nil)

覆盖定义数据类型:DataType = (StringType)

覆盖def确定性:布尔=真

覆盖定义初始化(缓冲区:MutableAggregationBuffer):单位= 缓冲区(0)=(“”)

覆盖定义更新(缓冲区:MutableAggregationBuffer,输入:行): 单位 = buffer.update(0, input.toString())

覆盖 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): 单位 =

覆盖 def 评估(缓冲区:行):任何 = 缓冲区(0)

为了运行上面的UDAF,代码如下:

def main(args: Array[String]) val 火花:SparkSession = SparkSession.builder .master("本地[*]") .appName("测试") .getOrCreate();

import spark.implicits._;

val df = spark.read.json("people.json")
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("Select name from people")
sqlDF.show()

val minhash = df.select(MinhashUdaf(col("name")).as("minhash"))
minhash.printSchema()
minhash.show(truncate = false)

由于在 UDAF 中我按原样返回输入,因此我应该按原样获取每一行的“名称”列的值。而在运行上述字符串时,我返回一个空字符串。

【问题讨论】:

您执行了哪个操作?什么是预期的输出?实际输出? @Yaron :我已经编辑了我的问题以包含我运行 UDAF 的代码。预期输出:- 列的值。实际输出:- 空字符串 【参考方案1】:

你没有实现合并功能。

使用下面的代码,您可以根据需要打印列的值。

object MinhashUdaf extends UserDefinedAggregateFunction 

override def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)

override def bufferSchema: StructType = StructType( StructField("shingles", (StringType)) :: Nil)

override def dataType: DataType = (StringType)

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit =  buffer(0) = ("") 

override def update(buffer: MutableAggregationBuffer, input: Row): Unit =  buffer.update(0, input.get(0)) 

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =   buffer1.update(0, buffer2.get(0))

override def evaluate(buffer: Row): Any =  buffer(0)  

【讨论】:

以上是关于Scala 中的 Spark SQL(v2.0) UDAF 返回空字符串的主要内容,如果未能解决你的问题,请参考以下文章

spark-sql/Scala 中的反透视列名是数字

forEach Spark Scala 中的错误:值选择不是 org.apache.spark.sql.Row 的成员

如何在 Spark 的 github 中查看 Functions.Scala 中的代码

intellij 中 spark scala 应用程序中的线程“main”java.lang.NoClassDefFoundError:org/apache/spark/sql/catalyst/St

分别用SQL和Spark(Scala)解决50道SQL题

Scala Spark 中的 udf 运行时错误