将 sql server jar 添加到 pyspark 的类路径后无法查询 hive
Posted
技术标签:
【中文标题】将 sql server jar 添加到 pyspark 的类路径后无法查询 hive【英文标题】:unable to query hive after adding sql server jar to pyspark's class path 【发布时间】:2017-06-08 16:02:18 【问题描述】:Hive 设置正确,我可以在使用pyspark
输入repl 后使用spark.sql
查询它。我想从 sql server 读取一个表并将其保存到 hive。如果启动 repl 传递一个 jdbc jar 像 pyspark --driver-class-path sqljdbc4.jar --jars sqljdbc4.jar
我可以从 sql server 读取。但是现在 spark 无法访问 hive。对现有 hive 表的任何查询都会导致 Lzo Codec 错误(见下文)。
我想知道如何查询/下拉外部 sql server 表,然后将其保存到现有的 hive 表中。
spark.sql("select max(product_id) from table").show()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 287, in show
print(self._jdf.showString(n, truncate))
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *HashAggregate(keys=[], functions=[partial_max(product_id#35)], output=[max#45])
+- HiveTableScan [product_id#35], MetastoreRelation db, table
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:263)
at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:123)
at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:114)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 41 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
... 80 more
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:139)
at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:179)
at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
... 85 more
Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132)
... 87 more
【问题讨论】:
【参考方案1】:--driver-class-path 标志替换您在默认设置中配置的任何值。它不附加新值。您可能已将 LZO jar 添加到 spark-defaults.conf 文件中的类路径中,并且它被忽略了,因为您正在使用标志覆盖该设置。您应该:
1) 在您的 --driver-class-path
设置中包含整个驱动程序类路径
或
2) 将 SQL JDBC jar 添加到 spark-defaults.conf 文件中 spark.driver.extraClassPath
设置的类路径中
此外,正如@Tim 在评论中指出的那样,您仍然需要使用 --jars 标志将您的 jar 提供给命令行。
【讨论】:
谢谢@RyanW。我想你应该提到你仍然必须通过罐子。然后我会标记它是正确的。以上是关于将 sql server jar 添加到 pyspark 的类路径后无法查询 hive的主要内容,如果未能解决你的问题,请参考以下文章
无法将 jdbc 连接到 sql server 2008 r2