通过SDK提交MapReduce作业
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过SDK提交MapReduce作业相关的知识,希望对你有一定的参考价值。
快速、完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分布式计算模型,能够更快速的解决用户海量数据计算问题,有效降低企业成本,并保障数据安全。
通过SDK提交MR作业的步骤如下:
步骤一:
编写MR程序,导出jar包,jar包可以不包含main方法(main方法是在本地执行)
步骤二:
上传jar包及所需的资源
(1) 通过console上传jar包到server端: add jar xxx.jar
(2)也可以通过SDK写程序上传,参考相关方法:com.aliyun.odps.ODPS.resources().create(xxx,xxx)
步骤三:
对main方法进行改进 ,主要包括两部分:
(1)设置账户信息(accessId/accessKey/endpoint),充当console/conf/odps_conf.ini中的配置功能
(2)设置MR中使用的资源,充当jar -resources xxx1.jar,xxx2.jar的功能
通过方法job.setResources("test13.jar");设置
注:本地用户Mapper类和Reducer类方法是空的(本地并不会执行这份代码),存在的目的是保证main方法编译通过
package com.aliyun.odps.examples.mr;
import com.aliyun.odps.Odps;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.conf.SessionState;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
/*
* 该示例展示了MapReduce程序中的基本结构
*
*/
public class WordCount {
public static class TokenizerMapper extends MapperBase {
}
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class SumReducer extends ReducerBase {
}
public static void main(String[] args) throws Exception {
// /////////////额外添加的代码//////////
String endpoint = "your_endpoint";
String accessId = "your_access_id";
String accessKey = "your_access_key";
String project = "your_project";
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setDefaultProject(project);
odps.setEndpoint(endpoint);
SessionState.get().setOdps(odps);
SessionState.get().setLocalRun(false);
// ///////////////////////////////
JobConf job = new JobConf();
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(SumCombiner.class);
job.setReducerClass(SumReducer.class);
// /////////////额外添加的代码//////////
// 资源名称列表,多个资源用逗号分隔
job.setResources("test13.jar");
// //////////////////////////////////
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
InputUtils.addTable(TableInfo.builder().tableName("wc_in").build(), job);
OutputUtils.addTable(TableInfo.builder().tableName("wc_out").build(), job);
RunningJob rj = JobClient.runJob(job);
rj.waitForCompletion();
}
}
以上是关于通过SDK提交MapReduce作业的主要内容,如果未能解决你的问题,请参考以下文章