Hadoop中的DBOutputFormat

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的DBOutputFormat相关的知识,希望对你有一定的参考价值。

一:背景

为了方便MapReduce直接访问关系型数据库(mysql、Oracle等),Hadoop提供了DBInputFormat和DBOutputFormat两个类,通过DBInputFormat类把数据库表中的数据导入到HDFS中,通过DBOutputFormat类把数MapReduce产生的结果导出到数据库表中。

二:技术实现

我们接上一篇文章即通过通过DBInputFormat将数据库表中的数据导入到HDFS中,这里我们讲的是通过DBOutputFormat类将MapReduce产生的结果导出到数据库表中,我们在同一个数据库即myDB中创建user表,如下:

 

[java] view plain copy
 
  1. create table user(id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(32) NOT NULL);  

数据准备:在数据文件上传到HDFS中,数据如下图:

 

         技术分享

我这里使用的hadoop版本为hadoop1.X,具体的代码和相关的知识点我们写在注释里了,代码如下:

 

[java] view plain copy
 
  1. public class MyDBOutputFormat {  
  2.   
  3.     // 定义输出路径  
  4.     private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/user";  
  5.   
  6.     public static void main(String[] args) {  
  7.   
  8.         try {  
  9.             // 创建配置信息  
  10.             Configuration conf = new Configuration();  
  11.   
  12.             /* 
  13.              * //对Map端的输出进行压缩 
  14.              *  conf.setBoolean("mapred.compress.map.output", true);  
  15.              * //设置map端输出使用的压缩类 
  16.              *  conf.setClass("mapred.map.output.compression.codec",GzipCodec.class, CompressionCodec.class);  
  17.              * //对reduce端输出进行压缩  
  18.              * conf.setBoolean("mapred.output.compress", true);  
  19.              * //设置reduce端输出使用的压缩类 
  20.              * conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class); 
  21.              */  
  22.   
  23.             // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)  
  24.             /* 
  25.              * conf.addResource("classpath://hadoop/core-site.xml"); 
  26.              * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
  27.              * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
  28.              */  
  29.   
  30.             // 通过conf创建数据库配置信息  
  31.             DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB", "root", "134045");  
  32.               
  33.             /*// 创建文件系统 
  34.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); 
  35.  
  36.             // 如果输出目录存在就删除 
  37.             if (fileSystem.exists(new Path(OUT_PATH))) { 
  38.                 fileSystem.delete(new Path(OUT_PATH), true); 
  39.             }*/  
  40.   
  41.             // 创建任务  
  42.             Job job = new Job(conf, MyDBOutputFormat.class.getName());  
  43.   
  44.             // 1.1 设置输入数据格式化的类和设置数据来源  
  45.             job.setInputFormatClass(TextInputFormat.class);  
  46.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  47.             //1.2 设置自定义的Mapper类和Mapper输出的key和value的类型  
  48.             job.setMapperClass(MyDBOutputFormatMapper.class);  
  49.             job.setMapOutputKeyClass(LongWritable.class);  
  50.             job.setMapOutputValueClass(User.class);  
  51.   
  52.             // 1.3 设置分区和reduce数量(reduce的数量和分区的数量对应,因为分区只有一个,所以reduce的个数也设置为一个)  
  53.             job.setPartitionerClass(HashPartitioner.class);  
  54.             job.setNumReduceTasks(1);  
  55.   
  56.             // 1.4 排序、分组  
  57.             // 1.5 归约  
  58.             // 2.1 Shuffle把数据从Map端拷贝到Reduce端  
  59.   
  60.             // 2.2 指定Reducer类和输出key和value的类型  
  61.             job.setReducerClass(MyDBOutputFormatReducer.class);  
  62.   
  63.             // 2.3 设置输出的格式化类和设置将reduce端输出的key值对应user表  
  64.             job.setOutputFormatClass(DBOutputFormat.class);  
  65.             DBOutputFormat.setOutput(job, "user", new String[] { "id", "name" });  
  66.   
  67.             // 提交作业 然后关闭虚拟机正常退出  
  68.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  69.   
  70.         } catch (Exception e) {  
  71.             e.printStackTrace();  
  72.         }  
  73.     }  
  74.   
  75.     /** 
  76.      *  自定义Mapper类 
  77.      * @author 廖钟民 
  78.      * time : 2015年1月15日下午3:37:31 
  79.      * @version 
  80.      */  
  81.     public static class MyDBOutputFormatMapper extends Mapper<LongWritable, Text, LongWritable, User>{  
  82.           
  83.         //创建写出去的value类型  
  84.         private User user = new User();  
  85.           
  86.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, User>.Context context) throws IOException,  
  87.                 InterruptedException {  
  88.             //对value进行切分  
  89.             String[] splits = value.toString().split("\t");  
  90.             //封装user对象  
  91.             user.setId(Integer.parseInt(splits[0]));  
  92.             user.setName(splits[1]);  
  93.             //把user对象作为value写出去  
  94.             context.write(key, user);  
  95.         }  
  96.     }  
  97.       
  98.   
  99.     /** 
  100.      * 关键是写出去的key要为User对象 
  101.      * 写出去的value值无所谓,为NullWritable都可以 
  102.      * @author 廖钟民 
  103.      * time : 2015年1月15日下午3:44:24 
  104.      * @version 
  105.      */  
  106.     public static class MyDBOutputFormatReducer extends Reducer<LongWritable, User, User, Text> {  
  107.           
  108.         protected void reduce(LongWritable key, Iterable<User> values, Reducer<LongWritable, User, User, Text>.Context context) throws IOException,  
  109.                 InterruptedException {  
  110.             for (User user : values){  
  111.                 context.write(user, new Text(new Text(user.getName())));  
  112.             }  
  113.         }  
  114.           
  115.     }  
  116. }  
  117. /** 
  118.  * 自定义实体对象要实现 
  119.  * 因为使用了DBOutputFormat,所以要实现DBWritable接口 
  120.  * @author 廖钟民 
  121.  * time : 2015年1月15日下午3:46:35 
  122.  * @version 
  123.  */  
  124. class User implements Writable, DBWritable {  
  125.   
  126.     private int id;  
  127.     private String name;  
  128.   
  129.     // 无参构造函数  
  130.     public User() {  
  131.     }  
  132.   
  133.     public int getId() {  
  134.         return id;  
  135.     }  
  136.   
  137.     public void setId(int id) {  
  138.         this.id = id;  
  139.     }  
  140.   
  141.     public String getName() {  
  142.         return name;  
  143.     }  
  144.   
  145.     public void setName(String name) {  
  146.         this.name = name;  
  147.     }  
  148.   
  149.     // 实现DBWritable接口要实现的方法  
  150.     public void readFields(ResultSet resultSet) throws SQLException {  
  151.         this.id = resultSet.getInt(1);  
  152.         this.name = resultSet.getString(2);  
  153.     }  
  154.   
  155.     // 实现DBWritable接口要实现的接口  
  156.     public void write(PreparedStatement preparedStatement) throws SQLException {  
  157.         preparedStatement.setInt(1, this.id);  
  158.         preparedStatement.setString(2, this.name);  
  159.     }  
  160.   
  161.     // 实现Writable接口要实现的方法  
  162.     public void readFields(DataInput dataInput) throws IOException {  
  163.         this.id = dataInput.readInt();  
  164.         this.name = Text.readString(dataInput);  
  165.     }  
  166.   
  167.     // 实现Writable接口要实现的接口  
  168.     public void write(DataOutput dataOutput) throws IOException {  
  169.         dataOutput.writeInt(this.id);  
  170.         Text.writeString(dataOutput, this.name);  
  171.     }  
  172.   
  173.     @Override  
  174.     public String toString() {  
  175.         return "User [id=" + id + ", name=" + name + "]";  
  176.     }  
  177.   
  178. }  

程序运行的结果如下:

 

技术分享

以上是关于Hadoop中的DBOutputFormat的主要内容,如果未能解决你的问题,请参考以下文章

在 Hadoop 中使用 DBOutputFormat

Hadoop3 - MapReduce DB 操作

Hadoop3 - MapReduce DB 操作

DBInputFormat和DBOutputFormat——悟空智慧教育

一步一步跟我学习hadoop----hadoop连接mysql数据库运行数据读写数据库操作

使用hadoop读写mysql