如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?
Posted
技术标签:
【中文标题】如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?【英文标题】:How to programmatically get all running jobs in a Hadoop cluster using the new API? 【发布时间】:2015-04-15 08:14:48 【问题描述】:我有一个向 Hadoop 提交 MR 作业的软件组件。我现在想在提交之前检查是否有其他作业正在运行。我发现新 API 中有一个 Cluster
对象,可用于查询集群中正在运行的作业、获取它们的配置并从中提取相关信息。但是我在使用它时遇到了问题。
只做new Cluster(conf)
其中conf
是一个有效的Configuration
可用于访问此集群(例如,向其提交作业)使对象未配置,并且Cluster
的getAllJobStatuses()
方法返回null
。
从配置中提取mapreduce.jobtracker.address
,从中构造一个InetSocketAddress
并使用Cluster
的另一个构造函数抛出Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses.
。
使用旧的 api,执行new JobClient(conf).getAllJobs()
之类的操作会引发 NPE。
我在这里缺少什么?如何以编程方式获取正在运行的作业?
【问题讨论】:
那么您运行 Hadoop 2.x 吗?你为mapreduce.framework.name
定义了什么?
是的,我运行 Hadoop 2.x。当我为mapreduce.framework.name
设置yarn
并添加jobclient
依赖项时,我会更进一步——new Cluster(...)
运行没有错误,但getAllJobStatuses()
挂起。我认为问题可能出在我用于集成测试的迷你集群上,我必须检查是否在“真实”集群上运行它。
我会这样认为,我相当确定它可以在真实集群上运行;)
我调查了一些,我很确定这是因为迷你集群。我向它提交了一个虚拟作业(空输入和输出目录,仅此而已),调用job.getCluster().getAllJobStatuses()
并再次收到null
。
【参考方案1】:
我进行了更多调查,并解决了它。 Thomas Jungblut 是对的,这是因为迷你集群。我使用this blog post 之后的迷你集群,结果证明它适用于 MR 作业,但是以不推荐的方式设置了迷你集群,配置不完整。 Hadoop Wiki 有 a page on how to develop unit tests,它还解释了如何正确设置迷你集群。
基本上,我通过以下方式设置迷你集群:
// Create a YarnConfiguration for bootstrapping the minicluster
final YarnConfiguration bootConf = new YarnConfiguration();
// Base directory to store HDFS data in
final File hdfsBase = Files.createTempDirectory("temp-hdfs-").toFile();
bootConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBase.getAbsolutePath());
// Start Mini DFS cluster
final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(bootConf).build();
// Configure and start Mini MR YARN cluster
bootConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
bootConf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster("test-cluster", 1);
yarnCluster.init(bootConf);
yarnCluster.start();
// Get the "real" Configuration to use from now on
final Configuration conf = yarnCluster.getConfig();
// Get the filesystem
final FileSystem fs = new Path ("hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/").getFileSystem(conf);
现在,我有 conf
和 fs
可以用来提交作业和访问 HDFS,new Cluster(conf)
和 cluster.getAllJobStatuses
可以正常工作。
当一切都完成后,为了关闭和清理,我调用:
yarnCluster.stop();
hdfsCluster.shutdown();
FileUtils.deleteDirectory(hdfsBase); // from Apache Commons IO
注意: JAVA_HOME
必须设置为使其工作。在 Jenkins 上构建时,请确保为默认 JDK 设置了 JAVA_HOME
。或者,您可以明确声明要使用的 JDK,然后 Jenkins 会自动设置 JAVA_HOME
。
【讨论】:
【参考方案2】:我试过这样,它对我有用,但它是在提交作业之后
JobClient jc = new JobClient(job.getConfiguration());
for(JobStatus js: jc.getAllJobs())
if(js.getState().getValue() == State.RUNNING.getValue())
jc.close();
否则我们可以从作业 api 获取集群,并且有一些方法可以返回所有作业、作业状态
cluster.getAllJobStatuses();
【讨论】:
以上是关于如何使用新 API 以编程方式获取 Hadoop 集群中所有正在运行的作业?的主要内容,如果未能解决你的问题,请参考以下文章
SMS Retriever API - 如何以编程方式获取 SMS?
如何使用 Android 4.0 以编程方式创建新的 *** 接口?