Hadoop3 - MapReduce DB Operations

Posted by 小毕超


1. MapReduce DB Operations

The previous articles in this series have all read their input from local files or from HDFS. But what if the data lives in a database?

Hadoop provides two classes for this: DBInputFormat and DBOutputFormat. As their names suggest, DBInputFormat reads records from a database, while DBOutputFormat writes the final results back into one.
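Their class signatures explain the types used throughout this article: DBInputFormat hands the mapper a LongWritable record id as the key and the entity object as the value, while DBOutputFormat persists its key and ignores its value. Abridged from the Hadoop sources:

// org.apache.hadoop.mapreduce.lib.db (abridged)
public class DBInputFormat<T extends DBWritable>
        extends InputFormat<LongWritable, T> { /* ... */ }

public class DBOutputFormat<K extends DBWritable, V>
        extends OutputFormat<K, V> { /* ... */ }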

To map database rows onto an entity object, however, the entity class must also implement the DBWritable interface: its readFields method specifies how to read fields from the database, and its write method specifies how to write them back.
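For reference, the DBWritable contract is just these two methods (org.apache.hadoop.mapreduce.lib.db.DBWritable, abridged):

public interface DBWritable {
    // Bind this object's fields to the parameters of the write-side INSERT statement
    void write(PreparedStatement statement) throws SQLException;
    // Populate this object's fields from the current row of the read-side ResultSet
    void readFields(ResultSet resultSet) throws SQLException;
}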

The experiment below reuses the COVID-19 dataset from the previous articles in this series. First the text dataset is imported into a MySQL database; then the table is read back as the input dataset to compute the total cases and deaths for each state, and the results are written back into MySQL.

COVID-19 dataset article:

https://blog.csdn.net/qq_43692950/article/details/127475811
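Each line of the raw dataset is a comma-separated record in the order date, county, state, fips, cases, deaths, roughly like this (illustrative values):

2020-01-21,Snohomish,Washington,53061,1,0
2020-01-23,Snohomish,Washington,53061,1,0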

2. Importing the Text Dataset into MySQL

First, create the table in MySQL:

CREATE TABLE `covid_input` (
  `id` int NOT NULL AUTO_INCREMENT,
  `date` datetime DEFAULT NULL,
  `county` varchar(255) DEFAULT NULL,
  `state` varchar(255) DEFAULT NULL,
  `fips` varchar(255) DEFAULT NULL,
  `cases` int DEFAULT NULL,
  `deaths` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Create the entity class, implementing both WritableComparable and DBWritable:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date;   // date
    private String county; // county
    private String state;  // state
    private String fips;   // county FIPS code
    private Long cases;    // confirmed cases
    private Long deaths;   // deaths

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * Deserialization method - the field order must match write()
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * Declares the fields written to the DB; each parameter index corresponds
     * to the fieldNames passed to DBOutputFormat.setOutput
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, date);
        preparedStatement.setString(2, county);
        preparedStatement.setString(3, state);
        preparedStatement.setString(4, fips);
        preparedStatement.setLong(5, cases);
        preparedStatement.setLong(6, deaths);
    }

    /**
     * Reads fields from the DB; left empty here because this job reads a text
     * file and only writes to MySQL
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {

    }

    // Sort order: ascending by state, then descending by deaths
    // (not exercised by this map-only job, but required by WritableComparable)
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        }
        return i;
    }
}


Next, write the Mapper class. Since no grouping or aggregation is required here, we can omit the Reducer entirely and send the Mapper output straight to MySQL. The entity object is therefore emitted as the key, with NullWritable as a placeholder value:

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

@Slf4j
public class DBMapper extends Mapper<LongWritable, Text, CountEntity, NullWritable> {

    CountEntity outKey = new CountEntity();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Expected format: date,county,state,fips,cases,deaths; shorter rows are skipped.
        // Note: Long.parseLong will throw if the raw file still contains a CSV header row.
        String[] fields = value.toString().split(",");
        if (fields.length >= 6) {
            outKey.setDate(fields[0]);
            outKey.setCounty(fields[1]);
            outKey.setState(fields[2]);
            outKey.setFips(fields[3]);
            outKey.setCases(Long.parseLong(fields[4]));
            outKey.setDeaths(Long.parseLong(fields[5]));
            context.write(outKey, outValue);
        }
    }
}

Finally, write the driver class, declaring the output table and its fields:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // JDBC connection info used by this job
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // Set the driver class
        job.setJarByClass(DBDriver.class);
        // Set the mapper class
        job.setMapperClass(DBMapper.class);
        // Map output key/value types
        job.setMapOutputKeyClass(CountEntity.class);
        job.setMapOutputValueClass(NullWritable.class);

        // No Reducer is needed for this map-only job
        job.setNumReduceTasks(0);

        // Input path of the text dataset
        FileInputFormat.addInputPath(job, new Path("D:/test/input"));
        // The output goes to the database
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(
                job,
                "covid_input",
                "date", "county", "state", "fips", "cases", "deaths");

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Run the driver class (the MySQL JDBC connector jar must be on the classpath).

Once the job finishes successfully, check the result in MySQL:
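For example (illustrative queries):

SELECT COUNT(*) FROM covid_input;
SELECT * FROM covid_input LIMIT 5;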

The data has been written successfully. Next, this table is used to compute the total cases and deaths for each state.

3. Computing the Cumulative cases and deaths per State

Unlike the job above, both the input and the output are now the DB. First, create the table for the results:

CREATE TABLE `covid_output` (
  `id` int NOT NULL AUTO_INCREMENT,
  `state` varchar(255) DEFAULT NULL,
  `cases` int DEFAULT NULL,
  `deaths` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Modify the CountEntity entity to specify the DB fields to read and to write:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class CountEntity implements WritableComparable<CountEntity>, DBWritable {

    private String date;   // date
    private String county; // county
    private String state;  // state
    private String fips;   // county FIPS code
    private Long cases;    // confirmed cases
    private Long deaths;   // deaths

    /**
     * Serialization method
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(date);
        out.writeUTF(county);
        out.writeUTF(state);
        out.writeUTF(fips);
        out.writeLong(cases);
        out.writeLong(deaths);
    }

    /**
     * Deserialization method - the field order must match write()
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.date = in.readUTF();
        this.county = in.readUTF();
        this.state = in.readUTF();
        this.fips = in.readUTF();
        this.cases = in.readLong();
        this.deaths = in.readLong();
    }

    /**
     * Writes the result fields to the covid_output table; parameter order
     * matches the fieldNames passed to DBOutputFormat.setOutput
     */
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, state);
        preparedStatement.setLong(2, cases);
        preparedStatement.setLong(3, deaths);
    }

    /**
     * Reads fields from the covid_input table; column names must match the
     * fieldNames passed to DBInputFormat.setInput
     */
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.date = resultSet.getString("date");
        this.county = resultSet.getString("county");
        this.state = resultSet.getString("state");
        this.fips = resultSet.getString("fips");
        this.cases = resultSet.getLong("cases");
        this.deaths = resultSet.getLong("deaths");
    }

    // Sort order: ascending by state, then descending by deaths
    @Override
    public int compareTo(CountEntity o) {
        int i = this.state.compareTo(o.getState());
        if (i == 0) {
            return this.deaths.equals(o.getDeaths()) ? 0 : (this.deaths - o.getDeaths() > 0 ? -1 : 1);
        }
        return i;
    }
}


Write the Mapper class, emitting state as the key and the row entity as the value to the Reducer:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DBMapper extends Mapper<LongWritable, CountEntity, Text, CountEntity> {

    Text outKey = new Text();

    @Override
    protected void map(LongWritable key, CountEntity value, Context context) throws IOException, InterruptedException {
        // The input key is the record row id supplied by DBInputFormat
        outKey.set(value.getState());
        context.write(outKey, value);
    }
}

The Reducer sums cases and deaths; the aggregated entity is emitted as the key of the final output:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class DBReducer extends Reducer<Text, CountEntity, CountEntity, NullWritable> {

    CountEntity outKey = new CountEntity();
    NullWritable outValue = NullWritable.get();

    @Override
    protected void reduce(Text key, Iterable<CountEntity> values, Context context) throws IOException, InterruptedException {
        long totalCases = 0;
        long totalDeaths = 0;
        // Sum cases and deaths across all records of this state
        for (CountEntity value : values) {
            totalCases += value.getCases();
            totalDeaths += value.getDeaths();
        }
        outKey.setState(key.toString());
        outKey.setCases(totalCases);
        outKey.setDeaths(totalDeaths);
        context.write(outKey, outValue);
    }
}

Finally, write the driver class, specifying the input and output formats:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DBDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Configuration object
        Configuration conf = new Configuration();
        // JDBC connection info used by this job
        DBConfiguration.configureDB(
                conf,
                "com.mysql.cj.jdbc.Driver",
                "jdbc:mysql://127.0.0.1:3306/mapreduces?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT",
                "root",
                "root"
        );

        // Run the job locally
        conf.set("mapreduce.framework.name", "local");
        int status = ToolRunner.run(conf, new DBDriver(), args);
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // Create the job instance
        Job job = Job.getInstance(getConf(), DBDriver.class.getSimpleName());
        // Set the driver class
        job.setJarByClass(DBDriver.class);

        // Set the mapper and reducer classes
        job.setMapperClass(DBMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CountEntity.class);

        job.setReducerClass(DBReducer.class);
        job.setOutputKeyClass(CountEntity.class);
        job.setOutputValueClass(NullWritable.class);

        // Both input and output go through the DB
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        // Read from covid_input (conditions and orderBy are left null)
        DBInputFormat.setInput(
                job,
                CountEntity.class,
                "covid_input",
                null,
                null,
                "date", "county", "state", "fips", "cases", "deaths");

        // Write to covid_output; field order matches CountEntity.write(PreparedStatement)
        DBOutputFormat.setOutput(
                job,
                "covid_output",
                "state", "cases", "deaths");

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
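After the job completes, the per-state totals can be inspected in MySQL, for example (illustrative query):

SELECT state, cases, deaths FROM covid_output ORDER BY deaths DESC LIMIT 10;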
