使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件
Posted
技术标签:
【中文标题】使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件【英文标题】:Using MapReduce API To Copy Files Inside HDFS With Gzip Compression 【发布时间】:2013-02-14 17:20:05 【问题描述】:我正在用 Java 编写一个归档程序。将归档的文件已经驻留在 HDFS 中。我需要能够将文件从 HDFS 中的一个位置移动到另一个位置,并使用 Gzip 压缩最终文件。要移动的文件可能非常大,因此使用 HDFS API 移动和压缩它们可能效率很低。所以我想我可以在我的代码中编写一个 mapreduce 作业来为我做这件事。
但是,我找不到任何示例来说明如何使用 MapReduce API 复制这些文件并将它们以 gzip 格式输出。事实上,我什至很难找到一个如何通过 mapreduce 在 HDFS 中复制文件的编程示例。
任何人都可以阐明我如何使用 MapReduce API 完成此任务吗?
编辑:这是我目前的作业配置代码,改编自 Amar 给我的帮助:
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec");
Job job = new Job(conf);
job.setJarByClass(LogArchiver.class);
job.setJobName("ArchiveMover_"+dbname);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setMapperClass(IdentityMapper.class);
//job.setReducerClass(IdentityReducer.class);
job.setInputFormatClass(NonSplittableTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(archiveStaging+"/"+dbname+"/*/*"));
FileOutputFormat.setOutputPath(job, new Path(archiveRoot+"/"+dbname));
job.submit();
这是在 LogArchiver 类中的 NonSplittableTextInputFormat 的类声明
public class NonSplittableTextInputFormat extends TextInputFormat
public NonSplittableTextInputFormat ()
@Override
protected boolean isSplitable(JobContext context, Path file)
return false;
【问题讨论】:
不确定这是否有帮助,但您是否研究过 Hadoop Streaming? wiki.apache.org/hadoop/HadoopStreaming 很可能我需要使用它,但我更喜欢通过 MapReduce API 来实现它。 【参考方案1】:您可以使用IdentityMapper
和IdentityReducer
编写custom jar implementation。
您可以生成 gzip 文件作为输出,而不是纯文本文件。在run()
中设置如下配置:
conf.setBoolean("mapred.output.compress", true);
conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.GzipCodec");
为了保证输入和输出的文件数量相同,只是输出文件必须经过gzip压缩,你必须做两件事:
-
实现 NonSplittableTextInputFormat
将减少任务设置为零。
为了确保每个映射器读取一个文件,您可以扩展 TextInputFormat
如下:
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapred.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat
@Override
protected boolean isSplitable(FileSystem fs, Path file)
return false;
并将上述实现用作:
job.setInputFormatClass(NonSplittableTextInputFormat.class);
要将 reduce 任务设置为零,请执行以下操作:
job.setNumReduceTasks(0);
这将为您完成工作,但最后一件事是文件名将不一样!但我也确信这里必须有一个解决方法。
【讨论】:
太棒了。我会试试这个。再次感谢! 您确实让我朝着目标迈进了一大步。我现在只有两个问题。 1) 假设我有一个这样的目录结构:top_level/second_level/third_level/file, top_level/second_level/third_level2/file 我怎样才能使输出与输入的目录结构匹配?现在它只是将所有输出文件放在一个目录中。 2) 输出文件的每一行之前似乎都有一个数字。它随每一行递增。几乎看起来像一个字符数。如何确保该数字不会出现在输出中? 好的,所以我能够通过实现我自己版本的排除密钥输出的 TextOutputFormat 类来解决第二个问题。仍在尝试弄清楚如何使输入和输出之间的目录结构匹配。 我想你可以使用这里解释的 MultipleTextOutputFormat 来解决这个问题:infoq.com/articles/HadoopOutputFormat 为此,您需要首先使用map()
中的以下内容读取输入文件路径:((FileSplit) context.getInputSplit()).getPath().getName()
,然后使用它来相应地设置输出的密钥,然后完整的有效载荷作为值。首先阅读上面的链接,然后您可能会更好地理解它。让我们看看这是否有帮助。以上是关于使用 MapReduce API 通过 Gzip 压缩复制 HDFS 内的文件的主要内容,如果未能解决你的问题,请参考以下文章
在 hadoop / hive 中处理损坏的 gzip 文件
Mailchimp API 在使用 node-fetch 而不是 json 时返回一个大的 gzip 对象
Gmail API - 值 '= Get Labels google-api-dotnet-client/1.25.0.0 (gzip)' 的格式无效
使用 cronjob + API 的 Amazon MapReduce