类型安全连接与 Spark 数据集的安全性比我预期的要低
Posted
技术标签:
【中文标题】类型安全连接与 Spark 数据集的安全性比我预期的要低【英文标题】:Typesafe Joins with Spark Datasets Less Safe Than I'd Expect 【发布时间】:2018-08-21 14:30:27 【问题描述】:在尝试将 solution 到 Perform a typed join in Scala with Spark Datasets 作为隐式提供时,我遇到了一些我不明白的事情。
在下面的测试中,innerJoin
的签名是def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)])
,但我用f: Foo => String
和g: Bar => Int
来称呼它。我希望在编译时出现错误,但它编译得很好。这是为什么呢?
实际发生的情况是它编译得很好,当 Spark 尝试创建产品编码器时,测试失败并显示 java.lang.ClassNotFoundException: scala.Any
(我认为,对于生成的 ((K, Foo),(K, Bar))
元组)。我假设Any
显示为Int
和String
的共同“父级”。
import org.apache.spark.sql.Dataset, Encoder, SparkSession
import org.scalatest.Matchers
import org.scalatest.testng.TestNGSuite
import org.testng.annotations.Test
case class Foo(a: String)
case class Bar(b: Int)
class JoinTest extends TestNGSuite with Matchers
import JoinTest._
@Test
def testJoin(): Unit =
val spark = SparkSession.builder()
.master("local")
.appName("test").getOrCreate()
import spark.implicits._
val ds1 = spark.createDataset(Seq(Foo("a")))
val ds2 = spark.createDataset(Seq(Bar(123)))
val jd = ds1.innerJoin(ds2)(_.a, _.b)
jd.count shouldBe 0
object JoinTest
implicit class Joins[T](ds1: Dataset[T])
def innerJoin[U, K](ds2: Dataset[U])(f: T => K, g: U => K)
(implicit e1: Encoder[(K, T)], e2: Encoder[(K, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] =
val ds1_ = ds1.map(x => (f(x), x))
val ds2_ = ds2.map(x => (g(x), x))
ds1_.joinWith(ds2_, ds1_("_1") === ds2_("_1")).map(x => (x._1._2, x._2._2))
【问题讨论】:
K
实际上被推断为Any
。
这听起来像是一个依赖问题。你能包括构建定义吗?
【参考方案1】:
您是正确的,Any
被推断为String
和Int
的共同父级,因此用作K
。 Function
在输出类型中是协变的。所以Foo => String
是Foo => Any
的有效子类。
解决这种问题的常用方法是使用两个类型参数和一个隐式=:=
。例如:
def innerJoin[U, K1, K2](ds2: Dataset[U])(f: T => K1, g: U => K2)
(implicit eq: K1 =:= K2, e1: Encoder[(K2, T)], e2: Encoder[(K2, U)], e3: Encoder[(T, U)]): Dataset[(T, U)] =
val ds1_ = ds1.map(x => (eq(f(x)), x))
... rest the same as before ...
【讨论】:
啊哈!我一直在看=:=
,但没想到要添加另一个类型参数。以上是关于类型安全连接与 Spark 数据集的安全性比我预期的要低的主要内容,如果未能解决你的问题,请参考以下文章
当预期类型可能不同时,如何安全地处理来自 JSON 的数据?
Hadoop对Spark:正面比拼报告(架构性能成本安全性和机器学习)