使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件

Posted

技术标签:

【中文标题】使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件【英文标题】:Using MapReduce API To Copy Files Inside HDFS With Gzip Compression 【发布时间】:2013-02-14 17:20:05 【问题描述】:

我正在用 Java 编写一个归档程序。将归档的文件已经驻留在 HDFS 中。我需要能够将文件从 HDFS 中的一个位置移动到另一个位置,并使用 Gzip 压缩最终文件。要移动的文件可能非常大,因此使用 HDFS API 移动和压缩它们可能效率很低。所以我想我可以在我的代码中编写一个 mapreduce 作业来为我做这件事。

但是,我找不到任何示例来说明如何使用 MapReduce API 复制这些文件并将它们以 gzip 格式输出。事实上,我什至很难找到一个如何通过 mapreduce 在 HDFS 中复制文件的编程示例。

任何人都可以阐明我如何使用 MapReduce API 完成此任务吗?

编辑:这是我目前的作业配置代码,改编自 Amar 给我的帮助:

        conf.setBoolean("mapred.output.compress", true); 
        conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec");
        Job job = new Job(conf);
        job.setJarByClass(LogArchiver.class);
        job.setJobName("ArchiveMover_"+dbname);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //job.setMapperClass(IdentityMapper.class);
        //job.setReducerClass(IdentityReducer.class);
        job.setInputFormatClass(NonSplittableTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0);
        FileInputFormat.setInputPaths(job, new Path(archiveStaging+"/"+dbname+"/*/*"));
        FileOutputFormat.setOutputPath(job, new Path(archiveRoot+"/"+dbname));
        job.submit();

这是在 LogArchiver 类中的 NonSplittableTextInputFormat 的类声明

public class NonSplittableTextInputFormat extends TextInputFormat 
    public NonSplittableTextInputFormat () 
    

    @Override
    protected boolean isSplitable(JobContext context, Path file) 
        return false;
    

【问题讨论】:

不确定这是否有帮助,但您是否研究过 Hadoop Streaming? wiki.apache.org/hadoop/HadoopStreaming 很可能我需要使用它,但我更喜欢通过 MapReduce API 来实现它。 【参考方案1】:

您可以使用IdentityMapperIdentityReducer 编写custom jar implementation。 您可以生成 gzip 文件作为输出,而不是纯文本文件。在run()中设置如下配置:

conf.setBoolean("mapred.output.compress", true); 
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec");

为了保证输入和输出的文件数量相同,只是输出文件必须经过gzip压缩,你必须做两件事:

    实现 NonSplittableTextInputFormat 将减少任务设置为零。

为了确保每个映射器读取一个文件,您可以扩展 TextInputFormat 如下:

import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat 
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) 
        return false;
    

并将上述实现用作:

job.setInputFormatClass(NonSplittableTextInputFormat.class);

要将 reduce 任务设置为零,请执行以下操作:

job.setNumReduceTasks(0);

这将为您完成工作,但最后一件事是文件名将不一样!但我也确信这里必须有一个解决方法。

【讨论】:

太棒了。我会试试这个。再次感谢! 您确实让我朝着目标迈进了一大步。我现在只有两个问题。 1) 假设我有一个这样的目录结构:top_level/second_level/third_level/file, top_level/second_level/third_level2/file 我怎样才能使输出与输入的目录结构匹配?现在它只是将所有输出文件放在一个目录中。 2) 输出文件的每一行之前似乎都有一个数字。它随每一行递增。几乎看起来像一个字符数。如何确保该数字不会出现在输出中? 好的,所以我能够通过实现我自己版本的排除密钥输出的 TextOutputFormat 类来解决第二个问题。仍在尝试弄清楚如何使输入和输出之间的目录结构匹配。 我想你可以使用这里解释的 MultipleTextOutputFormat 来解决这个问题:infoq.com/articles/HadoopOutputFormat 为此,您需要首先使用map() 中的以下内容读取输入文件路径:((FileSplit) context.getInputSplit()).getPath().getName(),然后使用它来相应地设置输出的密钥,然后完整的有效载荷作为值。首先阅读上面的链接,然后您可能会更好地理解它。让我们看看这是否有帮助。

以上是关于使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件的主要内容,如果未能解决你的问题,请参考以下文章

在 hadoop / hive 中处理损坏的 gzip 文件

Mailchimp API 在使用 node-fetch 而不是 json 时返回一个大的 gzip 对象

Gmail API - 值 '= Get Labels google-api-dotnet-client/1.25.0.0 (gzip)' 的格式无效

使用 cronjob + API 的 Amazon MapReduce

Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

MapReduce编程实战-词频统计结果存入mysql数据库