Spark with Hive 是不是可以将项目阶段推送到 HiveTableScan?

Posted

技术标签:

【中文标题】Spark with Hive 是不是可以将项目阶段推送到 HiveTableScan?【英文标题】:Spark with Hive is it possible to push Project phase to HiveTableScan?Spark with Hive 是否可以将项目阶段推送到 HiveTableScan? 【发布时间】:2019-09-06 09:34:23 【问题描述】:

我正在使用 Spark SQL 来查询 Hive 中以 ORC 格式存储的数据。

当我对提供给spark.sql(query) 的查询运行解释命令时,我看到以下查询计划:

== Physical Plan ==
*Project [col1, col2, col3]
+- *Filter (....)
   +- HiveTableScan [col1, col2, col3, ...col50]

据我所知,从 Hive 查询所有 50 列,然后才在 Spark 中进行过滤,后记仅选择需要的实际列。

是否可以将所需的列直接下推到 Hive,这样它们就不会一直加载到 Spark 中?

【问题讨论】:

您是否选择了 spark.sql(query) 中的所有列? @GowthamSB 否,选择项目阶段中列出的那些,即 [col1, col2, col3] 谢谢,请问您使用的是什么文件格式? 我认为问题出在版本上。我正在使用 HDP 3,当我运行相同的查询时,它没有进行完整扫描 == Physical Plan == CollectLimit 10 +- *(1) Project [account_number#245] +- *(1) FileScan parquet crr.transaction[account_number#245,pyear#341,pmonth#342,pdate#343] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://, PartitionCount: 142, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<account_number:string> a: Unit = () 【参考方案1】:

检查以下属性是否设置为默认或假?

spark.sql("SET spark.sql.orc.enabled=true");
spark.sql("SET spark.sql.hive.convertMetastoreOrc=true")
spark.sql("SET spark.sql.orc.filterPushdown=true")

这些可以帮助您避免读取不必要的列,并在您的数据分布在 hdfs上的不同分区。

将上述属性设置为“true”,然后查看您的解释计划显示的内容。

您还可以使用 Spark 对 orc 格式的分区修剪受益,因为它不需要扫描整个表,并且可以限制 Spark 在查询时需要的分区数量。这将有助于减少磁盘输入/输出操作。

例如:

我正在运行以下语句以从 Hive orc 文件格式表创建一个数据框,该表在列 'country''tran_date' 上分区。

df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' """)

给定的表有几个分区,如果我们查看上述查询的物理计划,我们可以看到它只扫描了一个分区。

== Physical Plan ==
*(1) Project [transaction_date#69, payment_type#72, city#74]
+- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#69,payment_type#72,city#74,country#76,tran_date#77] 
Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev..., 
*PartitionCount: 1,* PartitionFilters: [isnotnull(country#76), isnotnull(tran_date#77), (country#76 = United Kingdom), (tran_date#77 = 2..., 
PushedFilters: [], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>

请参阅 "PartitionCount: 1" 并且 PartitionFilters 设置为不为空。

同样,如果您在查询中指定了任何过滤器,您可以下推过滤器。 在这里,就像我使用城市列过滤掉数据一样。

df=spark.sql("""select transaction_date,payment_type,city from test_dev_db.transactionmainhistorytable where country ='United Kingdom' and tran_date='2009-06-01' and city='London' """)


== Physical Plan ==
*(1) Project [transaction_date#104, payment_type#107, city#109]
+- *(1) Filter (isnotnull(city#109) && (city#109 = London))
   +- *(1) FileScan orc test_dev_db.transactionmainhistorytable[transaction_date#104,payment_type#107,city#109,country#111,tran_date#112] 
   Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://host/user/vikct001/dev/hadoop/database/test_dev..., 
   PartitionCount: 1, PartitionFilters: [isnotnull(country#111), isnotnull(tran_date#112), (country#111 = United Kingdom), (tran_date#112..., 
   PushedFilters: [IsNotNull(city), EqualTo(city,London)], ReadSchema: struct<transaction_date:timestamp,payment_type:string,city:string>

您可以在上面看到PushedFilters 不为空,并且它有一个需要过滤的具有特定值的国家列。

【讨论】:

是的,现在我看到 == 物理计划 == *Project +- *Filter +- *FileScan orc ... 批处理:false,格式:ORC,位置:...,PartitionFilters :[],PushedFilters:[],ReadSchema:......所以现在似乎使用文件扫描而不是表扫描,它更快吗?看到 PushedFilters: [] 也很奇怪,我认为过滤器阶段会迁移到这里。这些 conf 选项是否记录在某处?我在我正在使用的spark.apache.org/docs/2.2.0/configuration.html 中找不到它们,也许您知道它们是否受 Cloudera 发行版支持?

以上是关于Spark with Hive 是不是可以将项目阶段推送到 HiveTableScan?的主要内容,如果未能解决你的问题,请参考以下文章

Spark on Yarn with Hive实战案例与常见问题解决

Hive with Hadoop vs Hive with spark vs spark sql vs HDFS - 它们如何相互协作?

Spark SQL 到底怎么搭建起来

将 Hive 转换为 spark

在 Hadoop 中使用 HBase 而不是 Hive [重复]

项目实战——参数配置化Spark将Hive表的数据写入需要用户名密码认证的ElasticSearch(Java版本)