Errors querying Hive table from PySpark
Posted 2020-09-26 03:04:24

I want to query a Hive table using PySpark (currently running locally, but this will move to Databricks), and I keep running into errors. With my nonexistent Java knowledge, I have spent most of today trying various solutions from the web, but nothing seems to work.
Things I have tried:

- Querying the table through DBeaver with the same credentials, which works
- Specifying the schema with StructType and StructField, which produces the same error
- Connecting with PyHive and impyla instead, with no luck; I keep getting a "TSocket read 0 bytes" error
- Registering the table as a temp table and querying it with SQL, which raises the same error

Any guidance is appreciated! Thanks!
from pyspark.sql import SparkSession

# initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()

# connect over JDBC (url, table, username, and password are defined elsewhere)
driver = "org.apache.hive.jdbc.HiveDriver"
remote_table = spark.read.format("jdbc") \
    .option("driver", driver) \
    .option("url", url) \
    .option("dbtable", table) \
    .option("user", username) \
    .option("password", password) \
    .load() \
    .limit(100)

# print schema
remote_table.printSchema()
Output
root
|-- ga_union.calendar_date: string (nullable = true)
|-- ga_union.profile_view: string (nullable = true)
|-- ga_union.channel_grouping: string (nullable = true)
|-- ga_union.device_category: string (nullable = true)
|-- ga_union.ga_source: string (nullable = true)
|-- ga_union.ga_medium: string (nullable = true)
|-- ga_union.sessions: double (nullable = true)
|-- ga_union.bounces: double (nullable = true)
|-- ga_union.pageviews: double (nullable = true)
|-- ga_union.users: double (nullable = true)
|-- ga_union.total_time_on_site: double (nullable = true)
|-- ga_union.newsletter_signup: double (nullable = true)
|-- ga_union.configuration_starts: double (nullable = true)
|-- ga_union.configuration_complete: double (nullable = true)
|-- ga_union.goal15_completions: double (nullable = true)
# show first 10 rows
remote_table.select("*").show(10)
Output
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-29-58d5fd3b71ec> in <module>
----> 1 remote_table.select("*").show(10)
~/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
~/opt/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/opt/anaconda3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
126 def deco(*a, **kw):
127 try:
--> 128 return f(*a, **kw)
129 except py4j.protocol.Py4JJavaError as e:
130 converted = convert_exception(e.java_exception)
~/opt/anaconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327                 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o158.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, us-c02sc3d2gvc1.fios-router.home, executor driver): java.sql.SQLException: Cannot convert column 7 to double: java.lang.NumberFormatException: For input string: "ga_union.sessions"
at org.apache.hive.jdbc.HiveBaseResultSet.getDouble(HiveBaseResultSet.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$5(JdbcUtils.scala:417)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$5$adapted(JdbcUtils.scala:416)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:361)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "ga_union.sessions"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at org.apache.hive.jdbc.HiveBaseResultSet.getDouble(HiveBaseResultSet.java:293)
... 22 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: Cannot convert column 7 to double: java.lang.NumberFormatException: For input string: "ga_union.sessions"
at org.apache.hive.jdbc.HiveBaseResultSet.getDouble(HiveBaseResultSet.java:298)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$5(JdbcUtils.scala:417)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$5$adapted(JdbcUtils.scala:416)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:361)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:343)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.NumberFormatException: For input string: "ga_union.sessions"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043)
at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
at java.lang.Double.parseDouble(Double.java:538)
at org.apache.hive.jdbc.HiveBaseResultSet.getDouble(HiveBaseResultSet.java:293)
... 22 more
Comments:

You don't need JDBC when connecting to the Hive metastore. I left the same answer on your other question.

Answer 1:

Beeline and similar clients need JDBC; Spark does not when it accesses Hive.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
Just enable Hive support.

See https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
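For instance, with Hive support enabled, the table can be read directly through the session's catalog instead of over JDBC. A minimal sketch, assuming the table from the question is registered in the metastore as ga_union (the warehouse path and table name are assumptions, not values confirmed by the post):

from pyspark.sql import SparkSession

# warehouse_location is assumed to point at the Hive warehouse directory
warehouse_location = "/user/hive/warehouse"

spark = SparkSession \
    .builder \
    .appName("hive-read-example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# query the table through the metastore rather than a JDBC connection
spark.sql("SELECT * FROM ga_union LIMIT 10").show()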
Comments:
Makes sense, but how am I supposed to specify warehouse_location if I connect to the database via hostname/username/password?
That's usually handled with ACLs, Ranger, or Active Directory. It's not like connecting to Oracle or MySQL.
Would you mind pointing me in the right direction for setting that up?
docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/securing-hive/…
Thanks for the help! I think that's more than I need, though. I'm just looking for the simplest way to query the table in a Python environment for analysis. Based on your suggestion I now have the following, but I get an error when I select *: spark = SparkSession \ .builder \ .appName("test") \ .config("javax.jdo.option.ConnectionURL", url) \ .config("javax.jdo.option.ConnectionDriverName", driver) \ .config("javax.jdo.option.ConnectionUserName", username) \ .config("javax.jdo.option.ConnectionPassword", password) \ .enableHiveSupport() \ .getOrCreate()
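For reference, the javax.jdo.option.* settings configure the relational database that backs the metastore itself, not a client login, which may be why that attempt fails. When the Hive metastore runs on a remote host, a session is usually pointed at it through its Thrift URI instead. A minimal sketch, assuming a metastore listening on the default port 9083 (the hostname and table name here are placeholders, not values from the thread):

from pyspark.sql import SparkSession

# point the session at a remote Hive metastore via its Thrift endpoint;
# "metastore-host" and port 9083 are assumptions about the environment
spark = SparkSession \
    .builder \
    .appName("test") \
    .config("hive.metastore.uris", "thrift://metastore-host:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# with Hive support enabled, table names resolve through the metastore
spark.sql("SELECT * FROM ga_union LIMIT 10").show()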