Spark写入相同目录
Posted DataRain
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark写入相同目录相关的知识,希望对你有一定的参考价值。
多个Spark任务输出到同一个目录是会报错的,这是因为Spark无法判断已经存在的目录里面是否含有和将要输出文件同名的文件,为了保证spark任务运算完毕后的输出能够正常进行,spark默认不接受已经存在的输出目录。
但是有时会遇到这种需求,就是需要將多个spark任务的输出放到同一个目录里面,简单的处理的话可以使用二级目录来处理,输出在同一个一级目录下面的子目录里也是可以的。但是如果能够在同一个目录下就更方便了,起码读取的时候也不需要做更多操作,其实使用自定义的输出类是可以让多个spark任务输出到同一个目录下面的,这是看了一个大神用的方法后发现的,这里贴一下看的原文链接:https://www.cnblogs.com/Gxiaobai/p/10705712.html。
1、需要存储的RDD先转换成PairRDD,对于这些PairRDD的key,每个job都赋值一个相同的,例如第一次向那个文件夹写的key就是0,第二次向那个文件夹写的key就是1,这个key可以用于输出文件命名中,用来避免每次输出得到相同的文件名。
resultDataset.javaRDD()
.mapToPair(x -> new Tuple2<>(String.valueOf(partitionNum), x.mkString(",")))
.saveAsHadoopFile(fileUrl, String.class, String.class, MyOutputFormat.class);
2、调用saveAsHadoopFile方法进行保存,并且自定义一个OutputFormat类,重写一些方法用于实现这个方式的输出。generateFileNameForKeyValue方法就是用于自定义输出的partition文件名称的,可以根据key获得一个前缀。另外修改一下checkOutputSpecs方法,这里只要把if (fs.exists(outDir))这部分的判断逻辑删除就可以了,这样即使目录已经存在也不会报错。
public class MyOutputFormat extends MultipleTextOutputFormat<String, String> {
protected String generateFileNameForKeyValue(String key, String value, String name) {
// 一个批次的数据拥有同一个key,以此为前缀避免多批次写入同一个目录名字重复
return "subTask_" + key + "_" + super.generateFileNameForKeyValue(key, value, name);
}
protected String generateActualKey(String key, String value) {
return null;
}
public void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException {
// 改动一下判断文件夹是否存在的逻辑:super.checkOutputSpecs(ignored, job)
Path outDir = getOutputPath(job);
if (outDir == null && job.getNumReduceTasks() != 0) {
throw new InvalidJobConfException("Output directory not set in JobConf.");
} else {
if (outDir != null) {
FileSystem fs = outDir.getFileSystem(job);
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] {outDir}, job);
}
}
}
}
最终输出的文件格式就是subTask_{key}_part-xxxxx的形式,如下图:
以上是关于Spark写入相同目录的主要内容,如果未能解决你的问题,请参考以下文章
在这个 spark 代码片段中 ordering.by 是啥意思?