如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?

Posted

技术标签:

【中文标题】如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?【英文标题】:How to programmatically get all running jobs in a Hadoop cluster using the new API? 【发布时间】:2015-04-15 08:14:48 【问题描述】:

我有一个向 Hadoop 提交 MR 作业的软件组件。我现在想在提交之前检查是否有其他作业正在运行。我发现新 API 中有一个 Cluster 对象,可用于查询集群中正在运行的作业、获取它们的配置并从中提取相关信息。但是我在使用它时遇到了问题。

只做new Cluster(conf) 其中conf 是一个有效的Configuration 可用于访问此集群(例如,向其提交作业)使对象未配置,并且ClustergetAllJobStatuses() 方法返回null

从配置中提取mapreduce.jobtracker.address,从中构造一个InetSocketAddress并使用Cluster的另一个构造函数抛出Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.

使用旧的 api,执行new JobClient(conf).getAllJobs() 之类的操作会引发 NPE。

我在这里缺少什么?如何以编程方式获取正在运行的作业?

【问题讨论】:

那么您运行 Hadoop 2.x 吗?你为mapreduce.framework.name定义了什么? 是的,我运行 Hadoop 2.x。当我为mapreduce.framework.name 设置yarn 并添加jobclient 依赖项时,我会更进一步——new Cluster(...) 运行没有错误,但getAllJobStatuses() 挂起。我认为问题可能出在我用于集成测试的迷你集群上,我必须检查是否在“真实”集群上运行它。 我会这样认为,我相当确定它可以在真实集群上运行;) 我调查了一些,我很确定这是因为迷你集群。我向它提交了一个虚拟作业(空输入和输出目录,仅此而已),调用job.getCluster().getAllJobStatuses() 并再次收到null 【参考方案1】:

我进行了更多调查,并解决了它。 Thomas Jungblut 是对的,这是因为迷你集群。我使用this blog post 之后的迷你集群,结果证明它适用于 MR 作业,但是以不推荐的方式设置了迷你集群,配置不完整。 Hadoop Wiki 有 a page on how to develop unit tests,它还解释了如何正确设置迷你集群。

基本上,我通过以下方式设置迷你集群:

// Create a YarnConfiguration for bootstrapping the minicluster
final YarnConfiguration bootConf = new YarnConfiguration();
// Base directory to store HDFS data in
final File hdfsBase = Files.createTempDirectory("temp-hdfs-").toFile();
bootConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBase.getAbsolutePath());
// Start Mini DFS cluster
final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(bootConf).build();
// Configure and start Mini MR YARN cluster
bootConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
bootConf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster("test-cluster", 1);
yarnCluster.init(bootConf);
yarnCluster.start();
// Get the "real" Configuration to use from now on
final Configuration conf = yarnCluster.getConfig();
// Get the filesystem
final FileSystem fs = new Path ("hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/").getFileSystem(conf);

现在,我有 conffs 可以用来提交作业和访问 HDFS,new Cluster(conf)cluster.getAllJobStatuses 可以正常工作。

当一切都完成后,为了关闭和清理,我调用:

yarnCluster.stop();
hdfsCluster.shutdown();
FileUtils.deleteDirectory(hdfsBase); // from Apache Commons IO

注意: JAVA_HOME 必须设置为使其工作。在 Jenkins 上构建时,请确保为默认 JDK 设置了 JAVA_HOME。或者,您可以明确声明要使用的 JDK,然后 Jenkins 会自动设置 JAVA_HOME

【讨论】:

【参考方案2】:

我试过这样,它对我有用,但它是在提交作业之后

JobClient jc = new JobClient(job.getConfiguration());

  for(JobStatus js: jc.getAllJobs())
  
    if(js.getState().getValue() == State.RUNNING.getValue())
    

    
  

  jc.close();

否则我们可以从作业 api 获取集群,并且有一些方法可以返回所有作业、作业状态

cluster.getAllJobStatuses();

【讨论】:

以上是关于如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?的主要内容,如果未能解决你的问题,请参考以下文章

以编程方式获取设备的 Android API 级别?

SMS Retriever API - 如何以编程方式获取 SMS?

如何使用 Android 4.0 以编程方式创建新的 *** 接口?

如何以编程方式在谷歌云运行 api 中获取当前项目 ID

如何以编程方式在 Android 的 API 23 及更高版本中获取位置(纬度、经度)?

以编程方式获取加载 URL 时调用的 API 列表