大数据框架之Hadoop:MapReduceMapReduce框架原理——数据清洗(ETL)
Posted yiluohan0307
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据框架之Hadoop:MapReduceMapReduce框架原理——数据清洗(ETL)相关的知识,希望对你有一定的参考价值。
在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
3.9.1数据清洗案例实操-简单解析版
1、需求
去除日志中字段长度小于等于11的日志。
(1)输入数据
(2)期望输出数据
每行字段长度都大于11。
2、需求分析
需要在Map阶段对输入的数据根据规则进行过滤清洗。
3、实现代码
(1)编写LogMapper类
package com.cuiyf41.etl;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException
// 1 获取1行数据
String line = value.toString();
// 2 解析日志
boolean result = parseLog(line,context);
// 3 日志不合法退出
if (!result)
return;
// 4 设置key
k.set(line);
// 5 写出数据
context.write(k, NullWritable.get());
// 2 解析日志
private boolean parseLog(String line, Context context)
// 1 截取
String[] fields = line.split(" ");
// 2 日志长度大于11的为合法
if (fields.length > 11)
// 系统计数器
context.getCounter("map", "true").increment(1);
return true;
else
context.getCounter("map", "false").increment(1);
return false;
(2)编写LogDriver类
package com.cuiyf41.etl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] "e:/input/inputlog", "e:/output1" ;
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载jar包
job.setJarByClass(LogDriver.class);
// 3 关联map
job.setMapperClass(LogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 设置reducetask个数为0
job.setNumReduceTasks(0);
// 5 设置输入和输出路径
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 如果输出路径存在,则进行删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output))
fs.delete(output,true);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
// 6 提交
job.waitForCompletion(true);
3.9.2数据清洗案例实操-复杂解析版
1、需求
对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。
(1)输入数据
(2)期望输出数据
都是合法的数据
2、实现代码
(1)定义一个bean,用来记录日志数据中的各数据字段
package com.cuiyf41.etlu;
public class LogBean
private String remote_addr;// 记录客户端的ip地址
private String remote_user;// 记录客户端用户名称,忽略属性"-"
private String time_local;// 记录访问时间与时区
private String request;// 记录请求的url与http协议
private String status;// 记录请求状态;成功是200
private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
private String http_referer;// 用来记录从那个页面链接访问过来的
private String http_user_agent;// 记录客户浏览器的相关信息
private boolean valid = true;// 判断数据是否合法
public String getRemote_addr()
return remote_addr;
public void setRemote_addr(String remote_addr)
this.remote_addr = remote_addr;
public String getRemote_user()
return remote_user;
public void setRemote_user(String remote_user)
this.remote_user = remote_user;
public String getTime_local()
return time_local;
public void setTime_local(String time_local)
this.time_local = time_local;
public String getRequest()
return request;
public void setRequest(String request)
this.request = request;
public String getStatus()
return status;
public void setStatus(String status)
this.status = status;
public String getBody_bytes_sent()
return body_bytes_sent;
public void setBody_bytes_sent(String body_bytes_sent)
this.body_bytes_sent = body_bytes_sent;
public String getHttp_referer()
return http_referer;
public void setHttp_referer(String http_referer)
this.http_referer = http_referer;
public String getHttp_user_agent()
return http_user_agent;
public void setHttp_user_agent(String http_user_agent)
this.http_user_agent = http_user_agent;
public boolean isValid()
return valid;
public void setValid(boolean valid)
this.valid = valid;
@Override
public String toString()
StringBuilder sb = new StringBuilder();
sb.append(this.valid);
sb.append("\\001").append(this.remote_addr);
sb.append("\\001").append(this.remote_user);
sb.append("\\001").append(this.time_local);
sb.append("\\001").append(this.request);
sb.append("\\001").append(this.status);
sb.append("\\001").append(this.body_bytes_sent);
sb.append("\\001").append(this.http_referer);
sb.append("\\001").append(this.http_user_agent);
return sb.toString();
(2)编写LogMapper类
package com.cuiyf41.etlu;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>
Text k = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException
// 1 获取1行
String line = value.toString();
// 2 解析日志是否合法
LogBean bean = parseLog(line);
if (!bean.isValid())
return;
k.set(bean.toString());
// 3 输出
context.write(k, NullWritable.get());
// 解析日志
private LogBean parseLog(String line)
LogBean logBean = new LogBean();
// 1 截取
String[] fields = line.split(" ");
if (fields.length > 11)
// 2封装数据
logBean.setRemote_addr(fields[0]);
logBean.setRemote_user(fields[1]);
logBean.setTime_local(fields[3].substring(1));
logBean.setRequest(fields[6]);
logBean.setStatus(fields[8]);
logBean.setBody_bytes_sent(fields[9]);
logBean.setHttp_referer(fields[10]);
if (fields.length > 12)
logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
else
logBean.setHttp_user_agent(fields[11]);
// 大于400,HTTP错误
if (Integer.parseInt(logBean.getStatus()) >= 400)
logBean.setValid(false);
else
logBean.setValid(false);
return logBean;
(3)编写LogDriver类
package com.cuiyf41.etlu;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class LogDriver
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 加载jar包
job.setJarByClass(LogDriver.class);
// 3 关联map
job.setMapperClass(LogMapper.class);
// 4 设置最终输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 5 设置输入和输出路径
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// 如果输出路径存在,则进行删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(output))
fs.delete(output,true);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
// 6 提交
job.waitForCompletion(true);
以上是关于大数据框架之Hadoop:MapReduceMapReduce框架原理——数据清洗(ETL)的主要内容,如果未能解决你的问题,请参考以下文章
大数据技术之Hadoop(MapReduce)框架原理数据压缩
大数据技术之Hadoop(MapReduce)框架原理数据压缩