Write RDD as textfile using Apache Spark

Posted: 2015-06-23 04:03:39

Question:

I am exploring Spark for batch processing. I am running Spark on my local machine in standalone mode.

I am trying to write a Spark RDD to a single file [the final output] using the saveAsTextFile() method, but it is not working.

For example, if I have multiple partitions, how can I get a single file as the final output?

Update:

I tried the following approaches, but I get a NullPointerException.

person.coalesce(1).toJavaRDD().saveAsTextFile("C://Java_All//output");
person.repartition(1).toJavaRDD().saveAsTextFile("C://Java_All//output");

The exception is:

15/06/23 18:25:27 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
15/06/23 18:25:27 INFO deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/06/23 18:25:27 INFO deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
15/06/23 18:25:27 INFO deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class
15/06/23 18:25:27 INFO deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
15/06/23 18:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/06/23 18:25:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

15/06/23 18:25:27 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/06/23 18:25:27 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/06/23 18:25:27 INFO TaskSchedulerImpl: Cancelling stage 1
15/06/23 18:25:27 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at TestSpark.java:40) failed in 0.249 s
15/06/23 18:25:28 INFO DAGScheduler: Job 0 failed: saveAsTextFile at TestSpark.java:40, took 0.952286 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
    at org.apache.hadoop.util.Shell.run(Shell.java:379)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/06/23 18:25:28 INFO SparkContext: Invoking stop() from shutdown hook
15/06/23 18:25:28 INFO SparkUI: Stopped Spark web UI at http://10.37.145.179:4040
15/06/23 18:25:28 INFO DAGScheduler: Stopping DAGScheduler
15/06/23 18:25:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/06/23 18:25:28 INFO Utils: path = C:\Users\crh537\AppData\Local\Temp\spark-a52371d8-ae6a-4567-b759-0a6c66c1908c\blockmgr-4d17a5b4-c8f8-4408-af07-0e88239794e8, already present as root for deletion.
15/06/23 18:25:28 INFO MemoryStore: MemoryStore cleared
15/06/23 18:25:28 INFO BlockManager: BlockManager stopped
15/06/23 18:25:28 INFO BlockManagerMaster: BlockManagerMaster stopped
15/06/23 18:25:28 INFO SparkContext: Successfully stopped SparkContext
15/06/23 18:25:28 INFO Utils: Shutdown hook called

Regards, Shankar

Comments:

Well, your RDD is becoming null somewhere. We can't help you find the bug from only the part of the code you have shown us. I suggest you at least count your RDD to check whether it is empty, and work through it step by step!

Could you check the file system or HDFS permissions on that particular folder? You can also prepend the protocol to the file system path. Also, as mentioned before, if you want to run Hadoop-related things locally, you may need winutils on your system path.

Answer 1:

You can save to a single file using the coalesce method. Your code would then look like this:

val myFile = sc.textFile("file.txt")
val finalRdd = doStuff(myFile)
finalRdd.coalesce(1).saveAsTextFile("newfile")

There is another method, repartition, that does the same thing, but it triggers a shuffle, which can be very expensive, whereas coalesce tries to avoid shuffling.
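Since the question uses the Java API, a rough Java sketch of the same idea might look like the following. The input path and class name are placeholders, and the output path is the one from the question; treat this as an assumption-laden example rather than the original code:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class SaveSingleFile {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("SaveSingleFile").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);

            // Placeholder input; in the question this would be the "person" RDD.
            JavaRDD<String> person = sc.textFile("C://Java_All//input.txt");

            // coalesce(1) merges all partitions into one, so the output directory
            // ends up with a single part-00000 file.
            person.coalesce(1).saveAsTextFile("C://Java_All//output");

            sc.stop();
        }
    }

Note that even with a single partition, saveAsTextFile produces a directory containing one part file plus a _SUCCESS marker, not a bare standalone file with the given name.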

Comments:

I am implementing Spark in Java, and I get an exception; I have updated the question with the exception details.

It looks like it is trying to write the file but failing. Can you check whether you have permission to write to that directory? Also, since Spark is lazy, the problem may be in the person RDD itself. Can you run person.coalesce(1).toJavaRDD().count() to make sure it produces rows and does not throw an exception?

When I use saveAsTextFile("") to save the file, which node is it saved on (worker or driver)? Also, can we give a specific file name for the output file?

Unless you are running locally, you don't usually save to a specific worker or driver. In a distributed cluster environment you would normally save to HDFS, S3, or some other storage. Examples: S3: rdd.saveAsTextFile("s3n://bucketname/path/newfile.csv"); HDFS: rdd.saveAsTextFile("hdfs://path/newfile.csv")

Thanks @Maksud, got it.

Answer 2:

Are you running this on Windows? If so, you need to add the following line:

System.setProperty("hadoop.home.dir", "C:\\winutil\\")

You can download winutils from the following link:

http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe
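As a hedged sketch of where that line goes in a Java driver: the property has to be set before the SparkContext is created (and before any Hadoop FileSystem call), and the C:\winutil\ directory used here is this answer's example location, which must contain bin\winutils.exe:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class TestSparkOnWindows {
        public static void main(String[] args) {
            // Set before creating the SparkContext so Hadoop can find bin\winutils.exe here.
            System.setProperty("hadoop.home.dir", "C:\\winutil\\");

            SparkConf conf = new SparkConf().setAppName("TestSpark").setMaster("local[*]");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // ... build and save the RDD as in the question ...
            sc.stop();
        }
    }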

Comments:

Answer 3:

Spark uses the Hadoop file system internally, so when you try to read from and write to the file system it first looks for the HADOOP_HOME configuration folder containing bin\winutils.exe. You may not have set this, which is why it throws the NullPointerException.
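One way to confirm this is the cause is to check, before running the job, whether winutils.exe can actually be resolved from hadoop.home.dir or HADOOP_HOME. This check is purely illustrative and not part of the Spark or Hadoop API:

    import java.io.File;

    public class WinutilsCheck {
        public static void main(String[] args) {
            // Hadoop consults the hadoop.home.dir system property first,
            // then falls back to the HADOOP_HOME environment variable.
            String hadoopHome = System.getProperty("hadoop.home.dir", System.getenv("HADOOP_HOME"));
            if (hadoopHome == null) {
                System.out.println("Neither hadoop.home.dir nor HADOOP_HOME is set");
            } else {
                File winutils = new File(hadoopHome, "bin" + File.separator + "winutils.exe");
                System.out.println(winutils + " exists: " + winutils.exists());
            }
        }
    }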

Comments:

Answer 4:

You can use the repartition method on an RDD. It creates exactly as many partitions as the integer you pass to it. In your case it would be:

rdd.repartition(1).saveAsTextFile("path to save rdd")

Comments:

I am implementing Spark in Java, and I get an exception; I have updated the question with the exception details.

Answer 5:
1. Download winutils.exe.
2. Place winutils.exe in a bin folder on any drive (for example D:/Winutils/bin/).
3. Set the path in your code as follows:

    System.setProperty("hadoop.home.dir", "D:\\Winutils\\");

Now run your code and it should work.

Comments:
