SparkSQL 聚合器:类型不匹配错误

Posted

技术标签:

【中文标题】SparkSQL 聚合器:类型不匹配错误【英文标题】:SparkSQL Aggregator: Type Mismatch Error 【发布时间】:2016-06-21 08:42:25 【问题描述】:

我正在使用带有 Spark 2.0 预览版的 Databricks 社区版。 我尝试了以下(简单)代码:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import java.util.Calendar
import spark.implicits._

case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5: Double)
val teams = sc.parallelize(Seq(C1("hash1", "NLC", "Cubs", Java.sql.Date.valueOf("2016-01-23"), 3253.21), C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88), C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72))).toDS

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]]  
  def zero: Seq[C1] = Seq.empty[C1] //Nil
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r

  override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
  override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]

val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect

我收到以下错误消息:

错误:类型不匹配; 找到:org.apache.spark.sql.TypedColumn[C1,Seq[C1]] 必需:org.apache.spark.sql.TypedColumn[C1,Seq[C1]] val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

当我使用时

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect

我明白了:

错误:类型不匹配; 找到:org.apache.spark.sql.TypedColumn[C1,Seq[C1]] 必需:org.apache.spark.sql.TypedColumn[C1,?] val g_c1 = teams.groupByKey(_.f1).aggSeq[C1]

有什么提示吗?

【问题讨论】:

【参考方案1】:

我找到了原因:发生这种情况是因为我在(笔记本的)一个单元格上声明案例类,然后在不同的后续单元格中使用它。

将整个代码放在同一个单元格中可以解决这个问题。 (不幸的是,现在我面临另一个问题MissingRequirementError)

【讨论】:

以上是关于SparkSQL 聚合器:类型不匹配错误的主要内容,如果未能解决你的问题,请参考以下文章

Spark UDF 类型不匹配错误

SparkSQL自定义无类型聚合函数

深入研究Spark SQL的Catalyst优化器(原创翻译)

入门大数据---SparkSQL常用聚合函数

Spark安装部署| 运行模式

如何在 Spark Table 中创建索引?