Write an RDD as a text file using Apache Spark
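【Title】: Write RDD as textfile using Apache Spark 【Posted】: 2015-06-23 04:03:39
【Question】: I am exploring Spark for batch processing. I am running Spark on my local machine in standalone mode.
I am trying to write a Spark RDD to a single file [the final output] using the saveAsTextFile() method, but it does not work.
For example, if I have more than one partition, how can I get a single file as the final output?
Update:
I tried the following approaches, but I get a NullPointerException.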
person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output");
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output");
The exception is:
15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion.
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext
15/06/23 18:25:28 INFO Utils: Shutdown hook called
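Regards, Shankar
【Comments】:
Well, your RDD is becoming empty somewhere. We cannot help you find the error from the part of the code you have given us. I suggest you at least try counting your RDD to check whether it is empty, and go through it step by step!
Could you check the filesystem or HDFS permissions for that particular folder? You can also prepend the protocol to the filesystem path. Also, as mentioned earlier, if you want to run Hadoop-related things locally, you may need to set WinUtils in the system path.
【Answer 1】: You can use the coalesce method to save to a single file. Your code would then look like this: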
val myFile = sc.textFile("file.txt")
val finalRdd = doStuff(myFile)
finalRdd.coalesce(1).saveAsTextFile("newfile")
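There is another method, repartition, that does the same thing, but it causes a shuffle, which can be very expensive, whereas coalesce tries to avoid a shuffle.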
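One detail worth knowing: saveAsTextFile("newfile") creates a directory named newfile that holds part-NNNNN files (one per partition) next to marker and checksum files, so even with coalesce(1) the data ends up in newfile/part-00000 rather than in a file called newfile. A minimal sketch of copying those parts into one file with a name of your choosing is shown here. It assumes the output directory is on the local filesystem and that the target file lies outside that directory. The class PartFileMerger and its merge method are hypothetical helpers, not part of Spark.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper (not a Spark API): merges part-* files into one named file. */
final class PartFileMerger {

    /**
     * Concatenates every part-* file found directly in outputDir, in name order,
     * into target. Files such as _SUCCESS or .part-00000.crc are skipped because
     * their names do not start with "part-". Returns the number of bytes written.
     */
    static long merge(Path outputDir, Path target) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outputDir)) {
            parts = files
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        long bytes = 0;
        // With no options, newOutputStream creates the file or truncates an existing one.
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path part : parts) {
                bytes += Files.copy(part, out);
            }
        }
        return bytes;
    }
}
```

After the Spark job finishes, a call such as PartFileMerger.merge(Paths.get("newfile"), Paths.get("result.txt")) would produce result.txt. Because the parts are copied one after another, this also works when the RDD was saved with several partitions, which avoids forcing all data through a single task.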
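【Comments】:
I am using Java to implement Spark, and I get an exception. I have updated the question with the exception details.
It looks like it is trying to write the file but failing. Can you check that you have permission to write to the directory? Also, since Spark is lazy, the problem may lie in the person RDD. Can you run person.coalesce(1).toJavaRDD().count() to make sure it produces multiple rows and does not throw an exception?
When I use saveAsTextFile("") to save a file, which node is it saved on (a worker or the driver)? Can we also give the output file a specific name?
Unless you are running locally, you usually do not save to a worker or the driver specifically. In a distributed cluster environment you usually save to HDFS, S3, or some other storage. Examples:
- S3: rdd.saveAsTextFile("s3n://bucketname/path/newfile.csv")
- HDFS: rdd.saveAsTextFile("hdfs://path/newfile.csv")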
Thanks @Maksud, got it.
【Answer 2】:
Are you running it on Windows? If so, you need to add the following line:
System.setProperty("hadoop.home.dir", "C:\\winutil\\");
You can download winutils from the following link:
http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe
【Comments】:
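【Answer 3】: Spark uses the Hadoop filesystem layer internally, so when you read from or write to the filesystem it first looks for the folder configured as HADOOP_HOME, which must contain bin\winutils.exe. You have probably not set it, which is why it throws the NullPointerException.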
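The top frame of the trace in the question, ProcessBuilder.start, fits this explanation: ProcessBuilder throws a NullPointerException when any element of its command list is null. The sketch below reproduces only that JDK behaviour. It assumes, without the source confirming it, that Hadoop ends up with a null winutils path in the command it builds for setPermission when the binary cannot be located. The class NullCommandDemo is a hypothetical name used for illustration.

```java
import java.io.IOException;

/** Hypothetical demo class: shows how a null command element fails in ProcessBuilder. */
final class NullCommandDemo {

    /** Returns true if starting the given command fails with a NullPointerException. */
    static boolean failsWithNpe(String... command) {
        try {
            new ProcessBuilder(command).start();
            return false;
        } catch (NullPointerException e) {
            // Same exception type as the first frame of the trace in the question.
            return true;
        } catch (IOException e) {
            // The program was missing or not runnable: a different kind of failure.
            return false;
        }
    }
}
```

Calling NullCommandDemo.failsWithNpe(null, "chmod", "0644") returns true, because the first element, which stands in for the unresolved winutils path, is null.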
【Comments】:
【Answer 4】: You can use the repartition method on the RDD. It creates as many partitions as the integer you pass to it. In your case that would be:
rdd.repartition(1).saveAsTextFile("path to save rdd")
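【Comments】:
I am using Java to implement Spark, and I get an exception. I have updated the question with the exception details.
【Answer 5】:
Download winutils.exe.
Put winutils.exe in a bin folder on any drive (D:/Winutils/bin/).
Set the path in your code as follows: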
System.setProperty("hadoop.home.dir", "D:\\Winutils\\");
Now run your code; it should work.
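The steps above can be made a little safer by checking that winutils.exe really is where the property will point before setting it. This is a minimal sketch under that assumption. HadoopHomeSetup and configure are hypothetical names, and the check only verifies that the file exists, not that it is a working binary.

```java
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: sets hadoop.home.dir only when bin/winutils.exe is present. */
final class HadoopHomeSetup {

    /**
     * Looks for bin/winutils.exe under hadoopHome. If the file exists, sets the
     * hadoop.home.dir system property to that folder and returns true;
     * otherwise leaves the property untouched and returns false.
     */
    static boolean configure(Path hadoopHome) {
        Path winutils = hadoopHome.resolve("bin").resolve("winutils.exe");
        if (!Files.isRegularFile(winutils)) {
            return false;
        }
        System.setProperty("hadoop.home.dir", hadoopHome.toAbsolutePath().toString());
        return true;
    }
}
```

With the layout from this answer, the call would be HadoopHomeSetup.configure(Paths.get("D:\\Winutils")). It should run before the Spark context is created, so that the property is already set when the Hadoop classes are first loaded.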
【Comments】: