在 Spark 中处理 Join ON OR

Posted

技术标签:

【中文标题】在 Spark 中处理 Join ON OR【英文标题】:Handling Join ON OR in Spark 【发布时间】:2017-10-16 16:50:07 【问题描述】:

我有一个像这样的数据框:

+---+---+---+---+---+
|AId| A1| A2| A3| A4|
+---+---+---+---+---+
|  1|  *|  a|  b|  c|
|  2|  *|  *|  b|  c|
|  3|  c|  a|  b|  c|
|  4|  *|  *|  *|  c|
|  5|  *|  *|  *|  *|
+---+---+---+---+---+

我想加入:

+---+---+---+---+---+----+
|BId| B1| B2| B3| B4|Code|
+---+---+---+---+---+----+
|  1|  c|  a|  b|  c|  AO|
|  2|  b|  a|  b|  c|  AS|
|  3|  b|  b|  b|  c|  AT|
|  4|  a|  d|  d|  c|  BO|
|  5|  d|  a|  c|  b|  BS|
|  6|  a|  b|  b|  c|  BT|
|  7|  d|  d|  d|  c|  CO|
|  8|  d|  d|  d|  d|  CS|
+---+---+---+---+---+----+

将 ID 与 Rule 匹配。但是,* 是通配符。它会匹配任何东西。在上面的例子中,AId == 1 将匹配 BId 1 和 2,AId == 3 将仅匹配 BId 1,AId == 4 将匹配除 5 和 8 之外的所有,而 AId == 5 将匹配所有 8。

解决这个问题的最佳方法是什么?该查询在 Spark 中似乎很昂贵,而且 Spark 没有内置 OR。替代方案似乎做了一个案例,即 A1-A4 设置标志,然后返回并加入。一个棘手的问题是通配符可以在第一个表的任何列中出现 1-4 次,尽管它们不会出现在第二个表中。

【问题讨论】:

【参考方案1】:

您可以将连接条件表示为:

(A1 = * | (A1 = B1)) AND (A2 = * | (A2 = B2)) AND ... AND (AN = * | (AN = BN))

使用 PySpark 可以生成类似这样的表达式

from pyspark.sql.functions import col
from functools import reduce
from operator import and_

expr = reduce(
    and_, 
    ((col("A".format(i)) == "*") | (col("A".format(i)) == col("B".format(i)))
    for i in range(1, 5)))
Column<b'(((((A1 = *) OR (A1 = B1)) AND ((A2 = *) OR (A2 = B2))) AND ((A3 = *) OR (A3 = B3))) AND ((A4 = *) OR (A4 = B4)))'>

crossJoin一起使用:

a.crossJoin(b).where(expr)

spark.conf.set("spark.sql.crossJoin.enabled", "true")

a.join(b, expr)

不幸的是,由于笛卡尔积,这相当昂贵。对于少量的列(4 可能是边界情况),您可以尝试生成列的幂集并创建优化计划,但显然它不会扩展到更多的列。

【讨论】:

以上是关于在 Spark 中处理 Join ON OR的主要内容,如果未能解决你的问题,请参考以下文章

Hve on Spark left join的hashTable问题

Spark On Zeppelin

ERROR: Timeout on the Spark engine during the broadcast join

Spark on Kubernetes 与 Spark on Yarn 不完全对比分析

Spark join 产生错误的结果

spark 内存溢出处理