Hadoop Distributed Cache via Generic Options -files

Posted: 2015-03-23 16:41:49

【Problem Description】: While reading the book Hadoop in Action, I came across an option which states that, rather than adding small files to the distributed cache programmatically, it is preferable to do it with the -files generic option.
When I try this in the setup() of my code, I get a FileNotFoundException at fs.open(), and it shows me a path I am not sure about.

The question is: if I use the -files generic option, where in HDFS does the file get copied to by default?

The code I am trying to execute is below.
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapSide2 extends Configured implements Tool {
    /* Program     : JoinMapSide2.java
       Description : Passing the small file via GenericOptionsParser
                     hadoop jar JoinMapSide2.jar -files orders.txt .........
       Input       : /data/patent/orders.txt (local file system), /data/patent/customers.txt
       Output      : /MROut/JoinMapSide2
       Date        : 23/03/2015
    */
    protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {
        // hash table to store the key+value from the distributed file, i.e. the background data
        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // setup function that fills up joinData before the map() calls
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String line;
            String[] tokens;
            FileSystem fs;
            FSDataInputStream fdis;
            LineReader joinReader;
            Configuration conf;
            Text buffer = new Text();
            // get the configuration
            conf = context.getConfiguration();
            // get the file system related to the configuration
            fs = FileSystem.get(conf);
            // get all the cache files distributed as part of the job
            URI[] localFiles = context.getCacheFiles();
            System.out.println("Cache File Path:" + localFiles[0].toString());
            // check if there are any distributed files
            // in our case we are sure there will always be one, so use that only
            if (localFiles.length > 0) {
                // since the file is now on HDFS, use FSDataInputStream to read through the file
                fdis = fs.open(new Path(localFiles[0].toString()));
                joinReader = new LineReader(fdis);
                // read the file until EOF
                try {
                    while (joinReader.readLine(buffer) > 0) {
                        line = buffer.toString();
                        // apply the split pattern only once
                        tokens = line.split(",", 2);
                        // add key+value into the Hashtable
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                    fdis.close();
                }
            } else {
                System.err.println("No Cache Files are distributed");
            }
        }

        // map function
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            NullWritable kNull = null;
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JoinMapSide2 -files <smallFile> <inputFile> <outputFile>");
            return -1;
        }
        Path inFile = new Path(args[0]);   // input file (customers.txt)
        Path outFile = new Path(args[1]);  // output directory
        Configuration conf = getConf();
        // delimiter for the input file
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "Map Side Join2");
        // this is not used, as the small file is distributed to all the nodes in the cluster
        // by the generic options parser
        // job.addCacheFile(disFile.toUri());
        FileInputFormat.addInputPath(job, inFile);
        FileOutputFormat.setOutputPath(job, outFile);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(JoinMapSide2.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);
        System.exit(ret);
    }
}
This is the exception I see in the trace:
Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)
I launch the job like this:
hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2
Any direction would be very helpful. Thanks.

【Comments】:
【Answer 1】: First you need to move your orders.txt to HDFS, and then you have to use -files.
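If I read this suggestion right, it would look something along these lines (the HDFS destination path here is only an illustration, not from the original post):

hdfs dfs -put /data/patent/orders.txt /user/shiva/orders.txt
hadoop jar JoinMapSide2.jar -files hdfs:///user/shiva/orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2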
【Comments】:
I am not sure I agree with this, because the whole purpose of the -files option is to copy the file from the local file system to HDFS. Your solution therefore defeats the purpose of using the generic option.

【Answer 2】: OK, after some searching around I did find that there are 2 errors in my code above.

1. I should not use FSDataInputStream to read the distributed file, because it is local to the node the mapper is running on, so I should use File instead.
2. I should not use URI.toString(); instead I should use the symlink added for my file, which is just orders1.txt.
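By default that symlink gets the file's own name; GenericOptionsParser also lets you pick a different link name with a # fragment on the path, along these lines (the link name smallfile is just an illustration):

hadoop jar JoinMapSide2.jar -files /data/patent/orders1.txt#smallfile /data/patent/join/customers.txt /MROut/JoinMapSide2

The mapper would then open new File("./smallfile") instead.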
I have corrected the code, listed below. Hope it helps.
package org.samples.hina.training;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JoinMapSide2 extends Configured implements Tool {
    /* Program     : JoinMapSide2.java
       Description : To learn Replicated Join using Distributed Cache via Generic Options -files
       Input       : file:/patent/join/orders1.txt (distributed to all nodes), /data/patent/customers.txt
       Output      : /MROut/JoinMapSide2
       Date        : 24/03/2015
    */
    protected static class MapClass extends Mapper<Text, Text, NullWritable, Text> {
        // hash table to store the key+value from the distributed file, i.e. the background data
        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        // setup function that fills up joinData before the map() calls
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String line;
            String[] tokens;
            // get all the cache files set in the configuration by -files (or addCacheFile())
            URI[] localFiles = context.getCacheFiles();
            System.out.println("File1:" + localFiles[0].toString());
            // check if there are any distributed files
            // in our case we are sure there will always be one, so use that only
            if (localFiles.length > 0) {
                // read from the LOCAL copy, via the symlink created in the working directory
                File localFile1 = new File("./orders1.txt");
                // create a reader on localFile1
                BufferedReader joinReader = new BufferedReader(new FileReader(localFile1));
                // read the local file until EOF
                try {
                    while ((line = joinReader.readLine()) != null) {
                        // apply the split pattern only once
                        tokens = line.split(",", 2);
                        // add key+value into the Hashtable
                        joinData.put(tokens[0], tokens[1]);
                    }
                } finally {
                    joinReader.close();
                }
            } else {
                System.err.println("Local Cache File does not exist");
            }
        }

        // map function
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            NullWritable kNull = null;
            String joinValue = joinData.get(key.toString());
            if (joinValue != null) {
                context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JoinMapSide2 <inputFile> <outputFile>");
            return -1;
        }
        Path inFile = new Path(args[0]);   // input file (customers.txt)
        Path outFile = new Path(args[1]);  // output directory
        Configuration conf = getConf();
        // delimiter for the input file
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "Map Side Join2");
        // the files orders1.txt, orders2.txt are added to the distributed cache
        // by the Generic Options -files, so this is not needed:
        // job.addCacheFile(disFile1);
        FileInputFormat.addInputPath(job, inFile);
        FileOutputFormat.setOutputPath(job, outFile);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(JoinMapSide2.class);
        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);
        System.exit(ret);
    }
}
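With the corrections in place, the launch command itself stays the same as before, something like:

hadoop jar JoinMapSide2.jar -files orders1.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

As the exception in the question shows, -files ships the file through the job's staging directory on HDFS and then localizes it into every task's working directory, where the mapper reads it through the ./orders1.txt symlink rather than through the HDFS FileSystem API.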
【Comments】: