为啥在集群模式下读取 CSV 文件失败(而在本地工作正常)?
Posted
技术标签:
【中文标题】为啥在集群模式下读取 CSV 文件失败(而在本地工作正常)?【英文标题】:Why does reading CSV file fail in cluster mode (while works fine in local)?为什么在集群模式下读取 CSV 文件失败(而在本地工作正常)? 【发布时间】:2016-04-01 10:13:14 【问题描述】:读取 csv 会导致某些 csv 文件的阶段失败。相同的代码适用于不同的 csv 文件。 Printschema 打印模式。 df.show 导致暂存错误。
val df=sqlContext.read.format("csv").option("header","true").option("inferSchema","true").load("hdfs://myIp:9000/data/time.csv")
df.printSchema
df.show
时间.csv
Date,norm date,lala
1302820211,"Thu, 14 Apr 2011 22:30:11 GMT",2016-03-28
1372820211,"Wed, 03 Jul 2013 02:56:51 GMT",2016-03-28
1304820211,"Sun, 08 May 2011 02:03:31 GMT",2016-03-28
错误:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 (TID 20, slave03): java.lang.NullPointerException
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:120)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:107)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
您的代码适用于我(使用 Spark V1.5.1、spark-csv V1.3.2、Scala 2.10) - 这是做工作的文件之一吗?跨度> 它在本地[*]模式下工作。我已经设置了火花大师和工人。我使用 spark 1.6 , spark-csv V1.4.0 Scala 2.10。对于“time.csv”,代码给出错误。在本地 [*] 模式下,相同的文件给出正确的输出 【参考方案1】:添加 apache commons-csv jar 和 databricks-csv jar 对我有用。最初我只包含了 databricks-csv jar
val conf = new SparkConf().setAppName("TEST APP")
.setMaster("spark://ip:7077")
.setJars(Seq("pathto/spark-csv_2.10-1.4.0.jar",
"pathTo/commons-csv-1.1.jar"))
val sc=SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)
【讨论】:
你为什么要通过setJars
添加jar并在你的代码中定义SparkConf
?!您应该将spark-submit
与--jars
和--master
一起使用。我还建议使用 sbt-assembly 插件将所有依赖项捆绑在一个 uber-jar 中。
我需要在我的驱动程序中使用 csv jars。我不想将我的应用程序(驱动程序)作为 jar 提交(我为什么要提交?)。我错过了什么大事吗?以上是关于为啥在集群模式下读取 CSV 文件失败(而在本地工作正常)?的主要内容,如果未能解决你的问题,请参考以下文章
read.csv() 读取 data.frame OK readr::read_csv() for the same data.frame 失败,为啥?
Spark Dataframe Write to CSV 在独立集群模式下创建 _temporary 目录文件