一次sparksql问题排查记录
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一次sparksql问题排查记录相关的知识,希望对你有一定的参考价值。
参考技术A问题: 在调试一个sparksql左连接查询时发现数据结果不正确,经过一天折腾才发现使用子查询方式能够得到正确的结果,分析执行计划发现第一种写法的优化后的执行计划将where t.ip is null and t.dn条件错误的加到了左表子查询中了,即红色标出的地方,这样导致左表子查询查不出数据来。
结论: 过滤条件写在where条件中时,spark会将sql优化为inner join, 如果连接条件中的字段出现在最后的where条件中,那么该条件在做谓词下推时也会被加到左表和右表中,此时就不符合预期结果,即会导致左表中的查不到预期的数据,但是将过滤数据用的限定条件写到子查询中时查出的结果是正确的,执行计划也是正确的,原因不详,怀疑是spark执行计划优化中的bug;
过程数据记录
1、条件在where中
select
oneday.dn, oneday.ip, \'20201202\', \'20201202\'
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = \'20201202\'
group by ip,dn
) oneday left join dwd_dns.t_ip_dn_first t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null and t.dt = \'20201001\'
执行计划:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Join Inner, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (((((isnotnull(dt#6) && (dt#6 = 20201202)) && isnull(dn#1)) && isnull(ip#2)) && isnotnull(ip#2)) && isnotnull(dn#1))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((((isnotnull(dt#10) && isnull(ip#8)) && isnull(dn#7)) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc
2、条件在子查询中
select
/ + REPARTITION(10) /
oneday.dn, oneday.ip, \'20201202\', \'20201202\'
from
(
select
ip,dn
from dwd_dns.t_ip_dn_his_rel2
where dt = \'20201202\'
group by ip,dn
) oneday left join
(
select dn, ip
from
dwd_dns.t_ip_dn_first
where dt = \'20201001\'
) t on t.ip = oneday.ip and t.dn = oneday.dn
where t.ip is null and t.dn is null
执行计划:
== Optimized Logical Plan ==
InsertIntoHiveTable dwd_dns . t_ip_dn_first , org.apache.hadoop.hive.ql.io.orc.OrcSerde, Map(dt -> None), true, false, [dn, ip, first_time, dt]
+- Project [dn#1, ip#2, 20201202 AS first_time#28, 20201202 AS dt#29]
+- Repartition 10, true
+- Project [dn#1, ip#2]
+- Filter (isnull(ip#8) && isnull(dn#7))
+- Join LeftOuter, ((ip#8 = ip#2) && (dn#7 = dn#1))
:- Aggregate [ip#2, dn#1], [ip#2, dn#1]
: +- Project [dn#1, ip#2]
: +- Filter (isnotnull(dt#6) && (dt#6 = 20201202))
: +- Relation[uid#0,dn#1,ip#2,cname#3,dnsip#4,probe_time#5,dt#6] orc
+- Project [dn#7, ip#8]
+- Filter (((isnotnull(dt#10) && (dt#10 = 20201001)) && isnotnull(ip#8)) && isnotnull(dn#7))
+- Relation[dn#7,ip#8,first_time#9,dt#10] orc
sparksql 读取hive表子目录问题排查
版本:spark3.0.2
1.1. 问题现象:
业务使用hive union all产生的数据目录如下,sparksql无法读取该子目录下的数据。select * from table 这种简单的sql也无法读取。
一些博客建议设置下面两个参数,spark.hive.mapred.supports.subdirectories=true和spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true, 设置后sparksql依然无法读取,问题依然存在。
1.2. 问题分析:
查看物理执行计划,叶子节点为FileSourceScanExec,该执行计划是spark内置的读取文件的SparkPlan,根本就不会使用spark.hive.mapred.supports.subdirectories和spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive,这两个配置。
那么在FileSourceScanExec的逻辑中,是否能够处理这种子目录的情况呢。答案是目前FileSourceScanExec无法递归处理子目录:
FileSourceScanExec的selectedPartitions的属性保存了需要的读取的分区文件,这些文件信息来自PartitioningAwareFileIndex的listFiles方法。leafDirToChildrenFiles这个map中保存的是子目录和数据文件的映射,而path为分区路径,所以这里get返回为空。
既然FileSourceScanExec无法处理子目录,那有没有其他SparkPlan能够处理呢?
spark读取hive表时,hive表在logicPlan中其实最开始是HiveTableRelation,然后HiveSessionStateBuilder的postHocResolutionRules中的RelationConversions规则,将HiveTableRelation转换成了LogicalRelation。这个LogicalRelation最终会转换成FileSourceScanExec。
如果RelationConversions规则不生效,HiveTableRelation就不会转换成LogicalRelation,那最终HiveTableRelation对应的SparkPlan为HiveTableScanExec,在HiveTableScanExec中能够使用spark.hadoop.mapred.input.dir.recursive=true配置读取子目录。
所以如何控制RelationConversions规则是否生效,见代码。 如果数据文件是orc,那么就设置spark.sql.hive.convertMetastoreOrc=false,如果数据文件是parquet,就设置spark.sql.hive.convertMetastoreParquet=false。 设置了参数后RelationConversions规则就不会生效。
1.3. 问题解决方法
如果是orc文件:
--conf spark.hadoop.mapred.input.dir.recursive=true --conf spark.sql.hive.convertMetastoreOrc=false
如果是parquet文件:
--conf spark.hadoop.mapred.input.dir.recursive=true --conf spark.sql.hive.convertMetastoreParquet=false
以上是关于一次sparksql问题排查记录的主要内容,如果未能解决你的问题,请参考以下文章