ODPS MapReduce(MR2)用例记录
Posted 二两窝子面
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ODPS MapReduce(MR2)用例记录相关的知识,希望对你有一定的参考价值。
前言:上一篇写ODPS Spark时提到这茬了,于是想着把老早前写的ODPS MR也贴下做个记录,可以当MR的复习材料。
当时的场景是由于使用SQL时发现lead函数无法动态偏移(基于本人工作环境的ODPS),因此写了MR,写的时候发现直接SQL打序自连接不就行了(脑子没转过来),但是MR平时很少写,于是还是简化地写出来了。
数据集操作逻辑:实现能够动态偏移的lead函数效果——在MR中是通过分组打序,然后自连接实现的(JoinMapper=>IniReducer=>JoinReducer)。
值得注意的是:
1.ODPS中的MR2是拓展的MR,它提供的Pipeline可以实现Hadoop Chain的串行化效果,更进一步的,它不仅仅可以在Reducer后面接Mapper,还能再接任意个Reducer,因此可以使用一个作业来运行整个多步骤的复杂逻辑,还是非常好用的。
2.因为使用的是ODPS MR2,因此配置入口、数据类型都有封装好的接口,可以直接支持ODPS数仓,这里就也不再具体说明了(可参考官方文档)。
这里先简单贴下POM和完整代码,等闲下来有时间了再添加具体的业务描述和代码说明。
POM:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build file for the ODPS MR2 demo job (MR_chain_test). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>MR_test</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Only the Aliyun ODPS SDK artifacts are needed; keep the three versions in sync. -->
<dependencies>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.36.4-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-commons</artifactId>
<version>0.36.4-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.36.4-public</version>
</dependency>
</dependencies>
</project>
代码:
import java.io.IOException;
//import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
//import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
//import com.aliyun.odps.mapred.conf.JobConf;
//import com.aliyun.odps.mapred.utils.InputUtils;
//import com.aliyun.odps.mapred.utils.OutputUtils;
//import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.pipeline.Pipeline;
import com.aliyun.odps.OdpsException;
//import sun.util.resources.ga.LocaleNames_ga;
/**
* InputTable:mr_test(idnum:string,xm:string,city:string,county:string,import_time:string\\
* ,rule_param_cnt:bigint,rule_param_days:bigint,checktime:datetime)
* OutputTable:mr_test_ret(idnum:string,xm:string,rule_param_cnt:bigint,rule_param_days:bigint\\
* ,checktime:datetime,pre_checktime:datetime)
*/
public class MR_chain_test
public static final Log LOG = LogFactory.getLog(MRacid.class);
public static class JoinMapper extends MapperBase
private Record mapkey;
private Record mapvalue;
@Override
public void setup(TaskContext context) throws IOException
mapkey = context.createMapOutputKeyRecord();
mapvalue = context.createMapOutputValueRecord();
// tag = context.getInputTableInfo().getLabel().equals("left") ? 1 : 0;
@Override
public void map(long key, Record record, TaskContext context)
throws IOException
mapkey.set(0, record.get(0));
mapkey.set(1, record.get(7));
for (int i = 1; i < record.getColumnCount()-1; i++)
mapvalue.set(i - 1, record.get(i));
context.write(mapkey, mapvalue);
public static class IniReducer extends ReducerBase
private Record nkey;
// private Record value;
@Override
public void setup(TaskContext context) throws IOException
nkey = context.createOutputKeyRecord();
// value = context.createOutputRecord();
/**reduce函数每次的输入会是key相同的所有record。*/
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException
long rn = 0L;
long ans;
nkey.set(0,key.get(0));
while (values.hasNext())
rn++;
nkey.set(1,key.get(1));
nkey.setBigint(2,rn);
Record value = values.next();
context.write(nkey, value);
if (rn>(Long) value.get(4)-1L)
ans = rn - (Long) value.get(4) + 1L;
nkey.setBigint(2, ans);
context.write(nkey, value);
public static class JoinReducer extends ReducerBase
private Record result = null;
@Override
public void setup(TaskContext context) throws IOException
result = context.createOutputRecord();
/**reduce函数每次的输入会是key相同的所有record。*/
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException
long limit;
Date now_checktime;
Date pre_checktime;
Record value = values.next();
pre_checktime=(Date) key.get(1);
result.set(0,key.get(0));
result.set(5,pre_checktime);
limit=(Long) value.get(5)*24*3600*1000;
/**由于设置了outputKeySortColumn是key + tag组合,这样可以保证reduce函数的输入record中,left表的record数据在后面。*/
while (values.hasNext())
value = values.next();
now_checktime=(Date) key.get(1);
if ((Long)(now_checktime.getTime()-pre_checktime.getTime())<=limit)
result.set(1, value.get(0));
result.set(2, value.get(4));
result.set(3, value.get(5));
result.set(4, now_checktime);
context.write(result);
public static void main(String[] args) throws OdpsException
if (args.length != 2)
System.err.println("Usage: Pipeline <in_table> <out_table>");
System.exit(2);
Job job = new Job();
/**构造Pipeline的过程中,如果不指定Mapper的OutputKeySortColumns、PartitionColumns、OutputGroupingColumns,框架会默认使用其OutputKey作为此三者的默认配置。
*/
Pipeline pipeline = Pipeline.builder()
.addMapper(JoinMapper.class)
.setOutputKeySchema(
new Column[]
new Column("idnum", OdpsType.STRING),
new Column("checktime", OdpsType.DATETIME)
)
.setOutputValueSchema(
new Column[]
new Column("xm", OdpsType.STRING),
new Column("city", OdpsType.STRING),
new Column("county", OdpsType.STRING),
new Column("import_time", OdpsType.STRING),
new Column("rule_param_cnt", OdpsType.BIGINT),
new Column("rule_param_days", OdpsType.BIGINT)
)
.setOutputKeySortColumns(new String[] "idnum","checktime" )
.setPartitionColumns(new String[] "idnum" )
.setOutputGroupingColumns(new String[] "idnum" )
.addReducer(IniReducer.class)
.setOutputKeySchema(
new Column[]
new Column("idnum", OdpsType.STRING),
new Column("checktime", OdpsType.DATETIME),
new Column("rn", OdpsType.BIGINT)
)
.setOutputValueSchema(
new Column[]
new Column("xm", OdpsType.STRING),
new Column("city", OdpsType.STRING),
new Column("county", OdpsType.STRING),
new Column("import_time", OdpsType.STRING),
new Column("rule_param_cnt", OdpsType.BIGINT),
new Column("rule_param_days", OdpsType.BIGINT)
)
.setOutputKeySortColumns(new String[] "idnum","rn","checktime" )
.setPartitionColumns(new String[] "idnum" )
.setOutputGroupingColumns(new String[] "idnum","rn" )
.addReducer(JoinReducer.class).createPipeline();
/**将pipeline的设置到jobconf中,如果需要设置combiner,是通过jobconf来设置。*/
job.setPipeline(pipeline);
// job.setCombinerClass();
/**设置输入输出表。*/
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// job.setNumReduceTasks(5);
// Date startTime = new Date();
// System.out.println("Job started: " + startTime);
/**作业提交并等待结束。*/
job.submit();
job.waitForCompletion();
System.exit(job.isSuccessful() == true ? 0 : 1);
// Date end_time = new Date();
// System.out.println("Job ended: " + end_time);
// System.out.println("The job took " + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
最后:
打成Jar资源上传到DataWorks,新建ODPS MR节点,节点中命令行引用jar资源、指定类路径和主类来执行(或者上调度)就可以了(可参考官方文档)。
PS.本文没有涉及到多路输入输出,若需要则可以使用Label来标记和区分不同的输入输出源(M和R的过程中都可以从上下文context中提取到Label,具体可参考官方文档)。
以上是关于ODPS MapReduce(MR2)用例记录的主要内容,如果未能解决你的问题,请参考以下文章
HDP3.1 中 YRAN 和 MR2 的内存大小配置的计算方式