Spark如何仅在分区内加入

Posted

技术标签:

【中文标题】Spark如何仅在分区内加入【英文标题】:Spark How to Join Only Within Partitions 【发布时间】:2020-10-02 19:10:08 【问题描述】:

我有 2 个大数据框。每行都有纬度/经度数据。我的目标是在 2 个数据帧之间进行连接,并找到距离内的所有点,例如100米。

df1: (id, lat, lon, geohash7)
df2: (id, lat, lon, geohash7)

我想在 geohash7 上对 df1 和 df2 进行分区,然后只在分区内加入。我想避免在分区之间加入以减少计算量。

df1 = df1.repartition(200, "geohash7")
df2 = df2.repartition(200, "geohash7")

df_merged = df1.join(df2, (df1("geohash7")===df2("geohash7")) & (dist(df1("lat"),df1("lon"),df2("lat"),df2("lon"))<100) )

所以基本上加入geohash7,然后确保点之间的距离小于100。 问题是,Spark 实际上会交叉连接所有数据。我怎样才能让它只做 inter-partition join 而不是 intra-partition join?

【问题讨论】:

你检查.explain()的计划了吗?我不明白代码会做cross join dist 在做什么?我是正确的,它是计算两点之间的欧几里德距离的 udf?这将explain 交叉连接 dist 计算两点之间的半正弦距离。是的,火花似乎不支持我想要的东西。经过一番尝试,我认为 spark 没有进行交叉连接,因为 geohash-7 条件在 join 语句中。如果我删除 geohash-7 匹配条件,查询运行速度会慢很多。 【参考方案1】:

在大量使用数据之后,spark 似乎足够聪明,可以首先确保在相等条件(“geohash7”)上发生连接。因此,如果那里没有匹配,它将不会计算“dist”函数。 似乎在相等条件下,它不再进行交叉连接。所以我不必做任何其他事情。上面的连接工作正常。

【讨论】:

以上是关于Spark如何仅在分区内加入的主要内容,如果未能解决你的问题,请参考以下文章

如何对数据进行物理分区以避免 Spark SQL 连接中的洗牌

Spark JDBC 读取仅在一个分区中结束

如何使用合并更改分区数?

Spark:加入时设置最大分区大小

共同分区的 RDD 的加入是不是会导致 Apache Spark 中的洗牌?

如何在 Spark 中使用 Python 查找 DataFrame 中的分区数以及如何在 Spark 中使用 Python 在 DataFrame 中创建分区