为啥 Spark 认为这是一个交叉/笛卡尔连接
Posted
技术标签:
【中文标题】为啥 Spark 认为这是一个交叉/笛卡尔连接【英文标题】:Why does Spark think this is a cross / Cartesian join为什么 Spark 认为这是一个交叉/笛卡尔连接 【发布时间】:2017-02-27 02:51:19 【问题描述】:我想加入两次数据如下:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
然后我得到一些错误:
pyspark.sql.utils.AnalysisException: u'Cartesian joins could be 非常昂贵,默认情况下被禁用。要显式启用它们,请设置 spark.sql.crossJoin.enabled = true;'
但我认为这不是交叉连接
更新:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
: :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
: : +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
: : +- *Filter isnotnull(idx#0L)
: : +- Scan ExistingRDD[idx#0L,val#1]
: +- *Sort [key1#5L ASC, key2#6L ASC], false, 0
: +- Exchange hashpartitioning(key1#5L, key2#6L, 200)
: +- *Filter ((isnotnull(key2#6L) && (key2#6L = key1#5L)) && isnotnull(key1#5L))
: +- Scan ExistingRDD[key1#5L,key2#6L,val#7L]
+- Scan ExistingRDD[idx#40L,val#41]
【问题讨论】:
【参考方案1】:发生这种情况是因为你们join
结构共享相同的血统,这导致了一个微不足道的平等条件:
res2.explain()
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Join Inner, ((idx#204L = key1#209L) && (key2#210L = idx#204L))
:- Filter isnotnull(idx#204L)
: +- LogicalRDD [idx#204L, val#205]
+- Filter ((isnotnull(key2#210L) && (key2#210L = key1#209L)) && isnotnull(key1#209L))
+- LogicalRDD [key1#209L, key2#210L, val#211L]
and
LogicalRDD [idx#235L, val#236]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
在这种情况下,您应该使用别名:
from pyspark.sql.functions import col
rdd1 = spark.createDataFrame(...).alias('rdd1')
rdd2 = spark.createDataFrame(...).alias('rdd2')
res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).alias('res1')
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx')).explain()
== Physical Plan ==
*SortMergeJoin [key2#297L], [idx#360L], Inner
:- *Sort [key2#297L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key2#297L, 200)
: +- *SortMergeJoin [idx#290L], [key1#296L], Inner
: :- *Sort [idx#290L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(idx#290L, 200)
: : +- *Filter isnotnull(idx#290L)
: : +- Scan ExistingRDD[idx#290L,val#291]
: +- *Sort [key1#296L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#296L, 200)
: +- *Filter (isnotnull(key2#297L) && isnotnull(key1#296L))
: +- Scan ExistingRDD[key1#296L,key2#297L,val#298L]
+- *Sort [idx#360L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(idx#360L, 200)
+- *Filter isnotnull(idx#360L)
+- Scan ExistingRDD[idx#360L,val#361]
详情见SPARK-6459。
【讨论】:
@user6910411 当您的意思是“相同的血统”时,我认为这是 spark dataframe 的惰性评估和查询计划器的问题。从 sql 的角度来看,OP 查询不是笛卡尔积,对吧? @nir 你可以这么说。简而言之,如果您有df1
和df2
都派生自df
,并且所有三个共享col
则df1.col op df2.col
可能被解析为微不足道的真或假,即使它在技术上(根据实际的解析规则)是't。
有道理。我发现这个问题可以通过使用实际的 sql 并通过sparkSession.sql("your sql")
执行而不是基于数据帧的 dsl 来完全避免。【参考方案2】:
在第二次加入之前持久化数据帧时我也成功了。
类似:
res1 = rdd1.join(rdd2, col('rdd1.idx') == col('rdd2.key1')).persist()
res1.join(rdd1, on=col('res1.key2') == col('rdd1.idx'))
【讨论】:
@GergeSzekely,这对我也有用,但我不知道为什么。有什么区别?【参考方案3】:坚持对我不起作用。
我用 DataFrames 上的别名克服了它
from pyspark.sql.functions import col
df1.alias("buildings").join(df2.alias("managers"), col("managers.distinguishedName") == col("buildings.manager"))
【讨论】:
以上是关于为啥 Spark 认为这是一个交叉/笛卡尔连接的主要内容,如果未能解决你的问题,请参考以下文章
R语言merge函数交叉连接dataframe数据(Cross joinCartesian join笛卡尔交叉连接)merge函数进行交叉连接必须将参数by设置为NULL(by = NULL)