关于mybatis批处理那点事

Posted 张子行的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了关于mybatis批处理那点事相关的知识,希望对你有一定的参考价值。

前言

最近在写爬虫的时候,需要定时的将数据爬取然后导入到数据库中(数据量有点大哦),我最开始的写法是这样的,每爬取到一条数据就立即将数据入库,IO了好多次,这样在无形之中给数据库施压了,唉我这个猪队友…不是还有一个叫做批处理的东西存在嘛!!!于是我用批处理的技术优化了一下代码,顺便研究了一波批处理流程的源码,这也是我写下本文的原因嘻嘻,下文将会按照如下流程来介绍批处理(包含了一丢丢源码分析)。SQL层面->JDBC批处理->mybatis批处理->mybatisPlus批处理。读者可以根据自己的意愿自行读取相应部分

SQL层面来实现批处理

把要插入的数据一条条的进行SQL拼接,拼接完成后,执行一次最终的那条SQL就可以将批量的数据导入到数据库中,如果我们不嫌麻烦在开发我们的项目的时候,利用SQL拼接的思想,亦可以轻松的实现批处理,其实 mybatis 也是用的这个思想

insert into user(name,age) values ("zzh",18) ,("zzh",21)......

JDBC的批处理技术

如果用JDBC来实现批处理的话,大致的几个步骤如下。下面的代码演示了,向数据库中插入俩条name=zzh,age=21的数据

        Class.forName(driver);
        Connection conn = DriverManager.getConnection(url, username, password);
        conn.setAutoCommit(false);
        String insert = "insert into user(name,age) values(?,?)";
        PreparedStatement ps = conn.prepareStatement(insert);
        ps.setObject(1, "zzh");
        ps.setObject(2, "21");
        //每一次的addBatch操作就相当于SQL拼接
        ps.addBatch();
        ps.addBatch();
        //执行批处理
        ps.executeBatch();
        conn.commit();

mybatis的批处理技术(重头戏)

为了向读者叙述的更加全面,打算以 Executor 接口为起点开始叙述,直到让读者体会到 Mapper 是如何实现批处理的。其实我们在调用各种 Mapper 接口进行CRUD 的时候,底层真正干活却是 Executor 接口。如果读者对 mybatis 的源码感兴趣,可以阅读 深入mybatis源码解读~手把手带你debug分析源码 本文不在过多叙述 Executor 的作用。

说了这么多,mybatis是如何执行批处理的呢?简单的一个例子带大家来体验一下,下面我把如下配置注释掉,不使用通过配置文件来配置数据源的方式,而是采用最原始的手动注入的方式来写测试

测试代码

    private Configuration configuration;
    private JdbcTransaction jdbcTransaction;
    private Connection connection;
    private Reader resourceAsReader;
    private SqlSessionFactory sqlSessionFactory;

    @SneakyThrows
    public static void main(String[] args) 
        mybatisBatch mybatisBatch = new mybatisBatch();
        mybatisBatch.init();
        mybatisBatch.batchInsert();
    

    @SneakyThrows
    public void init() 
        connection = DriverManager.getConnection(
                "url",
                "userName",
                "password");
        resourceAsReader = Resources.getResourceAsReader("mybatis.xml");
        jdbcTransaction = new JdbcTransaction(connection);
        sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsReader);
        configuration = sqlSessionFactory.getConfiguration();
    


   @SneakyThrows
    public void batchInsert() 
        BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
        Zhengshangsuo alibaba = new Zhengshangsuo().setContract("alibaba").setContractId("alibaba").setType("alibaba");
        Zhengshangsuo nio = new Zhengshangsuo().setContract("nio").setContractId("nio").setType("nio");
        //指定方法
        MappedStatement insertAlibaba = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertAlibaba");
        MappedStatement insertNio = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertNio");
        //关闭事务自动提交
        this.connection.setAutoCommit(false);
        //添加到批处理
        batchExecutor.doUpdate(insertAlibaba, alibaba);
        batchExecutor.doUpdate(insertAlibaba, alibaba);
        batchExecutor.doUpdate(insertNio, nio);
        batchExecutor.doUpdate(insertNio, nio);
        //执行批处理逻辑,并且返回执行的结果
        List<BatchResult> batchResults = batchExecutor.doFlushStatements(false);
        //提交当前事务
        this.connection.commit();
    

运行上面的代码,可以看的到插入四条数据,实际上对数据库只有俩次io。这是为什么呢?

其实不管是 Mybatis 也好MybatisPlus 也罢,当我们去调用 下图的这些 api 方法的时候,debug 追踪源码最终都是 各种执行器在起作用。

debug追踪一波源码

不管我们进行多少次 doUpdate()操作,debug发现永远都只会占用数据库的一个连接,其原因就在于
每次传入的 MappedStatement 都是相同的、 Mapper 上的 sql 也是一样的。

这行代码体现了批处理生效的条件,会将 statement、sql 相同的这些操作归类成同一批做批处理

sql.equals(this.currentSql) && ms.equals(this.currentStatement)

这行代码体现了,Sql参数拼接的过程,对此感兴趣的读者可以一路debug下去,就会来到下图的这个地方。我之前看mybatis源码的时候好像看过这一段…就不做详细说明了!

handler.parameterize(stmt);

小结 doUpdate()

所谓的批处理我个人一句话总结概括就是:单个 MappedStatement 的996,开创了一个部门的辉煌帝国。批处理:在一次 connection 中,反复对同一个 MappedStatement 进行sql拼接,并最终运行。从而达到一次处理批量执行的目的。

批处理的运行时机

还记得测试代码中的这行代码吗?debug进去就完事了。

batchExecutor.doFlushStatements(false);

源码很简单,关注下图标注的地方就ok了,遍历 statementList 中所有的 statement 然后执行就完事了。可以看到 mybatis 还是很贴心的,居然把执行过程中的一些数据返回给了我们。这些数据也就是批处理执行的返回结果

顺嘴提一波:所有的 statement 这几个字有俩层含义

  1. 经过参数拼接的 statement(批处理)
  2. 普通的 statement (处理单条数据)

批处理更新、查询生效吗?

上文已经研究了一下批处理插入的原理,虽然有多条数据都进行入库,但是可以做到对数据库只进行一次io的效果。反问一波批量更新数据是否也能做到只对数据库进行一次io操作呢?答案是可以的。那再反问批处理查询也可以做到只io一次数据库吗?答案是不可以的。原因也很简单批处理执行器(batchExcutor)的批处理功能只针对 doUpdate 操作生效,而 doUpdate 包括了插入、更新

分析一波Mybatis-Plus的批处理

这里记得配置一下 application.yml中的数据源,我这里是在boot工程测试类中跑的测试、默认从配置文件中进行读取数据源

@Autowired
private ZhengshangsuoServiceImpl zhengshangsuoService;
@Test
public void mybatisPlusBatch() 
    Zhengshangsuo data = new Zhengshangsuo().setContract("1").setContractId("1").setType("1");
    ArrayList<Zhengshangsuo> list = new ArrayList<>();
    list.add(data);
    list.add(data);
    zhengshangsuoService.saveBatch(list, 2);

debug一波源码,发现还是我们上文分析过的那些逻辑,兜兜转转又回到了 batchExecutor 中的doUpdate 方法。debug过程中会涉及到一点 aop的源码、spring 事务的源码。可以去看下这俩篇文章

  1. 全面解读spring注解事务失效场景,伪代码+图文深度解析spring源码运行过程
  2. spring 源码解析(配图文讲解)顺带搞懂了循环依赖、aop底层实现

附页

本文测试代码demo如下哦

package com.zzh.reptile;

import com.zzh.reptile.entity.Zhengshangsuo;
import lombok.SneakyThrows;
import org.apache.ibatis.executor.BatchExecutor;
import org.apache.ibatis.executor.BatchResult;
import org.apache.ibatis.executor.SimpleExecutor;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.RowBounds;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.jdbc.JdbcTransaction;

import java.io.Reader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.List;

public class mybatisBatch 
    private Configuration configuration;
    private JdbcTransaction jdbcTransaction;
    private Connection connection;
    private Reader resourceAsReader;
    private SqlSessionFactory sqlSessionFactory;

    @SneakyThrows
    public static void main(String[] args) 
        mybatisBatch mybatisBatch = new mybatisBatch();
        mybatisBatch.init();
        mybatisBatch.batchInsert();
    

    @SneakyThrows
    public void init() 
        connection = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:666/aaa?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT&useSSL=false",
                "aaa",
                "aaa");
        resourceAsReader = Resources.getResourceAsReader("mybatis.xml");
        jdbcTransaction = new JdbcTransaction(connection);
        sqlSessionFactory = new SqlSessionFactoryBuilder().build(resourceAsReader);
        configuration = sqlSessionFactory.getConfiguration();
    


    @SneakyThrows
    public void batchInsert() 
        BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
        Zhengshangsuo alibaba = new Zhengshangsuo().setContract("alibaba").setContractId("alibaba").setType("alibaba");
        Zhengshangsuo nio = new Zhengshangsuo().setContract("nio").setContractId("nio").setType("nio");
        //指定方法
        MappedStatement insertAlibaba = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertAlibaba");
        MappedStatement insertNio = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.insertNio");
        //关闭事务自动提交
        this.connection.setAutoCommit(false);
        //添加到批处理
        batchExecutor.doUpdate(insertAlibaba, alibaba);
        batchExecutor.doUpdate(insertAlibaba, alibaba);
        batchExecutor.doUpdate(insertNio, nio);
        batchExecutor.doUpdate(insertNio, nio);
        //执行批处理逻辑,并且返回执行的结果
        List<BatchResult> batchResults = batchExecutor.doFlushStatements(false);
        //提交当前事务
        this.connection.commit();
    

    @SneakyThrows
    public void batchUpdate() 
        BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
        Zhengshangsuo data1 = new Zhengshangsuo().setContract("nio").setId(18513);
        Zhengshangsuo data2 = new Zhengshangsuo().setContract("pdd").setId(18512);
        MappedStatement update = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.updatezzh");
        this.connection.setAutoCommit(false);
        batchExecutor.doUpdate(update, data1);
        batchExecutor.doUpdate(update, data2);
        batchExecutor.doFlushStatements(false);
        this.connection.commit();
    

    public void batchQuery() throws Exception 
        HashMap<String, Object> queryMap = new HashMap<>();
        queryMap.put("contract", "nio");
        queryMap.put("id", 18513);
        BatchExecutor batchExecutor = new BatchExecutor(this.configuration, jdbcTransaction);
        MappedStatement query = this.configuration.getMappedStatement("com.zzh.reptile.mapper.ZhengshangsuoMapper.queryzzh");
        this.connection.setAutoCommit(false);
        List<Object> res = batchExecutor.doQuery(query, queryMap, RowBounds.DEFAULT, SimpleExecutor.NO_RESULT_HANDLER, query.getBoundSql(1));
        List<Object> ress = batchExecutor.doQuery(query, queryMap, RowBounds.DEFAULT, SimpleExecutor.NO_RESULT_HANDLER, query.getBoundSql(1));
        batchExecutor.doFlushStatements(false);
        this.connection.commit();
        System.err.println(res.toString());
        System.err.println(ress);
    

public interface ZhengshangsuoMapper extends BaseMapper<Zhengshangsuo> 
    @Insert("insert into zhengshangsuo (contract,contractId,type) values(#contract,#contractId,#type)")
    public int insertAlibaba(Zhengshangsuo zhengshangsuo);

    @Insert("insert into zhengshangsuo (contract,contractId,type) values(#contract,#contractId,#type)")
    public int insertNio(Zhengshangsuo zhengshangsuo);

    @Insert("insert into zhengshangsuo (contract,contractId,type) values(#contract,#contractId,#type)")
    public int insertzzh(Zhengshangsuo zhengshangsuo);

    @Update("update zhengshangsuo set contract = #contract where id = #id")
    public int updatezzh(Map<String, Object> map);

    @Select("select * from zhengshangsuo where contract = #contract and id = #id")
    public List<Zhengshangsuo> queryzzh(Map<String, Object> map);

@Autowired
private ZhengshangsuoServiceImpl zhengshangsuoService;
@Test
public void mybatisPlusBatch() 
    Zhengshangsuo data = new Zhengshangsuo().setContract("1").setContractId("1").setType("1");
    ArrayList<Zhengshangsuo> list = new ArrayList<>();
    list.add(data);
    list.add(data);
    zhengshangsuoService.saveBatch(list, 2);

以上是关于关于mybatis批处理那点事的主要内容,如果未能解决你的问题,请参考以下文章

关于mybatis批处理那点事

技术干货|关于logback日志压缩的那点事

Web基础知识——HTTP协议那点事

关于Json的那点事

聊聊 Kafka Consumer 那点事

Python#规范# 关于日志的那点事