如何在 Spark 中实现“交叉连接”?

Posted

技术标签:

【中文标题】如何在 Spark 中实现“交叉连接”?【英文标题】:How to implement "Cross Join" in Spark? 【发布时间】:2014-09-11 13:38:22 【问题描述】:

我们计划将 Apache Pig 代码迁移到新的 Spark 平台。

Pig 具有“Bag/Tuple/Field”概念,其行为类似于关系数据库。 Pig 提供对 CROSS/INNER/OUTER 连接的支持。

对于 CROSS JOIN,我们可以使用alias = CROSS alias, alias [, alias …] [PARTITION BY partitioner] [PARALLEL n];

但是当我们迁移到 Spark 平台时,我在 Spark API 中找不到任何对应物。你有什么想法吗?

【问题讨论】:

它还没有准备好,但目前正在构建 spork(pig on spark),所以你可能不需要更改任何代码 【参考方案1】:

oneRDD.cartesian(anotherRDD)

【讨论】:

谢谢,cartesian join是cross join的昵称【参考方案2】:

这是 Spark 2.x 数据集和数据帧的推荐版本:

scala> val ds1 = spark.range(10)
ds1: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds1.cache.count
res1: Long = 10

scala> val ds2 = spark.range(10)
ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> ds2.cache.count
res2: Long = 10

scala> val crossDS1DS2 = ds1.crossJoin(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
res3: Long = 100

或者,也可以使用没有连接条件的传统 JOIN 语法。使用此配置选项可避免以下错误。

spark.conf.set("spark.sql.crossJoin.enabled", true)

省略该配置时出错(具体使用“join”语法):

scala> val crossDS1DS2 = ds1.join(ds2)
crossDS1DS2: org.apache.spark.sql.DataFrame = [id: bigint, id: bigint]

scala> crossDS1DS2.count
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
...
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

相关:spark.sql.crossJoin.enabled for Spark 2.x

【讨论】:

当你做你的数据集加入数据集时,你的结果是一个数据帧,但我希望它是另一个数据集......为什么不改用 joinWith 呢? 好眼丹!该示例旨在纯粹说明交叉连接语义,因此使用 joinWith 获取数据集并不是最重要的。我会更新答案,但是您的问题打开了关于 crossJoin 方法返回 DF 而不是 DS 的另一条询问线,如果用户希望维护他们的 DS,则可以使用 joinWith 和配置选项,嗯 在我看来使用 joinWith 并进行交叉连接,你必须使用两个相互矛盾的语句来联合到你的整个数据集,我想这是为了确保你想要做一个交叉加入

以上是关于如何在 Spark 中实现“交叉连接”?的主要内容,如果未能解决你的问题,请参考以下文章

如何在R中进行交叉连接?

如何在 Apache Spark 中实现递归算法?

如何在 spark scala 中实现 uniqueConcatenate、uniqueCount [关闭]

如何在 Spark SQL(PySpark) 中实现自增

如何在 Spark UDAF 中实现 fastutils 映射?

Scala - 如何在 Spark 的 map 函数中实现 Try