Hadoop DistributedCache 已弃用 - 首选 API 是啥?

Posted

技术标签:

【中文标题】Hadoop DistributedCache 已弃用 - 首选 API 是啥?【英文标题】:Hadoop DistributedCache is deprecated - what is the preferred API?Hadoop DistributedCache 已弃用 - 首选 API 是什么? 【发布时间】:2014-02-09 23:03:10 【问题描述】:

我的地图任务需要一些配置数据,我想通过分布式缓存分发这些数据。

Hadoop的MapReduce Tutorial显示了DistributedCache类的usage,大致如下:

// In the driver
JobConf conf = new JobConf(getConf(), WordCount.class);
...
DistributedCache.addCacheFile(new Path(filename).toUri(), conf); 

// In the mapper
Path[] myCacheFiles = DistributedCache.getLocalCacheFiles(job);
...

但是,DistributedCache 在 Hadoop 2.2.0 中是 marked as deprecated。

实现这一目标的新首选方法是什么?是否有涵盖此 API 的最新示例或教程?

【问题讨论】:

【参考方案1】:

提到的解决方案都没有完全对我有用。这可能是因为 Hadoop 版本不断变化,我使用的是 hadoop 2.6.4。本质上,不推荐使用 DistributedCache,所以我不想使用它。正如一些帖子建议我们使用 addCacheFile() 然而,它已经改变了一点。这是它对我的工作方式

job.addCacheFile(new URI("hdfs://X.X.X.X:9000/EnglishStop.txt#EnglishStop.txt"));

这里的 X.X.X.X 可以是 Master IP 地址或 localhost。 EnglishStop.txt 存储在 HDFS 的 / 位置。

hadoop fs -ls /

输出是

-rw-r--r--   3 centos supergroup       1833 2016-03-12 20:24 /EnglishStop.txt
drwxr-xr-x   - centos supergroup          0 2016-03-12 19:46 /test

有趣但方便,#EnglishStop.txt 意味着现在我们可以在映射器中以“EnglishStop.txt”的形式访问它。这是相同的代码

public void setup(Context context) throws IOException, InterruptedException     

    File stopwordFile = new File("EnglishStop.txt");
    FileInputStream fis = new FileInputStream(stopwordFile);
    BufferedReader reader = new BufferedReader(new InputStreamReader(fis));

    while ((stopWord = reader.readLine()) != null) 
        // stopWord is a word read from Cache
    

这对我有用。您可以从存储在 HDFS 中的文件中读取行

【讨论】:

【参考方案2】:

我没有使用 job.addCacheFile()。相反,我像以前一样使用了 -files 选项,例如“-files /path/to/myfile.txt#myfile”。然后在 mapper 或 reducer 代码中我使用下面的方法:

/**
 * This method can be used with local execution or HDFS execution. 
 * 
 * @param context
 * @param symLink
 * @param throwExceptionIfNotFound
 * @return
 * @throws IOException
 */
public static File findDistributedFileBySymlink(JobContext context, String symLink, boolean throwExceptionIfNotFound) throws IOException

    URI[] uris = context.getCacheFiles();
    if(uris==null||uris.length==0)
    
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    
    URI symlinkUri = null;
    for(URI uri: uris)
    
        if(symLink.equals(uri.getFragment()))
        
            symlinkUri = uri;
            break;
        
       
    if(symlinkUri==null)
    
        if(throwExceptionIfNotFound)
            throw new RuntimeException("Unable to find file with symlink '"+symLink+"' in distributed cache");
        return null;
    
    //if we run this locally the file system URI scheme will be "file" otherwise it should be a symlink
    return "file".equalsIgnoreCase(FileSystem.get(context.getConfiguration()).getScheme())?(new File(symlinkUri.getPath())):new File(symLink);


然后在mapper/reducer中:

@Override
protected void setup(Context context) throws IOException, InterruptedException

    super.setup(context);

    File file = HadoopUtils.findDistributedFileBySymlink(context,"myfile",true);
    ... do work ...

请注意,如果我直接使用“-files /path/to/myfile.txt”,那么我需要使用“myfile.txt”来访问该文件,因为这是默认的符号链接名称。

【讨论】:

【参考方案3】:

我遇到了同样的问题。不仅不推荐使用 DistributedCach,而且不推荐使用 getLocalCacheFiles 和“新作业”。所以对我有用的是:

司机:

Configuration conf = getConf();
Job job = Job.getInstance(conf);
...
job.addCacheFile(new Path(filename).toUri());

在 Mapper/Reducer 设置中:

@Override
protected void setup(Context context) throws IOException, InterruptedException

    super.setup(context);

    URI[] files = context.getCacheFiles(); // getCacheFiles returns null

    Path file1path = new Path(files[0])
    ...

【讨论】:

【参考方案4】:

为了扩展 @jtravaglini,在 YARN/MapReduce 2 中使用 DistributedCache 的首选方式如下:

在您的驱动程序中,使用Job.addCacheFile()

public int run(String[] args) throws Exception 
    Configuration conf = getConf();

    Job job = Job.getInstance(conf, "MyJob");

    job.setMapperClass(MyMapper.class);

    // ...

    // Mind the # sign after the absolute file location.
    // You will be using the name after the # sign as your
    // file name in your Mapper/Reducer
    job.addCacheFile(new URI("/user/yourname/cache/some_file.json#some"));
    job.addCacheFile(new URI("/user/yourname/cache/other_file.json#other"));

    return job.waitForCompletion(true) ? 0 : 1;

在您的 Mapper/Reducer 中,覆盖 setup(Context context) 方法:

@Override
protected void setup(
        Mapper<LongWritable, Text, Text, Text>.Context context)
        throws IOException, InterruptedException 
    if (context.getCacheFiles() != null
            && context.getCacheFiles().length > 0) 

        File some_file = new File("./some");
        File other_file = new File("./other");

        // Do things to these two files, like read them
        // or parse as JSON or whatever.
    
    super.setup(context);

【讨论】:

这是在哪里记录的?【参考方案5】:

org.apache.hadoop.mapreduce.Job 类中可以找到用于 YARN/MR2 的新 DistributedCache API。

   Job.addCacheFile()

不幸的是,目前还没有很多这样的综合教程式示例。

http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapreduce/Job.html#addCacheFile%28java.net.URI%29

【讨论】:

我不知道如何检索这些使用Job.addCacheFile(URI) 添加的缓存文件。使用旧方法 (context.getCacheFiles()) 对我不起作用,因为文件为空。【参考方案6】:

分布式缓存的 API 可以在 Job 类本身中找到。在此处查看文档:http://hadoop.apache.org/docs/stable2/api/org/apache/hadoop/mapreduce/Job.html 代码应该类似于

Job job = new Job();
...
job.addCacheFile(new Path(filename).toUri());

在您的映射器代码中:

Path[] localPaths = context.getLocalCacheFiles();
...

【讨论】:

谢谢 - 我假设我因此需要使用更新的 mapreduce API 而不是 mapred,否则 JobContext 对象不会提供给映射器... 我认为 getLocalCacheFiles() 已被弃用,但 getCacheFiles() 没问题 - 虽然返回 URI 而不是路径。 不错!这是一个比使用 DistributedCache 更干净、更简单的 API。 @DNA 我不认为getLocalCacheFiles()getCacheFiles() 是一样的。你可以查看我的问题(***.com/questions/26492964/…)。如果你想访问本地化文件但不想使用已弃用的api,可以使用文件名直接打开它(后面的技术称为符号链接)。 但是如果我们使用一些框架(如级联)来创造工作机会呢?我们只能将 jobconf 传递给级联框架 - 在这种情况下,分布式缓存的替代方案是什么?

以上是关于Hadoop DistributedCache 已弃用 - 首选 API 是啥?的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop DistributedCache 无法报告状态

Hadoop DistributedCache使用案例

Hadoop 2.x 中的分布式缓存

添加外部Jar时Hadoop NoClassDefFoundError

目录是不是由 Hadoop 缓存符号链接处理?

使用 DistributedCache 访问 MapFile 时出现 FileNotFoundException