如何通过Java程序提交yarn的MapReduce计算任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何通过Java程序提交yarn的MapReduce计算任务相关的知识,希望对你有一定的参考价值。
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<textpair,value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用 参考技术A 1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的不同,这里变为了,TextPair的格式为。3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用 参考技术B
下面为MapReduce主程序,有几点须要提一下:
1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不正确文件进行切分。
2、为了控制reduce的处理过程。map的输出键的格式为组合键格式。
与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。
3、为了适应组合键,又一次设定了分组函数。即GroupComparator。分组规则为,仅仅要TextPair中的key1同样(不要求key2同样),则数据被分配到一个reduce容器中。这样,当同样key1的数据进入reduce容器后,key2起到了一个数据标识的作用。
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import util.Utils;
public class GEMIMain
public GEMIMain()
job = null;
public Job job;
public static class NamePartitioner extends
Partitioner<TextPair, BytesWritable>
@Override
public int getPartition(TextPair key, BytesWritable value,
int numPartitions)
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
/**
* 分组设置类。仅仅要两个TextPair的第一个key同样。他们就属于同一组。
他们的Value就放到一个Value迭代器中,
* 然后进入Reducer的reduce方法中。
*
* @author hduser
*
*/
public static class GroupComparator extends WritableComparator
public GroupComparator()
super(TextPair.class, true);
@Override
public int compare(WritableComparable a, WritableComparable b)
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
// 比较同样则返回0,比较不同则返回-1
return t1.getFirst().compareTo(t2.getFirst()); // 仅仅要是第一个字段同样的就分成为同一组
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException
Configuration conf = new Configuration();
// 在conf中设置outputath变量,以在reduce函数中能够获取到该参数的值
conf.set("outputPath", args[args.length - 1].toString());
//设置HDFS中,每次任务生成产品的质量文件所在目录。args数组的倒数第二个原数为质量文件所在目录
conf.set("qualityFolder", args[args.length - 2].toString());
//假设在Server中执行。则须要获取web项目的根路径;假设以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件
//MapReduceProgress mprogress = new MapReduceProgress();
//String rootPath= mprogress.rootPath;
String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
conf.addResource(new Path(rootPath+"yarn-site.xml"));
conf.addResource(new Path(rootPath+"core-site.xml"));
conf.addResource(new Path(rootPath+"hdfs-site.xml"));
conf.addResource(new Path(rootPath+"mapred-site.xml"));
this.job = new Job(conf);
job.setJobName("Job name:" + args[0]);
job.setJarByClass(GEMIMain.class);
job.setMapperClass(GEMIMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(BytesWritable.class);
// 设置partition
job.setPartitionerClass(NamePartitioner.class);
// 在分区之后依照指定的条件分组
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(GEMIReducer.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(Text.class);
job.setNumReduceTasks(8);
// 设置计算输入数据的路径
for (int i = 1; i < args.length - 2; i++)
FileInputFormat.addInputPath(job, new Path(args[i]));
// args数组的最后一个元素为输出路径
FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
boolean flag = job.waitForCompletion(true);
return flag;
@SuppressWarnings("static-access")
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException
String[] inputPaths = new String[] "normalizeJob",
"hdfs://192.168.168.101:9000/user/hduser/red1/",
"hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
"hdfs://192.168.168.101:9000/user/hduser/test" ;
GEMIMain test = new GEMIMain();
boolean result = test.runJob(inputPaths);
下面为TextPair类
public class TextPair implements WritableComparable<TextPair>
private Text first;
private Text second;
public TextPair()
set(new Text(), new Text());
public TextPair(String first, String second)
set(new Text(first), new Text(second));
public TextPair(Text first, Text second)
set(first, second);
public void set(Text first, Text second)
this.first = first;
this.second = second;
public Text getFirst()
return first;
public Text getSecond()
return second;
@Override
public void write(DataOutput out) throws IOException
first.write(out);
second.write(out);
@Override
public void readFields(DataInput in) throws IOException
first.readFields(in);
second.readFields(in);
@Override
public int hashCode()
return first.hashCode() * 163 + second.hashCode();
@Override
public boolean equals(Object o)
if (o instanceof TextPair)
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
return false;
@Override
public String toString()
return first + "\\t" + second;
@Override
/**A.compareTo(B)
* 假设比较同样,则比较结果为0
* 假设A大于B,则比较结果为1
* 假设A小于B。则比较结果为-1
*
*/
public int compareTo(TextPair tp)
int cmp = first.compareTo(tp.first);
if (cmp != 0)
return cmp;
//此时实现的是升序排列
return second.compareTo(tp.second);
下面为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable>
@Override
public RecordReader<Text, BytesWritable> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException
// TODO Auto-generated method stub
return new WholeFileRecordReader();
@Override
protected boolean isSplitable(JobContext context, Path filename)
// TODO Auto-generated method stub
return false;
下面为WholeFileRecordReader类
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable>
private FileSplit fileSplit;
private FSDataInputStream fis;
private Text key = null;
private BytesWritable value = null;
private boolean processed = false;
@Override
public void close() throws IOException
// TODO Auto-generated method stub
// fis.close();
@Override
public Text getCurrentKey() throws IOException, InterruptedException
// TODO Auto-generated method stub
return this.key;
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException
// TODO Auto-generated method stub
return this.value;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
throws IOException, InterruptedException
fileSplit = (FileSplit) inputSplit;
Configuration job = tacontext.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
fis = fs.open(file);
@Override
public boolean nextKeyValue()
if (key == null)
key = new Text();
if (value == null)
value = new BytesWritable();
if (!processed)
byte[] content = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
System.out.println(file.getName());
key.set(file.getName());
try
IOUtils.readFully(fis, content, 0, content.length);
// value.set(content, 0, content.length);
value.set(new BytesWritable(content));
catch (IOException e)
// TODO Auto-generated catch block
e.printStackTrace();
finally
IOUtils.closeStream(fis);
processed = true;
return true;
return false;
@Override
public float getProgress() throws IOException, InterruptedException
// TODO Auto-generated method stub
return processed ? fileSplit.getLength() : 0;
如何找到Hive提交的SQL相对应的Yarn程序的applicationId
最近的工作是利用Hive做数据仓库的ETL转换,大致方式是将ETL转换逻辑写在一个hsql文件中,脚本当中都是简单的SQL语句,不包含判断、循环等存储过程中才有的写法,仅仅支持一些简单的变量替换,比如当前账期等。然后通过一个通用的shell脚本来执行hsql文件。该脚本是主要是调用了hive -f <hsql文件>来执行hsql文件中的SQL语句的,当然hive命令会通过--hivevar选项定义变量将当前账期等数值传进去供SQL使用。
简单说下环境信息,目前使用的大数据平台版本是HDP 3.1.0.0-78,Hive版本是3.1.0,而Tez版本是0.9.1。Hive 3.X系列的新特性主要包括:
1. 执行引擎不再支持mr,取而代之的是tez或者spark(在HDP平台默认是tez);
2. 不再支持胖客户端Hive CLI,被beeline取代(目前通过hive命令执行sql实际还是调用的beeline去连接hiveserver2服务);
3. 默认建表支持ACID语义;
4. 支持LLAP,即Live Long and Process,相当于内存计算,极大地优化了性能(该特性实际从Hive 2.X开始支持);
言归正传,回到本文的主题,比如Hive在运行过程中报错了,我们需要在Yarn上找到对应的application的日志以便定位问题,前提是需要知道Yarn程序对应的applicationId,但是beeline的输出信息中是没有applicationId的,那么如何找到Hive提交的SQL相对应的Yarn程序的applicationId呢?主要有以下几个步骤:
1. 我们通过shell脚本提交hsql文件时实际是通过beeline向hiveserver2服务提交hsql文件中的SQL语句,我们的shell脚本会将beeline的屏幕输出信息同时重定向到日志文件中,这个就是我们的第一个步骤的日志。我们找到这个日志文件,在其中搜索关键字"Completed executing command",可以得到queryId,其中每个SQL语句对应1个queryId,因为我们的hsql脚本中有4个SQL语句,所以搜索出来的信息如下:
INFO : Completed executing command(queryId=hive_20200502095437_1e9bf52d-e590-4519-a6e1-9e2e4ae91158); Time taken: 0.755 seconds
INFO : Completed executing command(queryId=hive_20200502095816_888a7dba-4403-439d-a3a7-f6cdc280c18a); Time taken: 52.929 seconds
INFO : Completed executing command(queryId=hive_20200502100121_5752f019-a6e2-463c-b413-a80bbe518a5c); Time taken: 52.66 seconds
INFO : Completed executing command(queryId=hive_20200502100428_9d4b8955-b84c-40be-a605-9ce19b4b7773); Time taken: 26.463 seconds
2. 找到对应的hiveserver2服务在哪台机器上。由于beeline是通过zookeeper随机连接一个hiveserver2服务,所以从上一步的日志中可以看到连接的是哪台机器上的hiveserver2服务。然后登录到该台主机,通过netstat和ps命令找到对应的hiveserver2进程,从ps命令输出的进程信息对应的命令行中,我们可以找到下面的参数。
-Dhive.log.dir=/var/log/hive -Dhive.log.file=hiveserver2.log
上面的参数说明了hiveserver2服务对应的日志名称和路径。这样我们就可以找到hiveserver2服务对应的日志,这是我们第二个步骤的日志。从这个日志里通过搜索关键字"callerId=<queryId>",<queryId>用上一步得到的真实的queryId替换(比如搜索"callerId=hive_20200502095816_888a7dba-4403-439d-a3a7-f6cdc280c18a")。我们将上面的4个queryId逐一用前述的关键字搜索,得到信息如下:
2020-05-02T10:00:30,440 INFO [Thread-536694]: client.TezClient (:()) - Submitting dag to TezSession, sessionName=HIVE-315802e2-f6e4-499d-a707-4d3057180abd, applicationId=application_1588062934554_53656, dagName=create temporary table ngdwt.rpt_to_etc_...t (Stage-1), callerContext={ context=HIVE, callerType=HIVE_QUERY_ID, callerId=hive_20200502095816_888a7dba-4403-439d-a3a7-f6cdc280c18a }
2020-05-02T10:00:57,229 INFO [Thread-536729]: client.TezClient (:()) - Submitting dag to TezSession, sessionName=HIVE-315802e2-f6e4-499d-a707-4d3057180abd, applicationId=application_1588062934554_53656, dagName=create temporary table ngdwt.rpt_to_etc_...t (Stage-4), callerContext={ context=HIVE, callerType=HIVE_QUERY_ID, callerId=hive_20200502095816_888a7dba-4403-439d-a3a7-f6cdc280c18a }
2020-05-02T10:03:22,043 INFO [Thread-537002]: client.TezClient (:()) - Submitting dag to TezSession, sessionName=HIVE-315802e2-f6e4-499d-a707-4d3057180abd, applicationId=application_1588062934554_53656, dagName=create temporary tabl...ov_in,a.statis_date (Stage-1), callerContext={ context=HIVE, callerType=HIVE_QUERY_ID, callerId=hive_20200502100121_5752f019-a6e2-463c-b413-a80bbe518a5c }
2020-05-02T10:07:39,627 INFO [Thread-537495]: client.TezClient (:()) - Submitting dag to TezSession, sessionName=HIVE-315802e2-f6e4-499d-a707-4d3057180abd, applicationId=application_1588062934554_53656, dagName=insert into ngdwt.rpt_to_etc_rece_d(re...t (Stage-1), callerContext={ context=HIVE, callerType=HIVE_QUERY_ID, callerId=hive_20200502100428_9d4b8955-b84c-40be-a605-9ce19b4b7773 }
可以看到第1个queryId用关键字"callerId=<queryId>"去搜索没有搜到信息,因为对应的第一个sql语句是ddl语句,不会向yarn提交程序(但是仅用"<queryId>"去搜索还是能搜到信息),后面第二个queryId搜索出来有2行,其他queryId只有1行。
可以看到这些queryId(hive命令输出信息)或者callerId(hiveserver2.log日志信息)对应的hive session和yarn application是同一个:
sessionName=HIVE-315802e2-f6e4-499d-a707-4d3057180abd
applicationId=application_1588062934554_53656
也就是说,同一个hsql文件中的不同SQL语句对应的是同一个hive session以及同一个yarn application.
来看下yarn web管理页面中该application的截图。
可以看到上面页面中的Name跟hiveserver2.log中的sessionName一致,Application Tags跟hiveserver2.log中的callerId(或者hive命令屏幕输出信息中的queryId)一致。
而yarn ui2中的程序信息如下:
3. 找到了applicationId就比较好办了,可以通过下面的命令将yarn日志从hdfs下载到本地(待yarn程序执行完毕)
yarn logs -applicationId application_1588062934554_53656 > application_1588062934554_53656.log
然后可以对application_1588062934554_53656.log做进一步的分析。
比如用关键字"Container: container_"搜索并去重排序后得到9个container的信息:
Container: container_e46_1588062934554_53656_01_000001 on hadoop19_45454_1588385301489
Container: container_e46_1588062934554_53656_01_000002 on hadoop31_45454_1588385300882
Container: container_e46_1588062934554_53656_01_000003 on hadoop40_45454_1588385301059
Container: container_e46_1588062934554_53656_01_000004 on hadoop27_45454_1588385300739
Container: container_e46_1588062934554_53656_01_000006 on hadoop36_45454_1588385301268
Container: container_e46_1588062934554_53656_01_000007 on hadoop57_45454_1588385301076
Container: container_e46_1588062934554_53656_01_000008 on hadoop22_45454_1588385301501
Container: container_e46_1588062934554_53656_01_000009 on hadoop31_45454_1588385300882
Container: container_e46_1588062934554_53656_01_000010 on hadoop21_45454_1588385301473
上面是该application对用的所有container.
或者用关键字"Assigning container to task:"搜索得到任务分配信息,其中container_e46_1588062934554_53656_01_000001因为是applicationmaster没有任务分配信息,其他8个container都有任务分配信息,其中container_e46_1588062934554_53656_01_000008和container_e46_1588062934554_53656_01_000009有2条记录,但attempt不同,表示这2个container里的任务之前有失败的,分别进行了2次尝试。为了简洁起见,这里仅列出搜索出来的第一条记录:
2020-05-01 22:00:40,603 [INFO] [DelayedContainerManager] |rm.YarnTaskSchedulerService|: Assigning container to task: containerId=container_e46_1588062934554_53656_01_000002, task=attempt_1588062934554_53656_1_00_000000_0, containerHost=hadoop31:45454, containerPriority= 11, containerResources=<memory:12288, vCores:1>, localityMatchType=RackLocal, matchedLocation=/default-rack, honorLocalityFlags=false, reusedContainer=false, delayedContainers=3
因为现在的Hive的执行引擎不再是mr,而是改成了tez,目前我对tez并不太熟悉,只是理解它为mr的升级版,在原来的map/reduce操作上增加了DAG,不同job之间的数据传递不必写到HDFS,而是类似数据流的方式,减少了中间环节,提升了效率。对于一个Tez程序,类似于MR程序的MRAppMaster和YarnChild进程,它会产生DAGAppMaster和TezChild进程,前者是master负责管理整个程序以及申请资源,后者是slave,负责执行具体的计算任务。
以上是关于如何通过Java程序提交yarn的MapReduce计算任务的主要内容,如果未能解决你的问题,请参考以下文章
如何找到Hive提交的SQL相对应的Yarn程序的applicationId