更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN')

Posted

技术标签:

【中文标题】更新表时如何改进 Spark 中的 SQL 查询? (子查询中的\'NOT IN\')【英文标题】:How to improve SQL query in Spark when updating table? ('NOT IN' in subquery)更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN') 【发布时间】:2020-01-21 17:33:47 【问题描述】:

我在 Spark 中有一个 Dataframe,它注册为一个名为 A 的表,并且有 1 billion 记录和 10 列。第一列 (ID) 是主键。 还有另一个数据框,它注册为一个名为 B 的表,并且有 10,000 记录和 10 列(与表 A 相同的列,第一列 (ID) 是主键)。

表 B 中的记录是“更新记录”。所以我需要用表 B 中的记录更新表 A 中的所有 10,000 条记录。

我首先尝试了这个 SQL 查询:

select * from A where ID not in (select ID from B) 然后将其与表 B 联合。 方法可以,但第一次查询 (select * from A where ID not in (select ID from B)) 非常慢(中等集群上的小时数)。

然后我尝试使用 LEFT JOIN 加速第一个查询:select A.* from A left join B on (A.ID = B.ID ) where B.ID is null

这种方法在逻辑上看起来不错,但对于 Spark 容器来说它需要大量内存(YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memory)..

什么是更好/更快/更少内存消耗的方法?

【问题讨论】:

您可以broadcast 将较小的数据框发送给所有工作人员。 你能举一个上面的例子吗(或接近上面的例子)? 用AutoBroadcast也能做到吗? conf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*200) 还是您提出的方法不同? (1) - 我建议使用完全连接而不是 + 联合。 (2)加入时尽量广播小表。 (3) 您可以在这里找到Spark SQL Performance Tuning 和Tuning Spark 的一些提示。 第二张表的大小是多少?此操作是否需要第二张表的所有列?数据是否基于 id 列均匀分布?数据格式是什么? 【参考方案1】:

我也会选择left join 而不是not in

一些减少内存需求和性能的建议 -

    请看大表是按连接键(ID)均匀分布的。如果没有,那么有些任务会负担很重,有些任务会很忙。这将导致严重的缓慢。请发送groupBy ID and count 来衡量这一点。 如果连接键自然偏斜,则向连接条件添加更多列以保持结果相同。更多的列可能会增加统一打乱数据的机会。这一点很难实现。 内存需求取决于 - 正在运行的并行任务的数量、在执行器中执行的每个任务的数据量。减少其中一个或两个都将减少内存压力,并且显然运行速度较慢,但​​这比崩溃要好。我会通过在数据上创建更多分区来减少每个任务的数据量。假设您有 10 个分区用于 1B 行,然后将其设为 200 以减少每个任务的卷。在表 A 上使用repartition。不要创建太多分区,因为这会导致效率低下,10K 分区可能是个坏主意。 有一些参数需要调整,即explained here。 具有 10K 行的小表应该自动广播,因为它很小。如果没有,您可以增加广播限制并应用广播提示。

【讨论】:

嗨,ID 都是唯一的。不确定 1) 如何测量计数和分组依据,因为所有都将 = 1。我还应该尝试 repartition 作为查询的第一部分还是? 如果 id 是唯一的,那么在第一点上可以做的不多。事先重新分区并将 spark.sql.shuffle.partitions 设置为 say500 小费有帮助吗?特别是分区的想法?如果是这样,您可能需要添加一些代表 我尝试了重新分区,但没有多大帮助..仍然太长或内存不足.. 但是我切换回 Dataframe 的做事方式,这很有帮助..

以上是关于更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN')的主要内容,如果未能解决你的问题,请参考以下文章