Hadoop分布式缓存通过通用选项-文件

Posted

技术标签:

【中文标题】Hadoop分布式缓存通过通用选项-文件【英文标题】:Hadoop Distributed Cache via Generic Options -files 【发布时间】:2015-03-23 16:41:49 【问题描述】:

当我阅读 Hadoop In Action 一书时,有一个选项指出,与其通过程序将小文件添加到分布式缓存中,不如使用 -files 通用选项来完成。

当我在代码的 setup() 中尝试此操作时,我在 fs.open() 处收到 FileNotFoundException,它向我显示了一个不确定的路径。

问题是: 如果我默认使用 -files 通用选项,文件在 HDFS 中被复制到哪里?

正在尝试执行的代码如下..

public class JoinMapSide2 extends Configured implements Tool

/*  Program     : JoinMapSide2.java 
    Description : Passing the small file via GenericOptionsParser
                  hadoop jar JoinMapSide2.jar -files orders.txt .........
    Input       : /data/patent/orders.txt(local file system), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 23/03/2015  
*/

protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>

    // hash table to store the key+value from the distributed file or the background data
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function for filling up the joinData for each each map() call
    protected void setup(Context context) throws IOException, InterruptedException 

        String line;
        String[] tokens;

        FileSystem fs;
        FSDataInputStream fdis;
        LineReader joinReader;
        Configuration conf;

        Text buffer = new Text();

        // get configuration
        conf = context.getConfiguration();
        // get file system related to the configuration
        fs = FileSystem.get(conf);

        // get all the local cache files distributed as part of the job
        URI[] localFiles = context.getCacheFiles();

        System.out.println("Cache File Path:"+localFiles[0].toString());

        // check if there are any distributed files 
        // in our case we are sure we will always one so use that only 
        if (localFiles.length > 0)
            // since the file is now on HDFS FSDataInputStream to read through the file
            fdis = fs.open(new Path(localFiles[0].toString()));
            joinReader = new LineReader(fdis);

            // read local file until EOF
            try 
                while (joinReader.readLine(buffer) > 0) 
                    line = buffer.toString();
                    // apply the split pattern only once
                    tokens = line.split(",",2); 
                    // add key+value into the Hashtable
                    joinData.put(tokens[0], tokens[1]);
                
             finally 
                joinReader.close();
                fdis.close();
            
        
        else
            System.err.println("No Cache Files are distributed");
        
    

    // map function
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException

        NullWritable kNull = null;

        String joinValue = joinData.get(key.toString());

        if (joinValue != null)
            context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
                           
    
   

@Override
public int run(String[] args) throws Exception 

    if (args.length < 2)
        System.err.println("Usage JoinMapSide -files <smallFile> <inputFile> <outputFile>");
    

    Path inFile  = new Path(args[0]); // input file(customers.txt)
    Path outFile = new Path(args[1]); // output file file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // this is not used as the small file is distributed to all the nodes in the cluster using
    // generic options parser
    // job.addCacheFile(disFile.toUri());   

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    job.setNumReduceTasks(0);

    job.waitForCompletion(true);

    return 0;


public static void main(String args[]) throws Exception 
    int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

    System.exit(ret);

这是我在跟踪中看到的以下异常

Error: java.io.FileNotFoundException: File does not exist: /tmp/hadoop-yarn/staging/shiva/.staging/job_1427126201553_0003/files/orders.txt#orders.txt
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:64)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:54)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1795)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1738)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1718)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1690)

我开始工作就像

hadoop jar JoinMapSide2.jar -files orders.txt /data/patent/join/customers.txt /MROut/JoinMapSide2

任何方向都会很有帮助。谢谢

【问题讨论】:

【参考方案1】:

首先你需要将你的 orders.txt 移动到 hdfs 并且你必须使用 -files

【讨论】:

我不确定我是否同意这一点,因为 -files 选项的全部目的是将文件从本地文件系统复制到 HDFS。因此,您的解决方案超出了使用通用选项的目的。【参考方案2】:

好的,经过一番搜索,我确实发现上面的代码中有 2 个错误。

    我不应该使用FileDataInputStream 来读取分布式文件,因为它是运行映射器的节点的本地文件,我应该使用File。 我不应该使用URI.toString(),而是应该使用添加到我的文件中的符号链接,即orders.txt

我已更正下面列出的代码,希望对您有所帮助。

package org.samples.hina.training;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinMapSide2 extends Configured implements Tool

/*  Program     : JoinMapSide2.java 
    Description : To learn Replicated Join using Distributed Cache via Generic Options -files
    Input       : file:/patent/join/orders1.txt(distributed to all nodes), /data/patent/customers.txt
    Output      : /MROut/JoinMapSide2
    Date        : 24/03/2015  
*/

protected static class MapClass extends Mapper <Text,Text,NullWritable,Text>

    // hash table to store the key+value from the distributed file or the background data
    private Hashtable <String, String> joinData = new Hashtable <String, String>();

    // setup function for filling up the joinData for each each map() call
    protected void setup(Context context) throws IOException, InterruptedException 

        String line;
        String[] tokens;

        // get all the cache files set in the configuration set in addCacheFile()
        URI[] localFiles = context.getCacheFiles();

        System.out.println("File1:"+localFiles[0].toString());

        // check if there are any distributed files 
        // in our case we are sure we will always one so use that only 
        if (localFiles.length > 0)
            // read from LOCAL copy
            File localFile1 = new File("./orders1.txt");

            // created reader to localFile1
            BufferedReader joinReader = new BufferedReader(new FileReader(localFile1));

            // read local file until EOF
            try 
                while ((line = joinReader.readLine()) != null)
                    // apply the split pattern only once
                    tokens = line.split(",",2); 
                    // add key+value into the Hashtable
                    joinData.put(tokens[0], tokens[1]);
                
             finally 
                joinReader.close();                 
            

         else
            System.err.println("Local Cache File does not exist");
                   
    

    // map function
    protected void map(Text key,Text value, Context context) throws IOException, InterruptedException

        NullWritable kNull = null;

        String joinValue = joinData.get(key.toString());

        if (joinValue != null)
            context.write(kNull, new Text(key.toString() + "," + value.toString() + "," + joinValue));
                           
    
   

@Override
public int run(String[] args) throws Exception 

    if (args.length < 2)
        System.err.println("Usage JoinMapSide2 <inputFile> <outputFile>");
    

    Path inFile   = new Path(args[0]); // input file(customers.txt)
    Path outFile  = new Path(args[1]); // output file file

    Configuration conf = getConf();
    // delimiter for the input file
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");

    Job job = Job.getInstance(conf, "Map Side Join2");

    // add the files orders1.txt, orders2.txt to distributed cache
    // the files added by the Generic Options -files
    //job.addCacheFile(disFile1);

    FileInputFormat.addInputPath(job, inFile);
    FileOutputFormat.setOutputPath(job, outFile);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);

    job.setJarByClass(JoinMapSide2.class);
    job.setMapperClass(MapClass.class);

    job.setNumReduceTasks(0);

    job.waitForCompletion(true);

    return 0;


public static void main(String args[]) throws Exception 
    int ret = ToolRunner.run(new Configuration(), new JoinMapSide2(), args);

    System.exit(ret);


【讨论】:

以上是关于Hadoop分布式缓存通过通用选项-文件的主要内容,如果未能解决你的问题,请参考以下文章

读取驱动程序 Hadoop 中的文件

Hadoop DistributedCache分布式缓存的使用

如何在分区器 hadoop 中使用分布式缓存?

Hadoop 分布式缓存 (Cloudera CH3)

转载:分布式缓存的通用方法—《可伸缩服务架构》

flink常见的核心概念