我们如何使用 HiveWarehouseConnector 安全地重用读取 Hive 表的数据框?

Posted

技术标签:

【中文标题】我们如何使用 HiveWarehouseConnector 安全地重用读取 Hive 表的数据框?【英文标题】:How can we safely reuse a dataframe reading a Hive table using HiveWarehouseConnector? 【发布时间】:2020-01-15 20:08:53 【问题描述】:

当使用相同的数据帧使用HiveWarehouseConnector 多次读取 Hive 表时,计算期间会发生异常。

例子:

val hive = com.hortonworks.spark.sql.hive.llap.HiveWarehouseBuilder.session(spark).build()

hive.setDatabase("db")
val df_data = hive.table("table")

val df_one_col = df_data.select("col1")
val df_two_col = df_data.select("col1", "col2")

val df_res = df_two_col.join(df_one_col, "col1")

df_res.show()

我们在任务执行期间得到一个ArrayIndexOutOfBoundsException

20/01/15 19:46:36 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 18, host, executor 1): java.lang.ArrayIndexOutOfBoundsException: 1
        at org.apache.spark.sql.vectorized.ColumnarBatch.column(ColumnarBatch.java:98)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.datasourcev2scan_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

发生异常的生成代码如下第39行:

/* 031 */   ...
/* 032 */   private void datasourcev2scan_nextBatch_0() throws java.io.IOException 
/* 033 */     long getBatchStart = System.nanoTime();
/* 034 */     if (datasourcev2scan_mutableStateArray_0[0].hasNext()) 
/* 035 */       datasourcev2scan_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)datasourcev2scan_mutableStateArray_0[0].next();
/* 036 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(datasourcev2scan_mutableStateArray_1[0].numRows());
/* 037 */       datasourcev2scan_batchIdx_0 = 0;
/* 038 */       datasourcev2scan_mutableStateArray_2[0] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(0);
/* 039 */       datasourcev2scan_mutableStateArray_2[1] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(1);
/* 040 */
/* 041 */     
/* 042 */     datasourcev2scan_scanTime_0 += System.nanoTime() - getBatchStart;
/* 043 */   
/* 044 */   ...

访问第二列时抛出ArrayIndexOutOfBoundsException

实物图如下:

== Physical Plan ==
*(5) Project [col1#333, col2#334]
+- *(5) SortMergeJoin [col1#333], [col1#390], Inner
   :- *(2) Sort [col1#333 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#333, 200)
   :     +- *(1) DataSourceV2Scan [col1#333, col2#334], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@4527e4
   +- *(4) Sort [col1#390 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#390, 200)
         +- *(3) DataSourceV2Scan [col1#390], com.hortonworks.spark.sql.hive.llap.HiveWarehouseDataSourceReader@4527e4

我们可以看到我们使用了相同的 HiveWarehouseDataSourceReader 实例。

Spark 日志显示,触发的两个 Hive 查询仅请求“col1”列。

20/01/15 19:46:32 INFO LlapBaseInputFormat: Handle ID c259528d-60ac-42d5-a201-9646335151dd: query=select `col1` from (SELECT * FROM table) as q_d89e3f8df0cd4ce3bdf4c7d938c006ad WHERE col1 IS NOT NULL
...
20/01/15 19:46:35 INFO LlapBaseInputFormat: Handle ID c259528d-60ac-42d5-a201-9646335151dd: query=select `col1` from (SELECT * FROM table) as q_76d9b5c5a5af482a8a3e671b6c8421a8 WHERE col1 IS NOT NULL

在 Spark 逻辑计划优化期间,列修剪在同一个 HiveWarehouseDataSourceReader 实例上发生了两次,使所需的列仅是“col1”。 DataSourceV2Relation,取决于可变的阅读器,是可变的,这似乎令人惊讶。

我正在寻找一种解决方案,以安全地重用使用 HiveWarehouseConnector 读取 Hive 表的数据框。

我使用 HDP 3.1.0 和以下组件: - Apache Spark 2.3.2 - spark-llap HiveWarehouseConnector 1.0.0 - 蜂巢 3.1.0

【问题讨论】:

【参考方案1】:

Spark 2.4改进了DataSourceV2,尤其是SPARK-23203 DataSourceV2 should use immutable trees

Spark 2.3HiveWarehouseConnector 数据源阅读器中禁用列修剪。

如HDP 3.1.5 Release Notes 所述,Hortonworks 已修复此问题。 我们可以在其HiveWarehouseConnector github repository 中找到更正:

    if (useSpark23xReader) 
      LOG.info("Using reader HiveWarehouseDataSourceReaderForSpark23x with column pruning disabled");
      return new HiveWarehouseDataSourceReaderForSpark23x(params);
     else if (disablePruningPushdown) 
      LOG.info("Using reader HiveWarehouseDataSourceReader with column pruning and filter pushdown disabled");
      return new HiveWarehouseDataSourceReader(params);
     else 
      LOG.info("Using reader PrunedFilteredHiveWarehouseDataSourceReader");
      return new PrunedFilteredHiveWarehouseDataSourceReader(params);
    

另外,HDP 3.1.5 Hive integration doc 指定:

为防止此版本中出现数据正确性问题,默认情况下会禁用修剪和投影下推。 ... 为防止这些问题并确保正确的结果,请不要启用修剪和下推。

【讨论】:

以上是关于我们如何使用 HiveWarehouseConnector 安全地重用读取 Hive 表的数据框?的主要内容,如果未能解决你的问题,请参考以下文章

我们如何使用贝宝支付流来拆分金额

我们如何使用 tapGesture 在 Tvos 中播放视频

我们如何/为啥使用 operator.abs

NSStringDrawingUsesDeviceMetrics 标志是啥意思,我们如何使用它?

如何使用Countifs函数动态统计

我们如何使用 AFNetworking 3.0 记录请求