Issues with reading an external Hive partitioned table using Spark HiveContext
Posted: 2016-09-22 07:19:02

I have an external Hive partitioned table that I am trying to read from Spark using HiveContext, but I get null values back.
val maxClose = hiveContext.sql("select max(Close) from stock_partitioned_data where symbol = 'AAPL'")
maxClose.collect().foreach(println)
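Before the full log, one way to narrow this down is to compare what Spark's metastore client sees against what the Hive CLI reports. A minimal sketch, assuming the same spark-shell session as in the transcript below (the table name comes from the question):

```scala
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Does Spark see the same schema and partitions that Hive does?
hiveContext.table("stock_partitioned_data").printSchema()
hiveContext.sql("SHOW PARTITIONS stock_partitioned_data").show()

// Compare a raw sample of rows against what the Hive CLI returns.
hiveContext.sql("select * from stock_partitioned_data limit 5").show()
```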
=====
scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext
scala> val hiveContext = new HiveContext(sc);
16/09/22 00:12:47 INFO HiveContext: Initializing execution hive, version 1.1.0
16/09/22 00:12:47 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0
16/09/22 00:12:47 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@455aef06
scala> val maxClose = hiveContext.sql("select max(Close) from stock_data2")
16/09/22 00:12:53 INFO ParseDriver: Parsing command: select max(Close) from stock_data2
16/09/22 00:12:54 INFO ParseDriver: Parse Completed
16/09/22 00:12:54 INFO ClientWrapper: Inspected Hadoop version: 2.6.0-cdh5.5.0
16/09/22 00:12:54 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0-cdh5.5.0
maxClose: org.apache.spark.sql.DataFrame = [_c0: double]
scala> maxClose.collect().foreach (println )
16/09/22 00:13:04 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
16/09/22 00:13:04 INFO MemoryStore: ensureFreeSpace(425824) called with curMem=0, maxMem=556038881
16/09/22 00:13:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 415.8 KB, free 529.9 MB)
16/09/22 00:13:05 INFO MemoryStore: ensureFreeSpace(44793) called with curMem=425824, maxMem=556038881
16/09/22 00:13:05 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 43.7 KB, free 529.8 MB)
16/09/22 00:13:05 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.0.2.15:47553 (size: 43.7 KB, free: 530.2 MB)
16/09/22 00:13:05 INFO SparkContext: Created broadcast 0 from collect at <console>:27
16/09/22 00:13:05 INFO SparkContext: Starting job: collect at <console>:27
16/09/22 00:13:06 INFO FileInputFormat: Total input paths to process : 1
16/09/22 00:13:06 INFO DAGScheduler: Registering RDD 5 (collect at <console>:27)
16/09/22 00:13:06 INFO DAGScheduler: Got job 0 (collect at <console>:27) with 1 output partitions
16/09/22 00:13:06 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:27)
16/09/22 00:13:06 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/09/22 00:13:06 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/09/22 00:13:06 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27), which has no missing parents
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(18880) called with curMem=470617, maxMem=556038881
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 18.4 KB, free 529.8 MB)
16/09/22 00:13:06 INFO MemoryStore: ensureFreeSpace(8367) called with curMem=489497, maxMem=556038881
16/09/22 00:13:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 8.2 KB, free 529.8 MB)
16/09/22 00:13:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.0.2.15:47553 (size: 8.2 KB, free: 530.2 MB)
16/09/22 00:13:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
16/09/22 00:13:06 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at collect at <console>:27)
16/09/22 00:13:06 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
16/09/22 00:13:07 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
16/09/22 00:13:08 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2)
16/09/22 00:13:11 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@10.0.2.15:45637] <- [akka.tcp://driverPropsFetcher@quickstart.cloudera:33635]: Error [Shut down address: akka.tcp://driverPropsFetcher@quickstart.cloudera:33635] [
akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@quickstart.cloudera:33635
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
16/09/22 00:13:12 INFO YarnClientSchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@quickstart.cloudera:49490/user/Executor#-842589632]) with ID 1
16/09/22 00:13:12 INFO ExecutorAllocationManager: New executor 1 has registered (new total is 1)
16/09/22 00:13:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, quickstart.cloudera, partition 0,NODE_LOCAL, 2291 bytes)
16/09/22 00:13:13 INFO BlockManagerMasterEndpoint: Registering block manager quickstart.cloudera:56958 with 530.3 MB RAM, BlockManagerId(1, quickstart.cloudera, 56958)
16/09/22 00:13:13 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on quickstart.cloudera:56958 (size: 8.2 KB, free: 530.3 MB)
16/09/22 00:13:15 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on quickstart.cloudera:56958 (size: 43.7 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, quickstart.cloudera, partition 1,NODE_LOCAL, 2291 bytes)
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 18583 ms on quickstart.cloudera (1/2)
16/09/22 00:13:31 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 157 ms on quickstart.cloudera (2/2)
16/09/22 00:13:31 INFO DAGScheduler: ShuffleMapStage 0 (collect at <console>:27) finished in 25.082 s
16/09/22 00:13:31 INFO DAGScheduler: looking for newly runnable stages
16/09/22 00:13:31 INFO DAGScheduler: running: Set()
16/09/22 00:13:31 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/09/22 00:13:31 INFO DAGScheduler: failed: Set()
16/09/22 00:13:31 INFO DAGScheduler: Missing parents for ResultStage 1: List()
16/09/22 00:13:31 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27), which is now runnable
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(16544) called with curMem=497864, maxMem=556038881
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.2 KB, free 529.8 MB)
16/09/22 00:13:31 INFO MemoryStore: ensureFreeSpace(7375) called with curMem=514408, maxMem=556038881
16/09/22 00:13:31 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 7.2 KB, free 529.8 MB)
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 10.0.2.15:47553 (size: 7.2 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
16/09/22 00:13:31 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[8] at collect at <console>:27)
16/09/22 00:13:31 INFO YarnScheduler: Adding task set 1.0 with 1 tasks
16/09/22 00:13:31 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, quickstart.cloudera, partition 0,PROCESS_LOCAL, 1914 bytes)
16/09/22 00:13:31 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on quickstart.cloudera:56958 (size: 7.2 KB, free: 530.2 MB)
16/09/22 00:13:31 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 0 to quickstart.cloudera:49490
16/09/22 00:13:31 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 157 bytes
16/09/22 00:13:31 INFO DAGScheduler: ResultStage 1 (collect at <console>:27) finished in 0.245 s
16/09/22 00:13:31 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 245 ms on quickstart.cloudera (1/1)
16/09/22 00:13:31 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/09/22 00:13:31 INFO DAGScheduler: Job 0 finished: collect at <console>:27, took 26.194947 s
[null]
===
But if I run the same query directly from the Hive console, I get a result:
hive> select max(Close) from stock_data2
> ;
Query ID = cloudera_20160922001414_4b684522-3e42-4957-8260-ff6b4da67c8f
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
Starting Job = job_1474445009419_0005, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1474445009419_0005/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1474445009419_0005
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2016-09-22 00:14:45,000 Stage-1 map = 0%, reduce = 0%
2016-09-22 00:14:55,165 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.28 sec
2016-09-22 00:15:03,707 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.68 sec
MapReduce Total cumulative CPU time: 2 seconds 680 msec
Ended Job = job_1474445009419_0005
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.68 sec HDFS Read: 43379 HDFS Write: 10 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 680 msec
OK
52.369999
Time taken: 42.57 seconds, Fetched: 1 row(s)
count(*) works fine, but selecting column values or max() returns null.
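When count(*) succeeds but every column value comes back null, Spark is finding the rows but failing to deserialize the columns — typically a mismatch between the metastore schema and the on-disk schema (for Parquet-backed tables, column-name case sensitivity is a common culprit). A sketch of how to check, assuming the table is stored as Parquet; the HDFS path below is hypothetical, so substitute the real location shown by `DESCRIBE FORMATTED` in Hive:

```scala
// Read the files underneath the table directly, bypassing the Hive metastore.
// The path is a hypothetical example; use the table's actual location.
val raw = hiveContext.read.parquet("/user/hive/warehouse/stock_partitioned_data")
raw.printSchema()   // compare field names/casing with the metastore schema
raw.show(5)

// If the direct read returns real values while the metastore-backed read
// returns nulls, the problem is in how Spark reconciles the two schemas.
```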
Comments:
I can read from a plain (non-partitioned, internal) table without any problem. It is only when the table is external or partitioned that I get an empty RDD.

Answer 1: This issue was resolved in Spark 1.6.
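For anyone stuck on Spark 1.5 (as in the CDH 5.5 logs above), a workaround sketch — hedged, since the root cause is not confirmed in this thread — is to disable Spark's built-in Parquet conversion for metastore tables so the Hive SerDe path is used instead, and to refresh the cached table metadata. This only applies if the table is Parquet-backed:

```scala
// Workaround sketch for Spark < 1.6, assuming a Parquet-backed Hive table.
// Falling back to the Hive SerDe avoids Spark's own Parquet/metastore
// schema reconciliation, which is where the nulls appear to come from.
hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
hiveContext.refreshTable("stock_partitioned_data")

hiveContext.sql(
  "select max(Close) from stock_partitioned_data where symbol = 'AAPL'"
).show()
```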