mybatis批量插入数据性能测试

Posted weixin_42412601

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mybatis批量插入数据性能测试相关的知识,希望对你有一定的参考价值。

目录

for循环测试批量插入和更新数据,myabtisBatch模式,mybatis中直接使用foreach sql拼接插入数据。
性能测试

1、准备

使用的是mybatis-plus以及postgrep数据库来测试的
依赖:

       <!--postgresql-->
      <dependency>
         <groupId>org.postgresql</groupId>
         <artifactId>postgresql</artifactId>
         <version>42.2.14</version>
      </dependency>
      <!--mybatis-plus-->
      <dependency>
         <groupId>com.baomidou</groupId>
         <artifactId>mybatis-plus-boot-starter</artifactId>
         <version>3.4.1</version>
      </dependency>
<!--      lombok-->
      <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <optional>true</optional>
      </dependency>
      <!--guava-->
      <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
         <version>30.1-jre</version>
      </dependency>

配置:

server.port=8888
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://ip:port/xxx?sslmode=disable
logging.level.com.example.OnDuplicateKeyDemo.dao=debug
mybatis-plus.mapper-locations=classpath:mapper/*.xml

实体类:

@Data
@TableName("test_onduplicatekey")
public class OnduplicateKey 
    @TableId(type= IdType.AUTO)
    private Long id;
    private String name;
    private String email;
    private String address;
    private LocalDateTime createTime;
    private LocalDateTime updateTime;

dao

public interface OnduplicateKeyMapper extends BaseMapper<OnduplicateKey> 
       @Insert("<script>" +
               "        insert into $tableName  (id,name,email,address,create_time,update_time)\\n" +
               "        values\\n" +
               "        <foreach collection=\\"onduplicateKeyList\\" item=\\"item\\" separator=\\",\\">\\n" +
               "            (\\n" +
               "            <if test=\\"item.id==null\\">\\n" +
               "                nextval('$tableName_id_seq')\\n" +
               "            </if>\\n" +
               "            <if test=\\"item.id!=null\\">\\n" +
               "                #item.id\\n" +
               "            </if>\\n" +
               "            ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)\\n" +
               "        </foreach>\\n" +
               "        ON conflict(id) do\\n" +
               "        update set\\n" +
               "            name=EXCLUDED.name,\\n" +
               "            email=EXCLUDED.email,\\n" +
               "            address=EXCLUDED.address,\\n" +
               "            create_time=EXCLUDED.create_time,\\n" +
               "            update_time=EXCLUDED.update_time" +
               "</script>")
       int saveOrUpdateOnDuplicateKey( @Param("onduplicateKeyList") List<OnduplicateKey> onduplicateKeyList,@Param("tableName") String tableName);

映射文件:不想使用注解,也可以使用如下映射文件

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.OnDuplicateKeyDemo.dao.OnduplicateKeyMapper">

直接使用tableName
<!--    <insert  id="saveOrUpdateOnDuplicateKey">-->
<!--        insert into $tableName  (id,name,email,address,create_time,update_time)-->
<!--        values-->
<!--        <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!--            (-->
<!--            <if test="item.id==null">-->
<!--                nextval('$tableName_id_seq')-->
<!--            </if>-->
<!--            <if test="item.id!=null">-->
<!--                #item.id-->
<!--            </if>-->
<!--            ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)-->
<!--        </foreach>-->
<!--        ON conflict(id) do-->
<!--        update set-->
<!--            name=EXCLUDED.name,-->
<!--            email=EXCLUDED.email,-->
<!--            address=EXCLUDED.address,-->
<!--            create_time=EXCLUDED.create_time,-->
<!--            update_time=EXCLUDED.update_time-->
<!--    </insert>-->

使用tableInfo
<!--    <insert  id="saveOrUpdateOnDuplicateKey">-->
<!--        insert into $tableInfo.tableName  (id,-->
<!--        <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!--            $item.column-->
<!--        </foreach>-->
<!--        )-->
<!--        values-->
<!--        <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!--            (-->
<!--            <if test="item.id==null">-->
<!--                nextval('$tableInfo.tableName_id_seq')-->
<!--            </if>-->
<!--            <if test="item.id!=null">-->
<!--                #item.id-->
<!--            </if>-->
<!--            ,-->
<!--            <foreach collection="tableInfo.fieldList" item="item2" separator=",">-->
<!--                #item.$item2.property-->
<!--            </foreach>-->
<!--            )-->
<!--        </foreach>-->
<!--        ON conflict(id) do-->
<!--        update set-->
<!--        <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!--            $item.column=EXCLUDED.$item.column-->
<!--        </foreach>-->
<!--    </insert>-->
</mapper>

service以及实现类:

public interface OnduplicateKeyService extends IService<OnduplicateKey> 

@Service
public class OnduplicateKeyServiceImpl extends ServiceImpl<OnduplicateKeyMapper, OnduplicateKey> implements OnduplicateKeyService 

启动类:

@MapperScan("com.example.OnDuplicateKeyDemo.dao.**")

2、普通for循环批量插入数据测试

/**
 * @Description:
 *            for循环测试批量更新或新增,虽然循环多次,但是这些insert都处于同一个事务中,因此,使用的连接都是同一条
 *            但是每次insert,数据库都要解析sql,然后操作表。
 *            编译次数=设置参数次数=执行次数
 *            1千条数据耗时:86865 83086 81232
 *
 *            循环插入:需要每次都获取session,获取连接,然后将sql 语句发给mysql 去执行
 * 						(JDBC一般情况下是通过TCP/IP 进行连接和数据库进行通信的)。
 *
 * @DateTime: 2021/3/1 14:29
 * @Params:
 * @Return
 * @Author: liuzhihui
 */
@Test
void saveOrUpdateWithFor() 
   //获取数据
   List<OnduplicateKey> onduplicateKeys = prepareData();

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);
   for (OnduplicateKey onduplicateKey : onduplicateKeys) 
      //判断数据是否存在
      OnduplicateKey onduplicateKey1 = onduplicateKeyMapper.selectById(onduplicateKey.getId());

      //插入
      if (onduplicateKey1==null)
         int insert = onduplicateKeyMapper.insert(onduplicateKey);
      else //更新
         int i = onduplicateKeyMapper.updateById(onduplicateKey);
      
   
   System.out.println("耗时:"+(System.currentTimeMillis()-startTime));

/**
 * 数据准备
 * @return
 */
private List<OnduplicateKey> prepareData()
   List<OnduplicateKey> data=new ArrayList<>();
   for (int i = 1; i <= 1000; i++) 
      OnduplicateKey onduplicateKey=new OnduplicateKey();
      if (i<10) 
         onduplicateKey.setId((long)i);
      
      onduplicateKey.setName("lisi_"+i);
      onduplicateKey.setEmail(i+"@qq.com");
      onduplicateKey.setAddress("北京"+i);
      onduplicateKey.setCreateTime(LocalDateTime.now());
      onduplicateKey.setUpdateTime(LocalDateTime.now());
      data.add(onduplicateKey);
   
   return data;

3、使用MyBatis提供的BATCH模式

/**
 * @Description:
 *              当使用MyBatis时,有两种批量插入的方式,一种是动态SQL中foreach拼接字符串,一种是使用MyBatis提供的BATCH模式。
 *              两种方式的区别是啥?
 *           1、batch模式: 重复使用已经预处理的语句,并且批量执行所有更新语句.
 *                     预编译sql(1次)==》设置参数=500次==》执行commit(1次)
 *                     预编译:PreparedStatement ps = conn.prepareStatement(sql);
 * 							ps对象包含语句 insert into t_customer values(?,?,?,?),它已发送给DBMS,并为执行作好了准备。
 *                     原理:其实底层就是使用jdbc的批量插入操作。
 *                          1、预编译PreparedStatement ps = conn.prepareStatement(sql)
 *                          2、for循环,使用ps设置n条数据,每设置一条就ps.addBatch();
 *                            当积攒的数据量到了指定的值时,就执行一次ps.executeBatch并清空缓存
 *                     非batch模式,就是不积攒,有一条数据就,编译一次,然后设置参数,执行一次
 *          1千条数据耗时: 1222  1241  1253
 *          1万条数据耗时: 19221  15559 13910 14664
 *
 *           2、mybatis中直接使用foreach插入数据,就相当于将所有的sql预先拼接到一起,然后一起提交,但是数据量过大,会引起,栈内存溢出了,mysql对语句的长度有限制,默认是 4M
 *              动态sql,通过foreach标签进行了sql拼接出一个大sql,交由数据库执行,只需一次调用。
 *           批量插入通过foreach 标签,将多条数据拼接在sql 语句后,一次执行只获取一次session,提交一条sql语句。减少了程序和数据库交互的准备时间。
 *
 *
 * @DateTime: 2021/3/3 13:30
 * @Params:`
 * @Return
 * @Author: liuzhihui
 */
@Test
void saveOrUpdateBatch() 
   //获取数据
   List<OnduplicateKey> onduplicateKeys = prepareData();

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);

   boolean b = onduplicateKeyService.saveOrUpdateBatch(onduplicateKeys, 1000);

   System.out.println("耗时:"+b+" "+(System.currentTimeMillis()-startTime));

4、mybatis中直接使用foreach插入数据

说是说OnduplicateKey模式,在mysql中才有OnduplicateKey,在postgrep中,是ON conflict,有什么用呢,就是数据的批处理中,有些数据可能是新增,有些数据可能是更新,这时ON conflict,就能用上了。
在这里使用ON conflict,就是新增的时候,使用foreachinsert语句中,拼接多个value,来提高插入数据。
最后的测试结果,也是相当不错的。

/**
 * @Description: OnduplicateKey模式: 应该具备回滚的功能,中间某条数据插入出现问题,要回滚
 * @DateTime: 2021/3/4 10:38
 * @Params:  1千条数据耗时:1393 1395 1365
 * @Return   1万条数据耗时:4358 4056 4322
 *           1万条多线程批处理数据耗时: 908 1238 861
 * @Author: liuzhihui
 */
@Transactional
@Test
void saveOrUpdateOnDuplicateKey()
   List<OnduplicateKey> onduplicateKeys = prepareData();

   TableInfo tableInfo = TableInfoHelper.getTableInfo(OnduplicateKey.class);

   //拆分集合的阈值
   int count =1000;

   long startTime = System.currentTimeMillis();
   System.out.println("开始时间:"+startTime);
   //集合按照count,均等拆分为多个集合
   List<List<OnduplicateKey>> collect = Lists.partition(onduplicateKeys, count);
   //多线程处理
   collect.stream().parallel().forEach(onduplicateKeyList->
      onduplicateKeyMapper.saveOrUpdateOnDuplicateKey(onduplicateKeyList,tableInfo.getTableName());
   );
   System.out.println("耗时:"+(System.currentTimeMillis()-startTime));

5、mybatis如何开启batch模式

没有使用mybatis-plus,怎么开启batch模式呢?

一般来说,对于SSM项目工程来说,mybatisExectoryType默认是simple,那么又如何能动态使用batch模式呢?
直接上源码实现干货:

import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.function.BiConsumer;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
/**
 * 启动mybatis的Batch模式的批量新增、更新
 *
 * @author sunchangtan
 */
@Component
public class MybatisBatchExecutor 
    /**
     * 拼写成SQL的最大数据量
     * 比如: 如果insert,把batchCount条数的数据拼写成一个大SQL
     * 如果update,把batchCount条数的数据拼写成case when方式的大SQL
     */
    private static final int batchCountToSQL = 100;

    /**
     * 每多少次时开始commit
     */
    private static final int batchCountToSubmit = 100;

    @Resource
    private SqlSessionTemplate sqlSessionTemplate;


    /**
     * 批量更新、新增
     * @param dbList
     * @param mapperClass
     * @param func
     * @param <T>
     * @param <M>
     */
    public <T, M> void insertOrUpdateBatch(List<T> dbList, Class<M> mapperClass, BiConsumer<M, List<T>> func) 
        int batchLastIndex = batchCountToSQL;
        int batchLastIndexToSubmit = 0;
        int total = dbList.size();

        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
        M modelMapper = sqlSession.getMapper(mapperClass);
        try 
            if (total > batchCountToSQL) 
                for (int index = 0; index < total; ) 
                    if (batchLastIndex > total) 
                        List<T> list = dbList.subList(index, total);

                        func.accept(modelMapper, list);

                        sqlSession.flushStatements();
                        sqlSession.commit();
                        break;
                     else 
                        List<T> list = dbList.subList(index, batchLastIndex);
                        func.accept(modelMapper, list);

                        if (batchLastIndexToSubmit++ >= batchCountToSubmit) 
                            //如果可以批量提交,则提交
                            sqlSession.flushStatements();
                            sqlSession.commit();
                            batchLastIndexToSubmit = 0;
                        
                        index = batchLastIndex;// 设置下一批下标
                        batchLastIndex = index + (batchCountToSQL - 1);
                    
                
             else 
                func.accept(modelMapper, dbList);
                sqlSession.commit();
            
         finally 
            closeSqlSession(sqlSession, sqlSessionFactory);
        
    

使用例子:

//批量插入
 mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)-> 
            mapper.insertList(data);
        );

//批量更新
mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)-> 
            mapper.updateBatchByPrimaryKeySelective(data);
        );

以上是关于mybatis批量插入数据性能测试的主要内容,如果未能解决你的问题,请参考以下文章

Mongodb亿级数据量的性能测试

Mybatis+mysql批量插入性能分析测试

MyBatis中批量插入数据对插入记录数的限制

MyBatis批量插入为什么比单条插入块?

转-Mongodb亿级数据量的性能测试

mybatis优化批量插入数据