更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN')
Posted
技术标签:
【中文标题】更新表时如何改进 Spark 中的 SQL 查询? (子查询中的\'NOT IN\')【英文标题】:How to improve SQL query in Spark when updating table? ('NOT IN' in subquery)更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN') 【发布时间】:2020-01-21 17:33:47 【问题描述】:我在 Spark 中有一个 Dataframe,它注册为一个名为 A
的表,并且有 1 billion
记录和 10 列。第一列 (ID) 是主键。
还有另一个数据框,它注册为一个名为 B
的表,并且有 10,000
记录和 10 列(与表 A 相同的列,第一列 (ID) 是主键)。
表 B 中的记录是“更新记录”。所以我需要用表 B 中的记录更新表 A 中的所有 10,000 条记录。
我首先尝试了这个 SQL 查询:
select * from A where ID not in (select ID from B)
然后将其与表 B 联合。 方法可以,但第一次查询 (select * from A where ID not in (select ID from B)
) 非常慢(中等集群上的小时数)。
然后我尝试使用 LEFT JOIN 加速第一个查询:select A.* from A left join B on (A.ID = B.ID ) where B.ID is null
这种方法在逻辑上看起来不错,但对于 Spark 容器来说它需要大量内存(YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memory
)..
什么是更好/更快/更少内存消耗的方法?
【问题讨论】:
您可以broadcast
将较小的数据框发送给所有工作人员。
你能举一个上面的例子吗(或接近上面的例子)?
用AutoBroadcast也能做到吗? conf.set("spark.sql.autoBroadcastJoinThreshold", 1024*1024*200)
还是您提出的方法不同?
(1) - 我建议使用完全连接而不是 + 联合。 (2)加入时尽量广播小表。 (3) 您可以在这里找到Spark SQL Performance Tuning 和Tuning Spark 的一些提示。
第二张表的大小是多少?此操作是否需要第二张表的所有列?数据是否基于 id 列均匀分布?数据格式是什么?
【参考方案1】:
我也会选择left join
而不是not in
。
一些减少内存需求和性能的建议 -
-
请看大表是按连接键(ID)均匀分布的。如果没有,那么有些任务会负担很重,有些任务会很忙。这将导致严重的缓慢。请发送
groupBy ID and count
来衡量这一点。
如果连接键自然偏斜,则向连接条件添加更多列以保持结果相同。更多的列可能会增加统一打乱数据的机会。这一点很难实现。
内存需求取决于 - 正在运行的并行任务的数量、在执行器中执行的每个任务的数据量。减少其中一个或两个都将减少内存压力,并且显然运行速度较慢,但这比崩溃要好。我会通过在数据上创建更多分区来减少每个任务的数据量。假设您有 10 个分区用于 1B 行,然后将其设为 200 以减少每个任务的卷。在表 A 上使用repartition
。不要创建太多分区,因为这会导致效率低下,10K 分区可能是个坏主意。
有一些参数需要调整,即explained here。
具有 10K 行的小表应该自动广播,因为它很小。如果没有,您可以增加广播限制并应用广播提示。
【讨论】:
嗨,ID 都是唯一的。不确定 1) 如何测量计数和分组依据,因为所有都将 = 1。我还应该尝试repartition
作为查询的第一部分还是?
如果 id 是唯一的,那么在第一点上可以做的不多。事先重新分区并将 spark.sql.shuffle.partitions 设置为 say500
小费有帮助吗?特别是分区的想法?如果是这样,您可能需要添加一些代表
我尝试了重新分区,但没有多大帮助..仍然太长或内存不足..
但是我切换回 Dataframe 的做事方式,这很有帮助..以上是关于更新表时如何改进 Spark 中的 SQL 查询? (子查询中的'NOT IN')的主要内容,如果未能解决你的问题,请参考以下文章
查询同一张表时,spark sql 返回空值,但配置单元和 impaly 获取正常数据?