MapReduce实战

Posted 陈驰字新宇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce实战相关的知识,希望对你有一定的参考价值。

需求:

利用MapReduce程序,实现SQL语句中的join关联查询。

订单数据表order:

id date pid amount
1001 20150710 P0001 2
1002 20150710 P0001 3
1002 20150710 P0002 3
1003 20150710 P0003 4

 

商品信息表product:

pid pname category_id price
P0001 小米6 1000 2499
P0002 锤子T3 1001 2500
P0003 三星S8 1002 6999

假如数据量巨大,两表的数据是以文件的形式存储在HDFS中,需要用mapreduce程序来实现一下SQL查询运算:

select a.id, a.date, b.pname, b.category_id, b.price from t_order a join t_product b on a.pid = b.pid

分析:

通过将关联的条件作为map输出的key,将两表满足join条件的数据并携带数据所来源的文件信息,发往同一个reduce task,在reduce中进行数据的串联。

实现:

首先,我们将表中的数据转换成我们需要的格式:

order.txt:

1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
1003,20150710,P0003,4

product.txt:

P0001,小米6,1000,2499
P0002,锤子T3,1001,2500
P0003,三星S8,1002,6999

并且导入到HDFS的/join/srcdata目录下面。

因为我们有两种格式的文件,所以在map阶段需要根据文件名进行判断,对不同的文件进行不同的处理。同理,在reduce阶段我们也要区分同一key(pid)下两类来源的数据,这里是通过判断id是否为空字符串来区分的。

InfoBean.java:

package com.darrenchan.mr.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * id date pid amount pname category_id price
 * 
 * @author chenchi
 *
 */
/**
 * Hadoop Writable bean carrying one joined record:
 * id date pid amount pname category_id price
 *
 * Records originating from only one table keep the other table's fields
 * as "" rather than null, because writeUTF/readUTF cannot handle null.
 *
 * @author chenchi
 */
public class InfoBean implements Writable {
    private String id;          // order id
    private String date;        // order date
    private String pid;         // product id, the join key
    private String amount;      // ordered quantity
    private String pname;       // product name
    private String category_id; // product category id
    private String price;       // product price

    /** No-arg constructor required by Hadoop's serialization machinery. */
    public InfoBean() {
    }

    public InfoBean(String id, String date, String pid, String amount, String pname, String category_id, String price) {
        this.id = id;
        this.date = date;
        this.pid = pid;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
    }

    public String getId() { return id; }

    public void setId(String id) { this.id = id; }

    public String getDate() { return date; }

    public void setDate(String date) { this.date = date; }

    public String getPid() { return pid; }

    public void setPid(String pid) { this.pid = pid; }

    public String getAmount() { return amount; }

    public void setAmount(String amount) { this.amount = amount; }

    public String getPname() { return pname; }

    public void setPname(String pname) { this.pname = pname; }

    public String getCategory_id() { return category_id; }

    public void setCategory_id(String category_id) { this.category_id = category_id; }

    public String getPrice() { return price; }

    public void setPrice(String price) { this.price = price; }

    @Override
    public String toString() {
        // Same textual form as the classic generated toString; this is what
        // TextOutputFormat writes when the bean is used as an output key.
        StringBuilder sb = new StringBuilder("InfoBean [id=");
        sb.append(id).append(", date=").append(date);
        sb.append(", pid=").append(pid).append(", amount=").append(amount);
        sb.append(", pname=").append(pname);
        sb.append(", category_id=").append(category_id);
        sb.append(", price=").append(price).append("]");
        return sb.toString();
    }

    /**
     * Deserializes the fields in the exact order they were written:
     * id date pid amount pname category_id price
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readUTF();
        date = in.readUTF();
        pid = in.readUTF();
        amount = in.readUTF();
        pname = in.readUTF();
        category_id = in.readUTF();
        price = in.readUTF();
    }

    /**
     * Serializes the fields; the order must stay in sync with readFields.
     * Every field must be non-null (use "" for absent values).
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(id);
        out.writeUTF(date);
        out.writeUTF(pid);
        out.writeUTF(amount);
        out.writeUTF(pname);
        out.writeUTF(category_id);
        out.writeUTF(price);
    }

}

Join.java:

package com.darrenchan.mr.join;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.darrenchan.mr.bean.InfoBean;

/**
 * Reduce-side join of the order and product tables.
 *
 * The map phase tags every line with its join key (pid); the reduce phase
 * receives all records sharing a pid and stitches the single product record
 * onto each order record.
 */
public class Join {
    /**
     * Mapper: parses either an order line (id,date,pid,amount) or a product
     * line (pid,pname,category_id,price), depending on the input file name,
     * and emits (pid, InfoBean).
     *
     * @author chenchi
     */
    public static class JoinMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
        // Reused across map() calls to avoid allocating one bean per record;
        // safe because context.write() serializes the bean immediately.
        InfoBean infoBean = new InfoBean();
        Text text = new Text();// reused for the same reason
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Decide which table this line comes from by the split's file name.
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            String[] fields = value.toString().split(",");
            if(name.startsWith("order")){// order line: id,date,pid,amount
                String pid = fields[2];
                infoBean.setId(fields[0]);
                infoBean.setDate(fields[1]);
                infoBean.setPid(pid);
                infoBean.setAmount(fields[3]);
                // The product-side fields are set to "" rather than null
                // because writeUTF cannot serialize null.
                infoBean.setPname("");
                infoBean.setCategory_id("");
                infoBean.setPrice("");

                text.set(pid);
                context.write(text, infoBean);
            }else{// product line: pid,pname,category_id,price
                String pid = fields[0];
                infoBean.setPid(pid);
                infoBean.setPname(fields[1]);
                infoBean.setCategory_id(fields[2]);
                infoBean.setPrice(fields[3]);
                // The order-side fields are blanked for the same
                // serialization reason; an empty id also marks this bean
                // as a product record for the reducer.
                infoBean.setId("");
                infoBean.setDate("");
                infoBean.setAmount("");

                text.set(pid);
                context.write(text, infoBean);
            }
        }
    }


    /**
     * Reducer: for each pid there may be many order records but at most one
     * product record. Copies the product attributes onto every order record
     * and emits the completed bean.
     */
    public static class JoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable>{

        @Override
        protected void reduce(Text key, Iterable<InfoBean> values, Context context) throws IOException, InterruptedException {
            List<InfoBean> orders = new ArrayList<InfoBean>();// all order records for this pid
            InfoBean product = new InfoBean();// the single product record (if any)
            for (InfoBean infoBean : values) {
                // Hadoop reuses the value object while iterating, so every
                // record we keep must be copied into a fresh bean.
                if(!"".equals(infoBean.getId())){// order record (non-empty id)
                    orders.add(copyOf(infoBean));
                }else{// product record
                    copyInto(product, infoBean);
                }
            }
            for (InfoBean order : orders) {
                // Guard against a pid with no matching product record: fall
                // back to "" so the output never contains null.
                order.setPname(emptyIfNull(product.getPname()));
                order.setCategory_id(emptyIfNull(product.getCategory_id()));
                order.setPrice(emptyIfNull(product.getPrice()));

                context.write(order, NullWritable.get());
            }
        }

        /** Returns a fresh InfoBean with all fields copied from src. */
        private static InfoBean copyOf(InfoBean src) {
            InfoBean copy = new InfoBean();
            copyInto(copy, src);
            return copy;
        }

        /**
         * Copies every field from src into dest. An explicit copy replaces
         * the former reflective BeanUtils.copyProperties call, which was
         * slower and silently swallowed copy failures.
         */
        private static void copyInto(InfoBean dest, InfoBean src) {
            dest.setId(src.getId());
            dest.setDate(src.getDate());
            dest.setPid(src.getPid());
            dest.setAmount(src.getAmount());
            dest.setPname(src.getPname());
            dest.setCategory_id(src.getCategory_id());
            dest.setPrice(src.getPrice());
        }

        /** Maps null to "" so the joined output stays serializable/printable. */
        private static String emptyIfNull(String s) {
            return s == null ? "" : s;
        }
    }



    /**
     * Job driver: wires up mapper, reducer, key/value classes and the
     * input/output paths passed as args[0] and args[1].
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(Join.class);

        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


}

注:这里有一个地方需要注意,就是reduce方法的Iterable<InfoBean> values,一定要new 新对象,不能直接赋值,因为迭代器的内容在不断变化。

执行指令:hadoop jar join.jar com.darrenchan.mr.join.Join /join/srcdata /join/output

 

运行效果:

 

但是呢?这种方式是有缺陷的,什么缺陷呢?

这种方式中,join的操作是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。什么叫数据倾斜呢?比如在中国买小米6的人特别多,三星S8的人特别少,汇总的时候,当汇总小米6的pid的时候就运算压力特别大,而S8的pid的时候运算压力就特别小,显然负载不均衡。

那么我们应该用什么方法进行解决呢?就是map端join实现方式了。

我们将业务操作移到了map端,reduce甚至可以不用了,因为商品表一般内容不多,所以我们可以提前加载到内存中,运行map方法的时候直接查找即可,利用了MapReduce的分布式缓存。

代码如下:

package com.darrenchan.mr.mapedjoin;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.darrenchan.mr.bean.InfoBean;

/**
 * Map-side join: the (small) product table is shipped to every map task via
 * the distributed cache and loaded into memory in setup(), so the join is
 * done entirely in map() with no reduce phase.
 */
public class MapedJoin {

    public static class MapedJoinMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable> {

        // pid -> raw product line ("pid,pname,category_id,price")
        private Map<String, String> productByPid = new HashMap<>();
        // Reused output bean; safe because context.write() serializes it
        // (or formats it) before the next map() call mutates it.
        InfoBean infoBean = new InfoBean();

        /**
         * Loads product.txt (placed in the task's working directory by
         * job.addCacheFile) into the in-memory lookup table.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // try-with-resources guarantees the reader (and underlying
            // stream) is closed even if parsing throws. UTF-8 is specified
            // explicitly because the file contains non-ASCII product names
            // and the platform default charset is not reliable.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(new File("product.txt")), "UTF-8"))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(",");
                    productByPid.put(fields[0], line);
                }
            }
        }

        /**
         * Joins one order line against the in-memory product table and emits
         * the completed bean. Non-order files (e.g. the product file, if it
         * sits in the same input directory) are skipped.
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            if (name.startsWith("order")) {
                // order line: id,date,pid,amount
                String[] fields = value.toString().split(",");
                String pid = fields[2];

                infoBean.setId(fields[0]);
                infoBean.setDate(fields[1]);
                infoBean.setPid(pid);
                infoBean.setAmount(fields[3]);

                String product = productByPid.get(pid);
                if (product == null) {
                    // No matching product record: behave like an inner join
                    // and drop the order instead of throwing an NPE.
                    return;
                }
                String[] splits = product.split(",");
                infoBean.setPname(splits[1]);
                infoBean.setCategory_id(splits[2]);
                infoBean.setPrice(splits[3]);

                context.write(infoBean, NullWritable.get());
            }
        }
    }

    /**
     * Job driver: map-only job reading orders from args[0], writing the
     * joined records to args[1], with product.txt distributed via the cache.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapedJoin.class);

        job.setMapperClass(MapedJoinMapper.class);

        job.setMapOutputKeyClass(InfoBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Map-side join needs no reduce phase; without this Hadoop would
        // still start one default reduce task.
        job.setNumReduceTasks(0);

        // Other distributed-cache options, for reference:
        /* job.addArchiveToClassPath(archive); */// cache a jar on the task classpath
        /* job.addFileToClassPath(file); */// cache a plain file on the task classpath
        /* job.addCacheArchive(uri); */// cache an archive into the task working dir
        /* job.addCacheFile(uri) */// cache a plain file into the task working dir

        // Ship the product table to every map task's working directory so
        // setup() can read it as a local file.
        job.addCacheFile(new URI("/join/srcdata/product.txt"));

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

结果同上。

 

以上是关于MapReduce实战的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop实战-MapReduce之maxminavg统计

大数据MapReduce 编程实战

solr分布式索引实战分片配置读取:工具类configUtil.java,读取配置代码片段,配置实例

干货分享:编写MapReduce程序清洗电商评论数据

MapReduce实战

MapReduce实战