通过MapReduce JobID 停止(kill)指定任务

Posted 莫西里

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过MapReduce JobID 停止(kill)指定任务相关的知识,希望对你有一定的参考价值。

一、说明

       有时候我们在提交任务以后,能够获取一个MapReduce任务的ID,一般为Job_**********_xxxx的组合,下面将介绍如何获取JobID,与通过其他程序与JOBID停止一个正在运行的任务。

二、流程

1、提交任务并获取ID值。

通常情况下,我们进行远程提交时,都会使用job.waitForCompletion(true);函数去提交一个任务并且在eclipse中远程监听任务的执行情况。但是如果使用job.submit()方法后,则只是将任务提交到集群结束,即不在远程坚挺集群上任务执行的情况。

job.submit();
System.out.println(job.getJobID());

通过job.submit()方法,我们可以在提交任务后,直接结束客户端的提交行为。但是job在提交到集群前的相关数据已经封装到了job对象中。因此我们可以通过job.getJobID()获取刚刚提交的任务的ID值。

2、通过jobid停止任务

想要通过jobid停止任务,则需要重新获取集群(cluster)上的任务的示例,因此需要通过JobClient去完成相关的任务。通过下述代码可以完成对集群上正在运行任务的ID值:

package mr;

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;

public class MRKillJob 
	
	public static void main(String[] args)
	
		MRKillJob test=new MRKillJob();
		test.killJob("job_1461739723866_0094");
	
	
	/**
	 * @author wozipa
	 * @Date 2016-6-9 15:02
	 * @see 删除某一个任务
	 * @param jobId
	 */
	public void killJob(String id)
	
		Configuration conf=new Configuration();
		conf.set("fs.defaultFS", "hdfs://hadoop1:9000");
 		conf.set("yarn.resourcemanager.address", "hadoop1:8032");
		conf.set("mapreduce.jobhistory.address", "192.98.12.234:10020");
		conf.set("yarn.resourcemanager.scheduler.address", "hadoop1:8030");
		conf.set("mapreduce.framework.name", "yarn");
		conf.set("mapreduce.app-submission.cross-platform", "true");
		try 
			JobClient client=new JobClient(new InetSocketAddress("192.98.12.234",8032), conf);
			RunningJob job=client.getJob(id);
			System.out.println(job.setupProgress());
			System.out.println(job.cleanupProgress());
			System.out.println(job.mapProgress());
			System.out.println(job.reduceProgress());
			System.out.println(job.getJobName());
			System.out.println(job.getJobState());
			System.out.println(job.isComplete());
			System.out.println(job.getFailureInfo());
			System.out.println(job.getHistoryUrl().toString());
			System.out.println(job.getID());
			JobStatus status=job.getJobStatus();
			System.out.println(status.toString());
			System.out.println(job.getTrackingURL());
		 catch (IOException e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
		
	

在Hadoop2.x版本中,有两种执行MapReduce的方式,MR V1与YARN。因为在这里我的集群使用的YARN的模式(mapreduce.framework.name=yarn);因此任务的调度都是有YARN中的ResourceManager进程完成,因此想要获取JobClienti对象与集群的连接,就需要与ResourceMnager进行进行交互。因此在创建JobClient对象时的网络位置写的是ResouceManager进程的IP地址与端口号。

并且conf中需要制定ResourceManager进程的位置(id+port),因此这里照搬了任务远程提交时的配置。

当获取JobClient对象后,可以通过

RunningJob job=client.getJob(id);
函数获取相应正在执行的任务句柄,然后就可以相应的再行执行的任务了。

在测试过程中发现,当任务执行完成后,jobclient对象会自动去Job History Server上获取任务的执行信息。如果没有早conf中指定history的地址的话,jobclient会去本地0.0.0.0:10020位置上去寻找历史服务器,因此需要在conf中指定历史服务器的位置。因此可知为何在conf中设置resoucemanager的ip+port了。

如果需要停止任务的话,则使用

job.killJob();

就可以完成对任务的终止了。

RunningJob对象中也含有任务执行过程中的其他信息,可以通过job对象中的方法进行获取,例如setupProgress属性是指setup程序完成的百分比,一次类推就知道mapProgress、reduceProgress、cleanupProgress的含义了。其他属性用户可以自己去进行测试。


以上是关于通过MapReduce JobID 停止(kill)指定任务的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce工作原理图文详解

MapReduce工作原理图文详解

MapReduce的工作机制

解决运行MapReduce时遇到:Container killed on request. Exit code is 143

Hadoop 对MapReduce的理解

如何杀死hadoop工作