Hadoop中的DBOutputFormat
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的DBOutputFormat相关的知识,希望对你有一定的参考价值。
一:背景
为了方便MapReduce直接访问关系型数据库(mysql、Oracle等),Hadoop提供了DBInputFormat和DBOutputFormat两个类,通过DBInputFormat类把数据库表中的数据导入到HDFS中,通过DBOutputFormat类把数MapReduce产生的结果导出到数据库表中。
二:技术实现
我们接上一篇文章即通过通过DBInputFormat将数据库表中的数据导入到HDFS中,这里我们讲的是通过DBOutputFormat类将MapReduce产生的结果导出到数据库表中,我们在同一个数据库即myDB中创建user表,如下:
- create table user(id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(32) NOT NULL);
数据准备:在数据文件上传到HDFS中,数据如下图:
我这里使用的hadoop版本为hadoop1.X,具体的代码和相关的知识点我们写在注释里了,代码如下:
- public class MyDBOutputFormat {
- // 定义输出路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/user";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- /*
- * //对Map端的输出进行压缩
- * conf.setBoolean("mapred.compress.map.output", true);
- * //设置map端输出使用的压缩类
- * conf.setClass("mapred.map.output.compression.codec",GzipCodec.class, CompressionCodec.class);
- * //对reduce端输出进行压缩
- * conf.setBoolean("mapred.output.compress", true);
- * //设置reduce端输出使用的压缩类
- * conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- */
- // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
- /*
- * conf.addResource("classpath://hadoop/core-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- */
- // 通过conf创建数据库配置信息
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB", "root", "134045");
- /*// 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }*/
- // 创建任务
- Job job = new Job(conf, MyDBOutputFormat.class.getName());
- // 1.1 设置输入数据格式化的类和设置数据来源
- job.setInputFormatClass(TextInputFormat.class);
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- //1.2 设置自定义的Mapper类和Mapper输出的key和value的类型
- job.setMapperClass(MyDBOutputFormatMapper.class);
- job.setMapOutputKeyClass(LongWritable.class);
- job.setMapOutputValueClass(User.class);
- // 1.3 设置分区和reduce数量(reduce的数量和分区的数量对应,因为分区只有一个,所以reduce的个数也设置为一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- // 1.4 排序、分组
- // 1.5 归约
- // 2.1 Shuffle把数据从Map端拷贝到Reduce端
- // 2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(MyDBOutputFormatReducer.class);
- // 2.3 设置输出的格式化类和设置将reduce端输出的key值对应user表
- job.setOutputFormatClass(DBOutputFormat.class);
- DBOutputFormat.setOutput(job, "user", new String[] { "id", "name" });
- // 提交作业 然后关闭虚拟机正常退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 自定义Mapper类
- * @author 廖钟民
- * time : 2015年1月15日下午3:37:31
- * @version
- */
- public static class MyDBOutputFormatMapper extends Mapper<LongWritable, Text, LongWritable, User>{
- //创建写出去的value类型
- private User user = new User();
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, User>.Context context) throws IOException,
- InterruptedException {
- //对value进行切分
- String[] splits = value.toString().split("\t");
- //封装user对象
- user.setId(Integer.parseInt(splits[0]));
- user.setName(splits[1]);
- //把user对象作为value写出去
- context.write(key, user);
- }
- }
- /**
- * 关键是写出去的key要为User对象
- * 写出去的value值无所谓,为NullWritable都可以
- * @author 廖钟民
- * time : 2015年1月15日下午3:44:24
- * @version
- */
- public static class MyDBOutputFormatReducer extends Reducer<LongWritable, User, User, Text> {
- protected void reduce(LongWritable key, Iterable<User> values, Reducer<LongWritable, User, User, Text>.Context context) throws IOException,
- InterruptedException {
- for (User user : values){
- context.write(user, new Text(new Text(user.getName())));
- }
- }
- }
- }
- /**
- * 自定义实体对象要实现
- * 因为使用了DBOutputFormat,所以要实现DBWritable接口
- * @author 廖钟民
- * time : 2015年1月15日下午3:46:35
- * @version
- */
- class User implements Writable, DBWritable {
- private int id;
- private String name;
- // 无参构造函数
- public User() {
- }
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- // 实现DBWritable接口要实现的方法
- public void readFields(ResultSet resultSet) throws SQLException {
- this.id = resultSet.getInt(1);
- this.name = resultSet.getString(2);
- }
- // 实现DBWritable接口要实现的接口
- public void write(PreparedStatement preparedStatement) throws SQLException {
- preparedStatement.setInt(1, this.id);
- preparedStatement.setString(2, this.name);
- }
- // 实现Writable接口要实现的方法
- public void readFields(DataInput dataInput) throws IOException {
- this.id = dataInput.readInt();
- this.name = Text.readString(dataInput);
- }
- // 实现Writable接口要实现的接口
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeInt(this.id);
- Text.writeString(dataOutput, this.name);
- }
- @Override
- public String toString() {
- return "User [id=" + id + ", name=" + name + "]";
- }
- }
程序运行的结果如下:
以上是关于Hadoop中的DBOutputFormat的主要内容,如果未能解决你的问题,请参考以下文章
DBInputFormat和DBOutputFormat——悟空智慧教育