“无法找到存储在数据集中的类型的编码器”,即使 spark.implicits._ 被导入?

Posted

技术标签:

【中文标题】“无法找到存储在数据集中的类型的编码器”,即使 spark.implicits._ 被导入?【英文标题】:"Unable to find encoder for type stored in a Dataset" even spark.implicits._ is imported? 【发布时间】:2017-07-17 17:57:32 【问题描述】: 错误:(39, 12) 无法找到存储在数据集中的类型的编码器。通过导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(案例类)。未来版本中将添加对序列化其他类型的支持。 dbo.map((r) => ods.map((s) => 错误:(39、12)方法映射的参数不足:(隐含证据$6:org.apache.spark.sql.Encoder[org.apache.spark.sql.Dataset[Int]])org.apache.spark.sql .Dataset[org.apache.spark.sql.Dataset[Int]]。 未指定值参数证据$6。 dbo.map((r) => ods.map((s) =>
object Main extends App 
  ....

  def compare(sqlContext: org.apache.spark.sql.SQLContext, 
            dbo: Dataset[Cols], ods: Dataset[Cols]) = 
    import sqlContext.implicits._ // Tried import dbo.sparkSession.implicits._ too
    dbo.map((r) => ods.map((s) =>  // Errors occur here
      0
    ))


case class Cols (A: Int,
                   B: String,
                   C: String,
                   D: String,
                   E: Double,
                   F: Date,
                   G: String,
                   H: String,
                   I: Double,
                   J: String
                  )
    为什么我导入sqlContext.implicits._后还是报错? 我创建了参数sqlContext 仅用于导入。有没有更好的方法不传递参数?

    这应该由import dbo.sparkSession.implicits._解决

【问题讨论】:

我试过import dbo.sparkSession.implicits._ 还是出现同样的错误? 再看一遍我想我明白了问题所在,错误消息说应该是spark.implicits._来导入编码器。另一个问题是您试图访问地图中的另一个数据集,这是不可能的。您将需要以某种方式组合您的数据集(通常使用连接) 基本上,我需要为两个数据集的所有组合中的每一个获取一个值,并获得大于阈值的值。所以我需要连接两个数据集,然后对连接的数据集运行计算。对?计算是否会分发给所有执行者? 是的,您需要加入,然后对结果数据集进行计算。因为您想要所有可能的组合,这将是一个笛卡尔。请注意,这将非常慢,因为会生成许多记录(如果您真的想将所有内容与所有内容匹配,这是不可避免的,如果您只需要一个数据集中的数据进行阈值过滤,我会在加入之前进行过滤)。 是的,我需要对笛卡尔结果进行计算,只返回满足某些条件的结果。这就是我将 SQL 服务器代码转换为 Spark 的原因,以便计算可以分发给许多执行程序(并在内存中执行)以加快计算速度。 【参考方案1】:

您的代码正在尝试创建一个 Dataset[Dataset[Int]],这有几个原因是错误的

你不能在一个数据集中使用数据集,如果你想从两个数据集中交叉数据,你需要以某种方式加入它们

没有办法创建 Encoder[Dataset[Int]],你可以有 Encoder[Int] 但另一件事没有意义

这样的事情更有意义

import org.apache.spark.sql.functions => func

dbo.joinWith(ods, func.expr("true")).map 
  case (r, s) =>
    0

【讨论】:

谢谢,我实际上需要在两个数据集之间进行笛卡尔连接。我想将加入分配给每个执行者。最好的方法是什么?我应该先创建一个array[DataSet[Cols]] 吗? 我已经编辑了答案以制作笛卡尔积 .joinWith(..., func.expr("true")) 是否与.crossJoin() 相同? joinWith 是 Dataset api 的一部分,resutl 是另一个 Dataset[(Cols, Cols)] 而 crossJoin 将生成一个包含每个数据集中所有字段的行的 Dataframe,这两件事都会生成不同格式的相同数据 你也可以使用func.lit(true)代替func.expr("true")

以上是关于“无法找到存储在数据集中的类型的编码器”,即使 spark.implicits._ 被导入?的主要内容,如果未能解决你的问题,请参考以下文章

为啥使用案例类在 DataFrame 上映射失败并显示“无法找到存储在数据集中的类型的编码器”?

即使注册成功,isRegisteredForRemoteNotifications 也会返回 false

即使数据不在 s-s-rS 报告中的表格中,我们能否获得与空白相同数量的输出?

即使将其加载到内存中,也无法从 kernel.s 打印

如何使用 SQL 代码选择不按顺序排列的最新 ID(即使我已对其进行排序)

即使在 if (process) 块中使用,也未定义未捕获的进程? [复制]