ODPS MapReduce (MR2) Use Case Notes

Posted by 二两窝子面


        Preface: I mentioned this in the previous post on ODPS Spark, so I'm also posting the ODPS MR job I wrote a while back as a record; it doubles as MapReduce review material.
        The scenario at the time: while writing SQL I found that the lead function cannot take a dynamic offset (at least on the ODPS version in my work environment), so I wrote an MR job instead. Halfway through I realized that a plain SQL self-join on row numbers would have done the trick (my brain just hadn't gotten there), but since I rarely get to write MR, I finished a simplified version anyway.

        Dataset logic: emulate a lead function with a dynamic offset. In MR this is done by numbering the rows within each group and then self-joining (JoinMapper => IniReducer => JoinReducer); a small worked example follows below.
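        To make that concrete, here is a walk-through with invented data for one idnum, assuming rule_param_cnt = 3:

            checktime   rn   re-emitted by IniReducer under
            t1          1    -
            t2          2    -
            t3          3    rn 3-3+1 = 1  =>  group (idnum,1) = {t1, t3}
            t4          4    rn 4-3+1 = 2  =>  group (idnum,2) = {t2, t4}

        JoinReducer then pairs each group's earliest record with the later ones: pre_checktime = t1 with now_checktime = t3, and pre_checktime = t2 with now_checktime = t4, i.e. a lead with offset cnt - 1 = 2. A pair is written out only if the two checktimes lie within rule_param_days of each other.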

        Worth noting:
          1. MR2 on ODPS is an extended MapReduce: its Pipeline gives the same chaining effect as a Hadoop Chain job, and goes further in that a Reducer can be followed not only by a Mapper but by any number of additional Reducers, so a single job can run an entire multi-step flow (see the sketch after this list). Very handy.
          2. Because this is ODPS MR2, the job-configuration entry points and data types all come with ready-made interfaces that work directly against ODPS warehouse tables, so I won't go into them here (see the official documentation).
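          A minimal sketch of point 1 (M, R1, R2 are placeholder class names; the schema, sort, partition and grouping setters shown in the full code below are elided):

              Pipeline pipeline = Pipeline.builder()
                      .addMapper(M.class)
                      // ... key/value schemas, sort/partition/grouping columns ...
                      .addReducer(R1.class)
                      // ... key/value schemas, sort/partition/grouping columns ...
                      .addReducer(R2.class)   // a Reducer may directly follow another Reducer
                      .createPipeline();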


        For now I'll just paste the POM and the complete code; I'll add a proper business description and code walkthrough when I have some spare time.

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>MR_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-mapred</artifactId>
            <version>0.36.4-public</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-commons</artifactId>
            <version>0.36.4-public</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>odps-sdk-core</artifactId>
            <version>0.36.4-public</version>
        </dependency>
    </dependencies>

</project>

Code:

import java.io.IOException;
//import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import com.aliyun.odps.Column;
import com.aliyun.odps.OdpsType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.aliyun.odps.mapred.Job;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
//import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
//import com.aliyun.odps.mapred.conf.JobConf;
//import com.aliyun.odps.mapred.utils.InputUtils;
//import com.aliyun.odps.mapred.utils.OutputUtils;
//import com.aliyun.odps.mapred.utils.SchemaUtils;
import com.aliyun.odps.pipeline.Pipeline;
import com.aliyun.odps.OdpsException;

/**
 * Input table:  mr_test(idnum:string, xm:string, city:string, county:string,
 *     import_time:string, rule_param_cnt:bigint, rule_param_days:bigint,
 *     checktime:datetime)
 * Output table: mr_test_ret(idnum:string, xm:string, rule_param_cnt:bigint,
 *     rule_param_days:bigint, checktime:datetime, pre_checktime:datetime)
 */
public class MR_chain_test {
    public static final Log LOG = LogFactory.getLog(MR_chain_test.class);


    public static class JoinMapper extends MapperBase {
        private Record mapkey;
        private Record mapvalue;

        @Override
        public void setup(TaskContext context) throws IOException {
            mapkey = context.createMapOutputKeyRecord();
            mapvalue = context.createMapOutputValueRecord();
//            tag = context.getInputTableInfo().getLabel().equals("left") ? 1 : 0;
        }

        @Override
        public void map(long key, Record record, TaskContext context)
                throws IOException {
            // Key: (idnum, checktime); value: columns 1..6 (xm .. rule_param_days).
            mapkey.set(0, record.get(0));
            mapkey.set(1, record.get(7));
            for (int i = 1; i < record.getColumnCount() - 1; i++) {
                mapvalue.set(i - 1, record.get(i));
            }
            context.write(mapkey, mapvalue);
        }
    }

    public static class IniReducer extends ReducerBase {
        private Record nkey;

        @Override
        public void setup(TaskContext context) throws IOException {
            nkey = context.createOutputKeyRecord();
        }

        /** Each reduce call receives every record that shares the same grouping key. */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long rn = 0L;
            long ans;
            nkey.set(0, key.get(0));
            while (values.hasNext()) {
                rn++;
                // Grouping is on idnum only, so key.get(1) (checktime) tracks
                // the current record while iterating (secondary-sort behavior).
                nkey.set(1, key.get(1));
                nkey.setBigint(2, rn);
                Record value = values.next();
                context.write(nkey, value);
                // Re-emit row rn under row number rn - cnt + 1, so it lands in the
                // same (idnum, rn) group as the row cnt - 1 positions earlier.
                if (rn > (Long) value.get(4) - 1L) {
                    ans = rn - (Long) value.get(4) + 1L;
                    nkey.setBigint(2, ans);
                    context.write(nkey, value);
                }
            }
        }
    }


    public static class JoinReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        /** Each reduce call receives every record that shares the same grouping key. */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context)
                throws IOException {
            long limit;
            Date now_checktime;
            Date pre_checktime;
            Record value = values.next();
            pre_checktime = (Date) key.get(1);
            result.set(0, key.get(0));
            result.set(5, pre_checktime);
            limit = (Long) value.get(5) * 24 * 3600 * 1000;
            /** The sort columns are (idnum, rn, checktime), which guarantees that
             * within each (idnum, rn) group the record with the earliest checktime
             * arrives first and serves as pre_checktime. */
            while (values.hasNext()) {
                value = values.next();
                now_checktime = (Date) key.get(1);
                // Only emit pairs whose gap is within rule_param_days.
                if (now_checktime.getTime() - pre_checktime.getTime() <= limit) {
                    result.set(1, value.get(0));
                    result.set(2, value.get(4));
                    result.set(3, value.get(5));
                    result.set(4, now_checktime);
                    context.write(result);
                }
            }
        }
    }

    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.err.println("Usage: Pipeline <in_table> <out_table>");
            System.exit(2);
        }
        Job job = new Job();
        /** While building the Pipeline, if a Mapper's OutputKeySortColumns,
         * PartitionColumns or OutputGroupingColumns are not specified, the
         * framework defaults all three to its OutputKey. */
        Pipeline pipeline = Pipeline.builder()
                .addMapper(JoinMapper.class)
                .setOutputKeySchema(
                        new Column[] {
                                new Column("idnum", OdpsType.STRING),
                                new Column("checktime", OdpsType.DATETIME)
                        })
                .setOutputValueSchema(
                        new Column[] {
                                new Column("xm", OdpsType.STRING),
                                new Column("city", OdpsType.STRING),
                                new Column("county", OdpsType.STRING),
                                new Column("import_time", OdpsType.STRING),
                                new Column("rule_param_cnt", OdpsType.BIGINT),
                                new Column("rule_param_days", OdpsType.BIGINT)
                        })
                .setOutputKeySortColumns(new String[] { "idnum", "checktime" })
                .setPartitionColumns(new String[] { "idnum" })
                .setOutputGroupingColumns(new String[] { "idnum" })
                .addReducer(IniReducer.class)
                .setOutputKeySchema(
                        new Column[] {
                                new Column("idnum", OdpsType.STRING),
                                new Column("checktime", OdpsType.DATETIME),
                                new Column("rn", OdpsType.BIGINT)
                        })
                .setOutputValueSchema(
                        new Column[] {
                                new Column("xm", OdpsType.STRING),
                                new Column("city", OdpsType.STRING),
                                new Column("county", OdpsType.STRING),
                                new Column("import_time", OdpsType.STRING),
                                new Column("rule_param_cnt", OdpsType.BIGINT),
                                new Column("rule_param_days", OdpsType.BIGINT)
                        })
                .setOutputKeySortColumns(new String[] { "idnum", "rn", "checktime" })
                .setPartitionColumns(new String[] { "idnum" })
                .setOutputGroupingColumns(new String[] { "idnum", "rn" })
                .addReducer(JoinReducer.class).createPipeline();
        /** Install the pipeline on the job; a combiner, if needed, is also
         * configured through the job conf. */
        job.setPipeline(pipeline);
//        job.setCombinerClass();
        /** Input and output tables. */
        job.addInput(TableInfo.builder().tableName(args[0]).build());
        job.addOutput(TableInfo.builder().tableName(args[1]).build());
//        job.setNumReduceTasks(5);
        /** Submit the job and wait for it to finish. */
        job.submit();
        job.waitForCompletion();
        System.exit(job.isSuccessful() ? 0 : 1);
    }
}

Finally:

        Package it as a jar, upload it as a resource to DataWorks, and create an ODPS MR node; in that node's command line, reference the jar resource and specify the classpath and main class, then run it (or put it on the scheduler). See the official documentation.
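        For reference, the node command usually looks roughly like this (the resource and table names match this example but are otherwise placeholders; check the official docs for the exact syntax of your DataWorks version):

            jar -resources MR_test.jar -classpath ./MR_test.jar MR_chain_test mr_test mr_test_ret;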

PS. This post does not cover multi-route input/output. If you need it, Labels can be used to tag and distinguish the different input and output sources (both the M and R stages can read the Label from the context; see the official documentation for details), roughly as sketched below.
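A rough sketch under the assumption of two labeled inputs and one labeled output (table names are hypothetical):

    // When configuring the job, attach a label to each table:
    job.addInput(TableInfo.builder().tableName("mr_test_left").label("left").build());
    job.addInput(TableInfo.builder().tableName("mr_test_right").label("right").build());
    job.addOutput(TableInfo.builder().tableName("mr_test_ret").label("out").build());

    // In a Mapper, tell the sources apart (cf. the commented-out line in setup() above):
    String tag = context.getInputTableInfo().getLabel();

    // In a Reducer, write to a specific labeled output:
    context.write(result, "out");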
