类型安全连接与 Spark 数据集的安全性比我预期的要低

Posted

技术标签:

【中文标题】类型安全连接与 Spark 数据集的安全性比我预期的要低【英文标题】:Typesafe Joins with Spark Datasets Less Safe Than I'd Expect 【发布时间】:2018-08-21 14:30:27 【问题描述】:

在尝试将 solution 到 Perform a typed join in Scala with Spark Datasets 作为隐式提供时,我遇到了一些我不明白的事情。

在下面的测试中,innerJoin 的签名是def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]),但我用f: Foo => Stringg: Bar => Int 来称呼它。我希望在编译时出现错误,但它编译得很好。这是为什么呢?

实际发生的情况是它编译得很好,当 Spark 尝试创建产品编码器时,测试失败并显示 java.lang.ClassNotFoundException: scala.Any(我认为,对于生成的 ((K, Foo),(K, Bar)) 元组)。我假设Any 显示为IntString 的共同“父级”。

import org.apache.spark.sql.Dataset, Encoder, SparkSession
import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test

case class Foo(a: String)

case class Bar(b: Int)

class JoinTest extends TestNGSuite with Matchers 
  import JoinTest._

  @Test
  def testJoin(): Unit = 
    val spark = SparkSession.builder()
      .master("local")
      .appName("test").getOrCreate()

    import spark.implicits._

    val ds1 = spark.createDataset(Seq(Foo("a")))
    val ds2 = spark.createDataset(Seq(Bar(123)))

    val jd = ds1.innerJoin(ds2)(_.a, _.b)

    jd.count shouldBe 0
  
 

object JoinTest 
  implicit class Joins[T](ds1: Dataset[T]) 
    def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)
     (implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
     
       val ds1_ = ds1.map(x => (f(x), x))
       val ds2_ = ds2.map(x => (g(x), x))
       ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
    
   

【问题讨论】:

K 实际上被推断为Any 这听起来像是一个依赖问题。你能包括构建定义吗? 【参考方案1】:

您是正确的,Any 被推断为StringInt 的共同父级,因此用作KFunction 在输出类型中是协变的。所以Foo => StringFoo => Any 的有效子类。

解决这种问题的常用方法是使用两个类型参数和一个隐式=:=。例如:

def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
  (implicit eq: K1 =:= K2, e1: Encoder[(K2, T)], e2: Encoder[(K2, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] = 
  
    val ds1_ = ds1.map(x => (eq(f(x)), x))
    ... rest the same as before ...

【讨论】:

啊哈!我一直在看=:=,但没想到要添加另一个类型参数。

以上是关于类型安全连接与 Spark 数据集的安全性比我预期的要低的主要内容,如果未能解决你的问题,请参考以下文章

RDD 与Spark 生产代码的数据集

当预期类型可能不同时,如何安全地处理来自 JSON 的数据?

Hadoop对Spark:正面比拼报告(架构性能成本安全性和机器学习)

Java Spark:使用未知连接列名称连接的数据集的 Spark 错误解决方法

Spark 2.2 空安全左外连接空指针异常

如何将位于 HDFS 上的类型安全配置文件添加到 spark-submit(集群模式)?