在 PySpark 中的文字列上检测到 INNER 连接的笛卡尔积

Posted

技术标签:

【中文标题】在 PySpark 中的文字列上检测到 INNER 连接的笛卡尔积【英文标题】:Detected cartesian product for INNER join on literal column in PySpark 【发布时间】:2018-11-23 15:12:45 【问题描述】:

以下代码引发“检测到内部连接的笛卡尔积”异常:

first_df = spark.createDataFrame(["first_id": "1", "first_id": "1", "first_id": "1", ])
second_df = spark.createDataFrame(["some_value": "????", ])

second_df = second_df.withColumn("second_id", F.lit("1"))

# If the next line is uncommented, then the JOIN is working fine.
# second_df.persist()

result_df = first_df.join(second_df,
                          first_df.first_id == second_df.second_id,
                          'inner')
data = result_df.collect()

result_df.explain()

并告诉我逻辑计划如下所示:

Filter (first_id#0 = 1)
+- LogicalRDD [first_id#0], false
and
Project [some_value#2, 1 AS second_id#4]
+- LogicalRDD [some_value#2], false
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

当 RuleExecutor 应用名为 CheckCartesianProducts 的优化规则集时,这些逻辑计划的 JOIN 条件中似乎不存在列是有原因的(请参阅https://github.com/apache/spark/blob/v2.3.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1114)。

但是,如果我在 JOIN 之前使用“persist”方法,它会起作用,并且物理计划是:

*(3) SortMergeJoin [first_id#0], [second_id#4], Inner
:- *(1) Sort [first_id#0 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(first_id#0, 10)
:     +- Scan ExistingRDD[first_id#0]
+- *(2) Sort [second_id#4 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#4, 10)
      +- InMemoryTableScan [some_value#2, second_id#4]
            +- InMemoryRelation [some_value#2, second_id#4], true, 10000, StorageLevel(disk, memory, 1 replicas)
                  +- *(1) Project [some_value#2, 1 AS second_id#4]
                     +- Scan ExistingRDD[some_value#2]

所以,可能有人可以解释导致这种结果的内部原因,因为持久化数据框看起来不是解决方案。

【问题讨论】:

【参考方案1】:

问题是,一旦你持久化你的数据,second_id 就会被合并到缓存表中,不再被认为是常量。因此规划器无法再推断查询应该表示为笛卡尔积,并在散列分区 second_id 上使用标准 SortMergeJoin

使用udf实现相同的结果是微不足道的,没有持久性

from pyspark.sql.functions import lit, pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.SCALAR)                   
def identity(x):                                                   
    return x    

second_df = second_df.withColumn('second_id', identity(lit(1)))

result_df = first_df.join(second_df,
                         first_df.first_id == second_df.second_id,
                         'inner')

result_df.explain()

== Physical Plan ==
*(6) SortMergeJoin [cast(first_id#4 as int)], [second_id#129], Inner
:- *(2) Sort [cast(first_id#4 as int) ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(cast(first_id#4 as int), 200)
:     +- *(1) Filter isnotnull(first_id#4)
:        +- Scan ExistingRDD[first_id#4]
+- *(5) Sort [second_id#129 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(second_id#129, 200)
      +- *(4) Project [some_value#6, pythonUDF0#154 AS second_id#129]
         +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#154]
            +- *(3) Project [some_value#6]
               +- *(3) Filter isnotnull(pythonUDF0#153)
                  +- ArrowEvalPython [identity(1)], [some_value#6, pythonUDF0#153]
                     +- Scan ExistingRDD[some_value#6]

但是SortMergeJoin不是你应该在这里尝试实现。使用常量键,它会导致极端的数据倾斜,并且可能会在除了玩具数据之外的任何东西上失败。

然而,笛卡尔积虽然很贵,但不会受到这个问题的影响,在这里应该首选。因此建议启用交叉连接或使用显式交叉连接语法 (spark.sql.crossJoin.enabled for Spark 2.x) 并继续。

一个悬而未决的问题仍然是如何在缓存数据时防止出现不良行为。不幸的是,我没有为此准备好答案。我相当确定可以使用自定义优化器规则,但这不是单独使用 Python 可以完成的。

【讨论】:

以上是关于在 PySpark 中的文字列上检测到 INNER 连接的笛卡尔积的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 数据框中的组中的列上应用函数

在 pyspark 中的特定列上应用过滤器描述

在 Pyspark 中的多个列上使用相同的函数重复调用 withColumn()

基于另一列中的值的一列上的pyspark滞后函数

在 Pyspark ML 中的稀疏向量数据类型列上创建 Python 转换器

pyspark:groupby 和聚合 avg 和 first 在多个列上