Unable to query Hive after adding SQL Server jar to pyspark's class path

Posted 2017-06-08 16:02:18

Question:

Hive is set up correctly, and I can query it with spark.sql after entering the repl via pyspark. I want to read a table from SQL Server and save it into Hive. If I start the repl passing a JDBC jar, e.g. pyspark --driver-class-path sqljdbc4.jar --jars sqljdbc4.jar, I can read from SQL Server. But then Spark can no longer access Hive: any query against an existing Hive table fails with an LzoCodec error (see below).

I want to know how to query/pull down the external SQL Server table and then save it into an existing Hive table.
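Roughly what I am trying to do is something like this (a minimal sketch; the JDBC URL, credentials, and table names are placeholders, not my real values):

    # read the external table over JDBC (connection details are placeholders)
    jdbc_df = (spark.read
        .format("jdbc")
        .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
        .option("dbtable", "dbo.source_table")
        .option("user", "<user>")
        .option("password", "<password>")
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .load())

    # append the result into an existing Hive table
    jdbc_df.write.mode("append").insertInto("db.existing_hive_table")

The JDBC read works once the driver jar is on the class path, but with those flags any query against an existing Hive table then fails. The failing query and full traceback: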

spark.sql("select max(product_id) from table").show()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 287, in show
        print(self._jdf.showString(n, truncate))
      File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
    : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
    Exchange SinglePartition
    +- *HashAggregate(keys=[], functions=[partial_max(product_id#35)], output=[max#45])
       +- HiveTableScan [product_id#35], MetastoreRelation db, table

        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:114)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
        at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
        at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Error in configuring object
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
        at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
        at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
        at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:246)
        at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:263)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:123)
        at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:114)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 41 more
    Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
        ... 80 more
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:139)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:179)
        at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)
        ... 85 more
    Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
        at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132)
        ... 87 more


Answer 1:

The --driver-class-path flag replaces whatever value is configured in your defaults; it does not append to it. You have probably added the LZO jar to the class path in your spark-defaults.conf file, and it is being ignored because the flag overrides that setting. You should either:

1) Include the entire driver class path in your --driver-class-path setting, or

2) Add the SQL Server JDBC jar to the class path in the spark.driver.extraClassPath setting in your spark-defaults.conf file.

Also, as @Tim pointed out in the comments, you still need to pass your jar on the command line with the --jars flag. A sketch of both options follows.
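For example, assuming the hadoop-lzo jar and sqljdbc4.jar sit at the paths shown (the paths are placeholders; adjust them to your installation):

    # Option 1: put the full driver class path (LZO jar plus JDBC jar) on the command line
    pyspark --driver-class-path "/path/to/hadoop-lzo.jar:/path/to/sqljdbc4.jar" \
            --jars /path/to/sqljdbc4.jar

    # Option 2: extend the class path in spark-defaults.conf instead:
    #   spark.driver.extraClassPath  /path/to/hadoop-lzo.jar:/path/to/sqljdbc4.jar
    # and then launch with only the --jars flag
    pyspark --jars /path/to/sqljdbc4.jar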

Comments:

Thanks @RyanW. I think you should mention that you still have to pass the jars with --jars as well; then I'll mark this as the correct answer.
