SparkSQL-Scala with POM
【Title】: SparkSQL-Scala with POM 【Posted】: 2016-06-30 13:06:45
【Question】: I have some problems with the Cloudera VM and Spark. First of all, I am new to Spark, and my boss asked me to run Spark with Scala in a virtual machine to do some tests.
I downloaded the VM into a VirtualBox environment, opened Eclipse, and created a new Maven project. Needless to say, I had already started the Cloudera environment and launched all its services, such as Spark, YARN, Hive and so on. All the Cloudera services are running fine and every health check is green. I did some tests with Impala and it works perfectly.
With the Eclipse and Scala-Maven environment, instead, things go wrong. Here is my very simple Scala code:
package org.test.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object TestSelectAlgorithm {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("TestSelectAlgorithm")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.sql("SELECT * FROM products").show()
  }
}
The test is really simple, and the "products" table exists: if I copy and paste the same query into Impala, it works fine!
In the Eclipse environment, on the other hand, I run into problems:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/30 05:43:17 INFO SparkContext: Running Spark version 1.6.0
16/06/30 05:43:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/06/30 05:43:18 WARN Utils: Your hostname, quickstart.cloudera resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
16/06/30 05:43:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
16/06/30 05:43:18 INFO SecurityManager: Changing view acls to: cloudera
16/06/30 05:43:18 INFO SecurityManager: Changing modify acls to: cloudera
16/06/30 05:43:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera)
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriver' on port 53730.
16/06/30 05:43:19 INFO Slf4jLogger: Slf4jLogger started
16/06/30 05:43:19 INFO Remoting: Starting remoting
16/06/30 05:43:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@10.0.2.15:39288]
16/06/30 05:43:19 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 39288.
16/06/30 05:43:19 INFO SparkEnv: Registering MapOutputTracker
16/06/30 05:43:19 INFO SparkEnv: Registering BlockManagerMaster
16/06/30 05:43:19 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-7d685fc0-ea88-423a-9335-42ca12db85da
16/06/30 05:43:19 INFO MemoryStore: MemoryStore started with capacity 1619.3 MB
16/06/30 05:43:20 INFO SparkEnv: Registering OutputCommitCoordinator
16/06/30 05:43:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/06/30 05:43:20 INFO SparkUI: Started SparkUI at http://10.0.2.15:4040
16/06/30 05:43:20 INFO Executor: Starting executor ID driver on host localhost
16/06/30 05:43:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57294.
16/06/30 05:43:20 INFO NettyBlockTransferService: Server created on 57294
16/06/30 05:43:20 INFO BlockManagerMaster: Trying to register BlockManager
16/06/30 05:43:20 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57294 with 1619.3 MB RAM, BlockManagerId(driver, localhost, 57294)
16/06/30 05:43:20 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:306)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:315)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$9.applyOrElse(Analyzer.scala:310)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:310)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:300)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:36)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
at org.test.spark.TestSelectAlgorithm$.main(TestSelectAlgorithm.scala:18)
at org.test.spark.TestSelectAlgorithm.main(TestSelectAlgorithm.scala)
16/06/30 05:43:22 INFO SparkContext: Invoking stop() from shutdown hook
16/06/30 05:43:22 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040
16/06/30 05:43:22 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/06/30 05:43:22 INFO MemoryStore: MemoryStore cleared
16/06/30 05:43:22 INFO BlockManager: BlockManager stopped
16/06/30 05:43:22 INFO BlockManagerMaster: BlockManagerMaster stopped
16/06/30 05:43:22 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/06/30 05:43:22 INFO SparkContext: Successfully stopped SparkContext
16/06/30 05:43:22 INFO ShutdownHookManager: Shutdown hook called
16/06/30 05:43:22 INFO ShutdownHookManager: Deleting directory /tmp/spark-29d381e9-b5e7-485c-92f2-55dc57ca7d25
The main error (for me) is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table not found: products;
I searched other sites and the documentation, and found that the problem is related to Hive tables... but I am not using Hive tables, I am using SparkSQL...
Can anyone help me? Thanks for any reply.
【Comments】:
Where does this products table live? In a relational database? Or do you want to read a file from HDFS?
From HDFS: in the VM I run the same query against quickstart.cloudera:8888/impala/execute/query/8#query/results ==> Impala, and it works perfectly.
You need to use DataFrames, or create a schema > register a temp table > run the query. This code will give you some hints: for text file format, gist.github.com/InvisibleTech/c71cb88b2390eb2223a8; for JSON file format, tutorialspoint.com/spark_sql/spark_sql_dataframes.htm. (A sketch of this approach follows these comments.)
If you tell me the format of a sample file, I can give you the right solution.
I want to create a simple table on HDFS with the result of my query... so that, in this way, I can run another query on top of it, and so on... an RDD with the results would also be fine; I would like to build a nested query...
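A minimal sketch of the approach suggested in the comments (create a schema > register a temp table > run the query) for a plain text file on HDFS, reusing the sc and sqlContext from the question's code; the file path, column names and delimiter are assumptions, not taken from the original post:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

// Assumed HDFS location and a comma-separated layout: product_id, product_name, price
val lines = sc.textFile("hdfs:///user/cloudera/products/products.csv")

// Build the schema explicitly instead of relying on the Hive/Impala metastore
val schema = StructType(Seq(
  StructField("product_id", StringType, nullable = true),
  StructField("product_name", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true)))

val rows = lines.map(_.split(",")).map(f => Row(f(0), f(1), f(2).toDouble))
val productsDf = sqlContext.createDataFrame(rows, schema)

// Register a temp table so that plain SQL can resolve the name "products"
productsDf.registerTempTable("products")
sqlContext.sql("SELECT * FROM products").show()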
【Answer 1】:
In Spark there is no direct support for Impala tables the way there is for Hive, so you have to load the file yourself. If it is a CSV file, you can use spark-csv:
val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("your .csv file location")

import sqlContext.implicits._
import sqlContext._

df.registerTempTable("products")
sqlContext.sql("select * from products").show()
POM dependency for spark-csv:
<!-- https://mvnrepository.com/artifact/com.databricks/spark-csv_2.10 -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-csv_2.10</artifactId>
    <version>1.4.0</version>
</dependency>
For Avro there is spark-avro:
import com.databricks.spark.avro._

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.avro("your .avro file location")

import sqlContext.implicits._
import sqlContext._

df.registerTempTable("products")
val result = sqlContext.sql("select * from products")
result.show()

result.write
    .format("com.databricks.spark.avro")
    .save("your output location")
POM dependency for spark-avro:
<!-- http://mvnrepository.com/artifact/com.databricks/spark-avro_2.10 -->
<dependency>
    <groupId>com.databricks</groupId>
    <artifactId>spark-avro_2.10</artifactId>
    <version>2.0.1</version>
</dependency>
For Parquet, Spark has built-in support:
val sqlContext = new SQLContext(sc)
val parquetFile = sqlContext.read.parquet("your parquet file location")
parquetFile.registerTempTable("products")
sqlContext.sql("select * from products").show()
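Since Parquet support is built in, the follow-up wish from the question comments (saving the query result as a simple table on HDFS and querying it again) can also be sketched without extra dependencies; the output path below is an assumption:

// Write the query result to HDFS as Parquet (assumed output directory)
val result = sqlContext.sql("select * from products")
result.write.parquet("hdfs:///user/cloudera/products_parquet")

// Read it back and register it again, so further (nested) queries can build on it
val reloaded = sqlContext.read.parquet("hdfs:///user/cloudera/products_parquet")
reloaded.registerTempTable("products_copy")
sqlContext.sql("select count(*) from products_copy").show()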
【Comments】:
OK, first of all: thank you very much! Now I just have to figure out what kind of path to put into "your file location"! The "products" table is created now and the problem is solved!!! ;-) But, if you don't mind helping me one more time, can you tell me where to find the path of the Avro-format output?
In this code we are not saving the file to any location, because you have to do some extra work for that. Please check the updated code.
【Answer 2】: Can you check whether the /user/cloudera/.sparkStaging/stagingArea location exists or contains an .avro file? And please change "your output location" to a directory path. See the spark-avro GitHub page for more details: https://github.com/databricks/spark-avro
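A minimal sketch of what "change your output location to a directory path" could look like; the HDFS directory below is an assumed example, not part of the original answer:

// Reuses the sqlContext and the registered "products" temp table from Answer 1
val result = sqlContext.sql("select * from products")

// spark-avro writes a directory of part files, not a single .avro file,
// so "your output location" should be a directory path (assumed here)
result.write
    .format("com.databricks.spark.avro")
    .save("hdfs:///user/cloudera/products_avro")

// The written data can be read back with the same package
val reloaded = sqlContext.read
    .format("com.databricks.spark.avro")
    .load("hdfs:///user/cloudera/products_avro")
reloaded.show()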
【Comments】: