Spark写入相同目录

Posted DataRain

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark写入相同目录相关的知识,希望对你有一定的参考价值。

多个Spark任务输出到同一个目录是会报错的,这是因为Spark无法判断已经存在的目录里面是否含有和将要输出文件同名的文件,为了保证spark任务运算完毕后的输出能够正常进行,spark默认不接受已经存在的输出目录。


但是有时会遇到这种需求,就是需要將多个spark任务的输出放到同一个目录里面,简单的处理的话可以使用二级目录来处理,输出在同一个一级目录下面的子目录里也是可以的。但是如果能够在同一个目录下就更方便了,起码读取的时候也不需要做更多操作,其实使用自定义的输出类是可以让多个spark任务输出到同一个目录下面的,这是看了一个大神用的方法后发现的,这里贴一下看的原文链接:https://www.cnblogs.com/Gxiaobai/p/10705712.html

1、需要存储的RDD先转换成PairRDD,对于这些PairRDD的key,每个job都赋值一个相同的,例如第一次向那个文件夹写的key就是0,第二次向那个文件夹写的key就是1,这个key可以用于输出文件命名中,用来避免每次输出得到相同的文件名。

resultDataset.javaRDD() .mapToPair(x -> new Tuple2<>(String.valueOf(partitionNum), x.mkString(","))) .saveAsHadoopFile(fileUrl, String.class, String.class, MyOutputFormat.class);


2、调用saveAsHadoopFile方法进行保存,并且自定义一个OutputFormat类,重写一些方法用于实现这个方式的输出。generateFileNameForKeyValue方法就是用于自定义输出的partition文件名称的,可以根据key获得一个前缀。另外修改一下checkOutputSpecs方法,这里只要把if (fs.exists(outDir))这部分的判断逻辑删除就可以了,这样即使目录已经存在也不会报错。

public class MyOutputFormat extends MultipleTextOutputFormat<String, String> {
@Override protected String generateFileNameForKeyValue(String key, String value, String name) { // 一个批次的数据拥有同一个key,以此为前缀避免多批次写入同一个目录名字重复 return "subTask_" + key + "_" + super.generateFileNameForKeyValue(key, value, name); }
@Override protected String generateActualKey(String key, String value) { return null; }
@Override public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException { // 改动一下判断文件夹是否存在的逻辑:super.checkOutputSpecs(ignored, job) Path outDir = getOutputPath(job); if (outDir == null && job.getNumReduceTasks() != 0) { throw new InvalidJobConfException("Output directory not set in JobConf."); } else { if (outDir != null) { FileSystem fs = outDir.getFileSystem(job); outDir = fs.makeQualified(outDir); setOutputPath(job, outDir); TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] {outDir}, job); } } }}

最终输出的文件格式就是subTask_{key}_part-xxxxx的形式,如下图:
 


以上是关于Spark写入相同目录的主要内容,如果未能解决你的问题,请参考以下文章

Spark:如何加速 foreachRDD?

在这个 spark 代码片段中 ordering.by 是啥意思?

大数据ClickHouse(十八):Spark 写入 ClickHouse API

python+spark程序代码片段

Spark 数据集写入 2 个不同的目录

六十三Spark-读取数据并写入数据库