mybatis以及mybatisplus批量插入问题

Posted 又 欠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mybatis以及mybatisplus批量插入问题相关的知识,希望对你有一定的参考价值。

1. 思路分析:

批量插入是我们日常开放经常会使用到的场景,一般情况下我们也会有两种方案进行实施,如下所示。

方案一 就是用 for 循环循环插入:

优点:JDBC 中的 PreparedStatement 有预编译功能,预编译之后会缓存起来,后面的 SQL 执行会比较快并且JDBC 可以开启批处理,这个批处理执行非常给力。

缺点:很多时候我们的 SQL 服务器和应用服务器可能并不是同一台,所以必须要考虑网络 IO,如果网络 IO 比较费时间的话,那么可能会拖慢
SQL 执行的速度。

再来说第二种方案,就是生成一条 SQL 插入:

优势:这种方案的优势在于只有一次网络 IO,即使分片处理也只是数次网络 IO,所以这种方案不会在网络 IO 上花费太多时间。

缺点一是 SQL 太长了,甚至可能需要分片后批量处理;

缺点二是无法充分发挥 PreparedStatement 预编译的优势,SQL 要重新解析且无法复用;三是最终生成的 SQL
太长了,数据库管理器解析这么长的 SQL 也需要时间。


2. rewriteBatchedStatements=true

在jdbc连接后面加上 rewriteBatchedStatements=true ,加上后才是真正的批量插入。

 jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true

3.使用mybatis批量插入:

方案一:使用foreach进行插入(生成一条 SQL 插入)
mapper文件

   <insert id="save" parameterType="java.util.List">
        INSERT INTO test
        (
        id,
        a,
        b,
        c
        )
        VALUES
        <foreach collection="list" item="item" index="index" separator=",">
            (
            #item.id,
            #item.a,
            #item.b,
            #item.c
            )
        </foreach>
    </insert>

调用方法

 @Override
    public void add() 
        //时间 一
        long l = System.currentTimeMillis();
        List<TestEntity> list=new ArrayList<>();
        for (int i=0;i<1000;i++)
            TestEntity testEntity=new TestEntity();
            testEntity.setC(i);
            list.add(testEntity);
        
       testMapper.save(list);
        //时间 二
        long l1 = System.currentTimeMillis();
        System.out.println("耗时"+(l1-l));
    

插入了1000条数据,耗时535毫秒。
插入了50000条数据,直接报错。
报错原因是因为我们一条SQL进行插入导致SQL太长
解决办法:
1.修改MySQL配置
2.对新增数据进行分片


方案二:一条条插入

mapper

   <insert id="addUserOneByOne" parameterType="com.ruoyi.system.domain.TestEntity">
    insert into test (id,a,b,c) values (#id,#a,#b,#c)
    </insert>

测试代码

@Service
public class TestServiceimpl extends ServiceImpl<TestMapper, TestEntity> implements TestService 

    @Autowired
  private   TestMapper testMapper;

    @Autowired
  private SqlSessionFactory sqlSessionFactory;
    
    public void addUserOneByOne(List<TestEntity> users) 
        SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
        TestMapper um = session.getMapper(TestMapper.class);
        long startTime = System.currentTimeMillis();
        for (TestEntity user : users) 
            um.addUserOneByOne(user);
        
        session.commit();
        long endTime = System.currentTimeMillis();
        System.out.println("耗时"+(endTime - startTime));
    

插入了1000条数据,耗时959毫秒。
插入50000条数据,耗时11214毫秒。


对比分析:
如果我们批量插入少部分数据,可以使用方式一,一条SQL进行插入。这样是比较快的。
如果我们插入数据达到,1w条,10来万条,这时建议用方式二进行插入是比较快的。


4. 使用mybatisplus批量插入

使用saveBatch()方法进行批量插入

@Service
public class TestServiceimpl extends ServiceImpl<TestMapper, TestEntity> implements TestService 
    
    @Autowired
  private   TestMapper testMapper;

    @Autowired
  private SqlSessionFactory sqlSessionFactory;

    @Override
    public void add() 
        //时间 一
        long l = System.currentTimeMillis();
        List<TestEntity> list=new ArrayList<>();
        for (int i=0;i<50000;i++)
            TestEntity testEntity=new TestEntity();
            testEntity.setC(i);
            list.add(testEntity);
        
        saveBatch(list);
        //时间 二
        long l1 = System.currentTimeMillis();
        System.out.println("耗时"+(l1-l));
    

插入50000条数据,耗时19516毫秒

源码分析

   public boolean saveBatch(Collection<T> entityList, int batchSize) 
        String sqlStatement = this.getSqlStatement(SqlMethod.INSERT_ONE);
        return this.executeBatch(entityList, batchSize, (sqlSession, entity) -> 
            sqlSession.insert(sqlStatement, entity);
        );
    

这里注意 return 中的第三个参数,是一个 lambda 表达式,这也是 MP 中批量插入的核心逻辑,可以看到,MP 先对数据进行分片(默认分片大小是 1000),分片完成之后,也是一条一条的插入。

 public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) 
        Assert.isFalse(batchSize < 1, "batchSize must not be less than one", new Object[0]);
        return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, (sqlSession) -> 
            int size = list.size();
            int i = 1;

            for(Iterator var6 = list.iterator(); var6.hasNext(); ++i) 
                E element = var6.next();
                consumer.accept(sqlSession, element);
                if (i % batchSize == 0 || i == size) 
                    sqlSession.flushStatements();
                
            

        );
    

继续查看 executeBatch 方法,就会发现这里的 sqlSession 其实也是一个批处理的 sqlSession,并非普通的 sqlSession。和我们mybatis使用的方法二一致。


5业务场景一对多怎么处理:

比如,如下这种一对多场景。
新增的时候保存都好理解,形成一个数组一起保存。
而修改的时候就有点难处理了,比如我修改了第二条,删除了第三条,这时统一保存应该怎么处理?

使用 ON DUPLICATE KEY UPDATE (发生主键冲突就更新,没有发生主键冲突就新增)

有时候由于业务需求,可能需要先去根据某一字段值查询数据库中是否有记录,有则更新,没有则插入。这个时候就可以用到ON DUPLICATE key update这个sql语句了

mapper如下所示

   <insert id="save" parameterType="java.util.List">
        INSERT INTO test
        (
        id,
        a,
        b,
        c
        )
        VALUES
        <foreach collection="list" item="item" index="index" separator=",">
            (
            #item.id,
            #item.a,
            #item.b,
            #item.c
            )
        </foreach>
        ON DUPLICATE KEY UPDATE
        id=id,
        a = VALUES(a) ,
        b = VALUES(b),
        c = VALUES(c)
    </insert>

或者在使用mybatisplus时,使用saveOrUpdate()方法进行一条数据的新增或更新。 saveOrUpdateBatch()方法进行批量数据的新增或更新。


梅西“消除厄运”卡

新冠,退退退!
梅老板,冲冲冲!
三星阿根廷,加油!

mybatis批量插入数据性能测试

目录

for循环测试批量插入和更新数据,myabtisBatch模式,mybatis中直接使用foreach sql拼接插入数据。
性能测试

1、准备

使用的是mybatis-plus以及postgrep数据库来测试的
依赖:

       <!--postgresql-->
      <dependency>
         <groupId>org.postgresql</groupId>
         <artifactId>postgresql</artifactId>
         <version>42.2.14</version>
      </dependency>
      <!--mybatis-plus-->
      <dependency>
         <groupId>com.baomidou</groupId>
         <artifactId>mybatis-plus-boot-starter</artifactId>
         <version>3.4.1</version>
      </dependency>
<!--      lombok-->
      <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
      </dependency>
      <!--guava-->
      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>30.1-jre</version>
      </dependency>

配置:

server.port=8888
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://ip:port/xxx?sslmode=disable
logging.level.com.example.OnDuplicateKeyDemo.dao=debug
mybatis-plus.mapper-locations=classpath:mapper/*.xml

实体类:

@Data
@TableName("test_onduplicatekey")
public class OnduplicateKey 
    @TableId(type= IdType.AUTO)
    private Long id;
    private String name;
    private String email;
    private String address;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;

dao

public interface OnduplicateKeyMapper extends BaseMapper<OnduplicateKey> 
       @Insert("<script>" +
               "        insert into $tableName  (id,name,email,address,create_time,update_time)\\n" +
               "        values\\n" +
               "        <foreach collection=\\"onduplicateKeyList\\" item=\\"item\\" separator=\\",\\">\\n" +
               "            (\\n" +
               "            <if test=\\"item.id==null\\">\\n" +
               "                nextval('$tableName_id_seq')\\n" +
               "            </if>\\n" +
               "            <if test=\\"item.id!=null\\">\\n" +
               "                #item.id\\n" +
               "            </if>\\n" +
               "            ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)\\n" +
               "        </foreach>\\n" +
               "        ON conflict(id) do\\n" +
               "        update set\\n" +
               "            name=EXCLUDED.name,\\n" +
               "            email=EXCLUDED.email,\\n" +
               "            address=EXCLUDED.address,\\n" +
               "            create_time=EXCLUDED.create_time,\\n" +
               "            update_time=EXCLUDED.update_time" +
               "</script>")
       int saveOrUpdateOnDuplicateKey( @Param("onduplicateKeyList") List<OnduplicateKey> onduplicateKeyList,@Param("tableName") String tableName);

映射文件:不想使用注解,也可以使用如下映射文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.OnDuplicateKeyDemo.dao.OnduplicateKeyMapper">

直接使用tableName
<!--    <insert  id="saveOrUpdateOnDuplicateKey">-->
<!--        insert into $tableName  (id,name,email,address,create_time,update_time)-->
<!--        values-->
<!--        <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!--            (-->
<!--            <if test="item.id==null">-->
<!--                nextval('$tableName_id_seq')-->
<!--            </if>-->
<!--            <if test="item.id!=null">-->
<!--                #item.id-->
<!--            </if>-->
<!--            ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)-->
<!--        </foreach>-->
<!--        ON conflict(id) do-->
<!--        update set-->
<!--            name=EXCLUDED.name,-->
<!--            email=EXCLUDED.email,-->
<!--            address=EXCLUDED.address,-->
<!--            create_time=EXCLUDED.create_time,-->
<!--            update_time=EXCLUDED.update_time-->
<!--    </insert>-->

使用tableInfo
<!--    <insert  id="saveOrUpdateOnDuplicateKey">-->
<!--        insert into $tableInfo.tableName  (id,-->
<!--        <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!--            $item.column-->
<!--        </foreach>-->
<!--        )-->
<!--        values-->
<!--        <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!--            (-->
<!--            <if test="item.id==null">-->
<!--                nextval('$tableInfo.tableName_id_seq')-->
<!--            </if>-->
<!--            <if test="item.id!=null">-->
<!--                #item.id-->
<!--            </if>-->
<!--            ,-->
<!--            <foreach collection="tableInfo.fieldList" item="item2" separator=",">-->
<!--                #item.$item2.property-->
<!--            </foreach>-->
<!--            )-->
<!--        </foreach>-->
<!--        ON conflict(id) do-->
<!--        update set-->
<!--        <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!--            $item.column=EXCLUDED.$item.column-->
<!--        </foreach>-->
<!--    </insert>-->
</mapper>

service以及实现类:

public interface OnduplicateKeyService extends IService<OnduplicateKey> 

@Service
public class OnduplicateKeyServiceImpl extends ServiceImpl<OnduplicateKeyMapper, OnduplicateKey> implements OnduplicateKeyService 

启动类:

@MapperScan("com.example.OnDuplicateKeyDemo.dao.**")

2、普通for循环批量插入数据测试

/**
 * @Description:
 *            for循环测试批量更新或新增,虽然循环多次,但是这些insert都处于同一个事务中,因此,使用的连接都是同一条
 *            但是每次insert,数据库都要解析sql,然后操作表。
 *            编译次数=设置参数次数=执行次数
 *            1千条数据耗时:86865 83086 81232
 *
 *            循环插入:需要每次都获取session,获取连接,然后将sql 语句发给mysql 去执行
 * 						(JDBC一般情况下是通过TCP/IP 进行连接和数据库进行通信的)。
 *
 * @DateTime: 2021/3/1 14:29
 * @Params:
 * @Return
 * @Author: liuzhihui
 */
@Test
void saveOrUpdateWithFor() 
   //获取数据
   List<OnduplicateKey> onduplicateKeys = prepareData();

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);
   for (OnduplicateKey onduplicateKey : onduplicateKeys) 
      //判断数据是否存在
      OnduplicateKey onduplicateKey1 = onduplicateKeyMapper.selectById(onduplicateKey.getId());

      //插入
      if (onduplicateKey1==null)
         int insert = onduplicateKeyMapper.insert(onduplicateKey);
      else //更新
         int i = onduplicateKeyMapper.updateById(onduplicateKey);
      
   
   System.out.println("耗时:"+(System.currentTimeMillis()-startTime));

/**
 * 数据准备
 * @return
 */
private List<OnduplicateKey> prepareData()
   List<OnduplicateKey> data=new ArrayList<>();
   for (int i = 1; i <= 1000; i++) 
      OnduplicateKey onduplicateKey=new OnduplicateKey();
      if (i<10) 
         onduplicateKey.setId((long)i);
      
      onduplicateKey.setName("lisi_"+i);
      onduplicateKey.setEmail(i+"@qq.com");
      onduplicateKey.setAddress("北京"+i);
      onduplicateKey.setCreateTime(LocalDateTime.now());
      onduplicateKey.setUpdateTime(LocalDateTime.now());
      data.add(onduplicateKey);
   
   return data;

3、使用MyBatis提供的BATCH模式

/**
 * @Description:
 *              当使用MyBatis时,有两种批量插入的方式,一种是动态SQL中foreach拼接字符串,一种是使用MyBatis提供的BATCH模式。
 *              两种方式的区别是啥?
 *           1、batch模式: 重复使用已经预处理的语句,并且批量执行所有更新语句.
 *                     预编译sql(1次)==》设置参数=500次==》执行commit(1次)
 *                     预编译:PreparedStatement ps = conn.prepareStatement(sql);
 * 							ps对象包含语句 insert into t_customer values(?,?,?,?),它已发送给DBMS,并为执行作好了准备。
 *                     原理:其实底层就是使用jdbc的批量插入操作。
 *                          1、预编译PreparedStatement ps = conn.prepareStatement(sql)
 *                          2、for循环,使用ps设置n条数据,每设置一条就ps.addBatch();
 *                            当积攒的数据量到了指定的值时,就执行一次ps.executeBatch并清空缓存
 *                     非batch模式,就是不积攒,有一条数据就,编译一次,然后设置参数,执行一次
 *          1千条数据耗时: 1222  1241  1253
 *          1万条数据耗时: 19221  15559 13910 14664
 *
 *           2、mybatis中直接使用foreach插入数据,就相当于将所有的sql预先拼接到一起,然后一起提交,但是数据量过大,会引起,栈内存溢出了,mysql对语句的长度有限制,默认是 4M
 *              动态sql,通过foreach标签进行了sql拼接出一个大sql,交由数据库执行,只需一次调用。
 *           批量插入通过foreach 标签,将多条数据拼接在sql 语句后,一次执行只获取一次session,提交一条sql语句。减少了程序和数据库交互的准备时间。
 *
 *
 * @DateTime: 2021/3/3 13:30
 * @Params:`
 * @Return
 * @Author: liuzhihui
 */
@Test
void saveOrUpdateBatch() 
   //获取数据
   List<OnduplicateKey> onduplicateKeys = prepareData();

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);

   boolean b = onduplicateKeyService.saveOrUpdateBatch(onduplicateKeys, 1000);

   System.out.println("耗时:"+b+" "+(System.currentTimeMillis()-startTime));

4、mybatis中直接使用foreach插入数据

说是说OnduplicateKey模式,在mysql中才有OnduplicateKey,在postgrep中,是ON conflict,有什么用呢,就是数据的批处理中,有些数据可能是新增,有些数据可能是更新,这时ON conflict,就能用上了。
在这里使用ON conflict,就是新增的时候,使用foreachinsert语句中,拼接多个value,来提高插入数据。
最后的测试结果,也是相当不错的。

/**
 * @Description: OnduplicateKey模式: 应该具备回滚的功能,中间某条数据插入出现问题,要回滚
 * @DateTime: 2021/3/4 10:38
 * @Params:  1千条数据耗时:1393 1395 1365
 * @Return   1万条数据耗时:4358 4056 4322
 *           1万条多线程批处理数据耗时: 908 1238 861
 * @Author: liuzhihui
 */
@Transactional
@Test
void saveOrUpdateOnDuplicateKey()
   List<OnduplicateKey> onduplicateKeys = prepareData();

   TableInfo tableInfo = TableInfoHelper.getTableInfo(OnduplicateKey.class);

   //拆分集合的阈值
   int count =1000;

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);
   //集合按照count,均等拆分为多个集合
   List<List<OnduplicateKey>> collect = Lists.partition(onduplicateKeys, count);
   //多线程处理
   collect.stream().parallel().forEach(onduplicateKeyList->
      onduplicateKeyMapper.saveOrUpdateOnDuplicateKey(onduplicateKeyList,tableInfo.getTableName());
   );
   System.out.println("耗时:"+(System.currentTimeMillis()-startTime));

5、mybatis如何开启batch模式

没有使用mybatis-plus,怎么开启batch模式呢?

一般来说,对于SSM项目工程来说,mybatisExectoryType默认是simple,那么又如何能动态使用batch模式呢?
直接上源码实现干货:

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.function.BiConsumer;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
/**
 * 启动mybatis的Batch模式的批量新增、更新
 *
 * @author sunchangtan
 */
@Component
public class MybatisBatchExecutor 
    /**
     * 拼写成SQL的最大数据量
     * 比如: 如果insert,把batchCount条数的数据拼写成一个大SQL
     * 如果update,把batchCount条数的数据拼写成case when方式的大SQL
     */
    private static final int batchCountToSQL = 100;

    /**
     * 每多少次时开始commit
     */
    private static final int batchCountToSubmit = 100;

    @Resource
    private SqlSessionTemplate sqlSessionTemplate;


    /**
     * 批量更新、新增
     * @param dbList
     * @param mapperClass
     * @param func
     * @param <T>
     * @param <M>
     */
    public <T, M> void insertOrUpdateBatch(List<T> dbList, Class<M> mapperClass, BiConsumer<M, List<T>> func) 
        int batchLastIndex = batchCountToSQL;
        int batchLastIndexToSubmit = 0;
        int total = dbList.size();

        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
        M modelMapper = sqlSession.getMapper(mapperClass);
        try 
            if (total > batchCountToSQL) 
                for (int index = 0; index < total; ) 
                    if (batchLastIndex > total) 
                        List<T> list = dbList.subList(index, total);

                        func.accept(modelMapper, list);

                        sqlSession.flushStatements();
                        sqlSession.commit();
                        break;
                     else 
                        List<T> list = dbList.subList(index, batchLastIndex);
                        func.accept(modelMapper, list);

                        if (batchLastIndexToSubmit++ >= batchCountToSubmit) 
                            //如果可以批量提交,则提交
                            sqlSession.flushStatements();
                            sqlSession.commit();
                            batchLastIndexToSubmit = 0;
                        
                        index = batchLastIndex;// 设置下一批下标
                        batchLastIndex = index + (batchCountToSQL - 1);
                    
                
             else 
                func.accept(modelMapper, dbList);
                sqlSession.commit();
            
         finally 
            closeSqlSession(sqlSession, sqlSessionFactory);
        
    

使用例子:

//批量插入
 mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)-> 
            mapper.insertList(data);
        );

//批量更新
mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)-> 
            mapper.updateBatchByPrimaryKeySelective(data);
        );

以上是关于mybatis以及mybatisplus批量插入问题的主要内容,如果未能解决你的问题,请参考以下文章

mybatis和mybatisPlus中解决实体类字段与数据库关键字冲突问题

mybatis和mybatisPlus中解决实体类字段与数据库关键字冲突问题

mybatis-plus 自定义basemapper支持批量增删改操作

mybatis-plus 自定义basemapper支持批量增删改操作

Java--MyBatis批量插入批量更新和批量删除

求大神解决,mybatis插入数据报id为空