mybatis以及mybatisplus批量插入问题
Posted 又 欠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mybatis以及mybatisplus批量插入问题相关的知识,希望对你有一定的参考价值。
1. 思路分析:
批量插入是我们日常开放经常会使用到的场景,一般情况下我们也会有两种方案进行实施,如下所示。
方案一 就是用 for 循环循环插入:
优点:JDBC 中的 PreparedStatement 有预编译功能,预编译之后会缓存起来,后面的 SQL 执行会比较快并且JDBC 可以开启批处理,这个批处理执行非常给力。
缺点:很多时候我们的 SQL 服务器和应用服务器可能并不是同一台,所以必须要考虑网络 IO,如果网络 IO 比较费时间的话,那么可能会拖慢
SQL 执行的速度。
再来说第二种方案,就是生成一条 SQL 插入:
优势:这种方案的优势在于只有一次网络 IO,即使分片处理也只是数次网络 IO,所以这种方案不会在网络 IO 上花费太多时间。
缺点一是 SQL 太长了,甚至可能需要分片后批量处理;
缺点二是无法充分发挥 PreparedStatement 预编译的优势,SQL 要重新解析且无法复用;三是最终生成的 SQL
太长了,数据库管理器解析这么长的 SQL 也需要时间。
2. rewriteBatchedStatements=true
在jdbc连接后面加上 rewriteBatchedStatements=true ,加上后才是真正的批量插入。
jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
3.使用mybatis批量插入:
方案一:使用foreach进行插入(生成一条 SQL 插入)
mapper文件
<insert id="save" parameterType="java.util.List">
INSERT INTO test
(
id,
a,
b,
c
)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(
#item.id,
#item.a,
#item.b,
#item.c
)
</foreach>
</insert>
调用方法
@Override
public void add()
//时间 一
long l = System.currentTimeMillis();
List<TestEntity> list=new ArrayList<>();
for (int i=0;i<1000;i++)
TestEntity testEntity=new TestEntity();
testEntity.setC(i);
list.add(testEntity);
testMapper.save(list);
//时间 二
long l1 = System.currentTimeMillis();
System.out.println("耗时"+(l1-l));
插入了1000条数据,耗时535毫秒。
插入了50000条数据,直接报错。
报错原因是因为我们一条SQL进行插入导致SQL太长
解决办法:
1.修改MySQL配置
2.对新增数据进行分片
方案二:一条条插入
mapper
<insert id="addUserOneByOne" parameterType="com.ruoyi.system.domain.TestEntity">
insert into test (id,a,b,c) values (#id,#a,#b,#c)
</insert>
测试代码
@Service
public class TestServiceimpl extends ServiceImpl<TestMapper, TestEntity> implements TestService
@Autowired
private TestMapper testMapper;
@Autowired
private SqlSessionFactory sqlSessionFactory;
public void addUserOneByOne(List<TestEntity> users)
SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
TestMapper um = session.getMapper(TestMapper.class);
long startTime = System.currentTimeMillis();
for (TestEntity user : users)
um.addUserOneByOne(user);
session.commit();
long endTime = System.currentTimeMillis();
System.out.println("耗时"+(endTime - startTime));
插入了1000条数据,耗时959毫秒。
插入50000条数据,耗时11214毫秒。
对比分析:
如果我们批量插入少部分数据,可以使用方式一,一条SQL进行插入。这样是比较快的。
如果我们插入数据达到,1w条,10来万条,这时建议用方式二进行插入是比较快的。
4. 使用mybatisplus批量插入
使用saveBatch()方法进行批量插入
@Service
public class TestServiceimpl extends ServiceImpl<TestMapper, TestEntity> implements TestService
@Autowired
private TestMapper testMapper;
@Autowired
private SqlSessionFactory sqlSessionFactory;
@Override
public void add()
//时间 一
long l = System.currentTimeMillis();
List<TestEntity> list=new ArrayList<>();
for (int i=0;i<50000;i++)
TestEntity testEntity=new TestEntity();
testEntity.setC(i);
list.add(testEntity);
saveBatch(list);
//时间 二
long l1 = System.currentTimeMillis();
System.out.println("耗时"+(l1-l));
插入50000条数据,耗时19516毫秒
源码分析
public boolean saveBatch(Collection<T> entityList, int batchSize)
String sqlStatement = this.getSqlStatement(SqlMethod.INSERT_ONE);
return this.executeBatch(entityList, batchSize, (sqlSession, entity) ->
sqlSession.insert(sqlStatement, entity);
);
这里注意 return 中的第三个参数,是一个 lambda 表达式,这也是 MP 中批量插入的核心逻辑,可以看到,MP 先对数据进行分片(默认分片大小是 1000),分片完成之后,也是一条一条的插入。
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer)
Assert.isFalse(batchSize < 1, "batchSize must not be less than one", new Object[0]);
return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, (sqlSession) ->
int size = list.size();
int i = 1;
for(Iterator var6 = list.iterator(); var6.hasNext(); ++i)
E element = var6.next();
consumer.accept(sqlSession, element);
if (i % batchSize == 0 || i == size)
sqlSession.flushStatements();
);
继续查看 executeBatch 方法,就会发现这里的 sqlSession 其实也是一个批处理的 sqlSession,并非普通的 sqlSession。和我们mybatis使用的方法二一致。
5业务场景一对多怎么处理:
比如,如下这种一对多场景。
新增的时候保存都好理解,形成一个数组一起保存。
而修改的时候就有点难处理了,比如我修改了第二条,删除了第三条,这时统一保存应该怎么处理?
使用 ON DUPLICATE KEY UPDATE (发生主键冲突就更新,没有发生主键冲突就新增)
有时候由于业务需求,可能需要先去根据某一字段值查询数据库中是否有记录,有则更新,没有则插入。这个时候就可以用到ON DUPLICATE key update这个sql语句了
mapper如下所示
<insert id="save" parameterType="java.util.List">
INSERT INTO test
(
id,
a,
b,
c
)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(
#item.id,
#item.a,
#item.b,
#item.c
)
</foreach>
ON DUPLICATE KEY UPDATE
id=id,
a = VALUES(a) ,
b = VALUES(b),
c = VALUES(c)
</insert>
或者在使用mybatisplus时,使用saveOrUpdate()方法进行一条数据的新增或更新。 saveOrUpdateBatch()方法进行批量数据的新增或更新。
梅西“消除厄运”卡
新冠,退退退!
梅老板,冲冲冲!
三星阿根廷,加油!
mybatis批量插入数据性能测试
目录
for
循环测试批量插入和更新数据,myabtis
的Batch
模式,mybatis
中直接使用foreach sql
拼接插入数据。
性能测试
1、准备
使用的是mybatis-plus
以及postgrep
数据库来测试的
依赖:
<!--postgresql-->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.14</version>
</dependency>
<!--mybatis-plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<!-- lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--guava-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
配置:
server.port=8888
spring.datasource.username=postgres
spring.datasource.password=123456
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.url=jdbc:postgresql://ip:port/xxx?sslmode=disable
logging.level.com.example.OnDuplicateKeyDemo.dao=debug
mybatis-plus.mapper-locations=classpath:mapper/*.xml
实体类:
@Data
@TableName("test_onduplicatekey")
public class OnduplicateKey
@TableId(type= IdType.AUTO)
private Long id;
private String name;
private String email;
private String address;
private LocalDateTime createTime;
private LocalDateTime updateTime;
dao
:
public interface OnduplicateKeyMapper extends BaseMapper<OnduplicateKey>
@Insert("<script>" +
" insert into $tableName (id,name,email,address,create_time,update_time)\\n" +
" values\\n" +
" <foreach collection=\\"onduplicateKeyList\\" item=\\"item\\" separator=\\",\\">\\n" +
" (\\n" +
" <if test=\\"item.id==null\\">\\n" +
" nextval('$tableName_id_seq')\\n" +
" </if>\\n" +
" <if test=\\"item.id!=null\\">\\n" +
" #item.id\\n" +
" </if>\\n" +
" ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)\\n" +
" </foreach>\\n" +
" ON conflict(id) do\\n" +
" update set\\n" +
" name=EXCLUDED.name,\\n" +
" email=EXCLUDED.email,\\n" +
" address=EXCLUDED.address,\\n" +
" create_time=EXCLUDED.create_time,\\n" +
" update_time=EXCLUDED.update_time" +
"</script>")
int saveOrUpdateOnDuplicateKey( @Param("onduplicateKeyList") List<OnduplicateKey> onduplicateKeyList,@Param("tableName") String tableName);
映射文件:不想使用注解,也可以使用如下映射文件
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.OnDuplicateKeyDemo.dao.OnduplicateKeyMapper">
直接使用tableName
<!-- <insert id="saveOrUpdateOnDuplicateKey">-->
<!-- insert into $tableName (id,name,email,address,create_time,update_time)-->
<!-- values-->
<!-- <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!-- (-->
<!-- <if test="item.id==null">-->
<!-- nextval('$tableName_id_seq')-->
<!-- </if>-->
<!-- <if test="item.id!=null">-->
<!-- #item.id-->
<!-- </if>-->
<!-- ,#item.name,#item.email,#item.address,#item.createTime,#item.updateTime)-->
<!-- </foreach>-->
<!-- ON conflict(id) do-->
<!-- update set-->
<!-- name=EXCLUDED.name,-->
<!-- email=EXCLUDED.email,-->
<!-- address=EXCLUDED.address,-->
<!-- create_time=EXCLUDED.create_time,-->
<!-- update_time=EXCLUDED.update_time-->
<!-- </insert>-->
使用tableInfo
<!-- <insert id="saveOrUpdateOnDuplicateKey">-->
<!-- insert into $tableInfo.tableName (id,-->
<!-- <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!-- $item.column-->
<!-- </foreach>-->
<!-- )-->
<!-- values-->
<!-- <foreach collection="onduplicateKeyList" item="item" separator=",">-->
<!-- (-->
<!-- <if test="item.id==null">-->
<!-- nextval('$tableInfo.tableName_id_seq')-->
<!-- </if>-->
<!-- <if test="item.id!=null">-->
<!-- #item.id-->
<!-- </if>-->
<!-- ,-->
<!-- <foreach collection="tableInfo.fieldList" item="item2" separator=",">-->
<!-- #item.$item2.property-->
<!-- </foreach>-->
<!-- )-->
<!-- </foreach>-->
<!-- ON conflict(id) do-->
<!-- update set-->
<!-- <foreach collection="tableInfo.fieldList" item="item" separator=",">-->
<!-- $item.column=EXCLUDED.$item.column-->
<!-- </foreach>-->
<!-- </insert>-->
</mapper>
service
以及实现类:
public interface OnduplicateKeyService extends IService<OnduplicateKey>
@Service
public class OnduplicateKeyServiceImpl extends ServiceImpl<OnduplicateKeyMapper, OnduplicateKey> implements OnduplicateKeyService
启动类:
@MapperScan("com.example.OnDuplicateKeyDemo.dao.**")
2、普通for循环批量插入数据测试
/**
* @Description:
* for循环测试批量更新或新增,虽然循环多次,但是这些insert都处于同一个事务中,因此,使用的连接都是同一条
* 但是每次insert,数据库都要解析sql,然后操作表。
* 编译次数=设置参数次数=执行次数
* 1千条数据耗时:86865 83086 81232
*
* 循环插入:需要每次都获取session,获取连接,然后将sql 语句发给mysql 去执行
* (JDBC一般情况下是通过TCP/IP 进行连接和数据库进行通信的)。
*
* @DateTime: 2021/3/1 14:29
* @Params:
* @Return
* @Author: liuzhihui
*/
@Test
void saveOrUpdateWithFor()
//获取数据
List<OnduplicateKey> onduplicateKeys = prepareData();
long startTime = System.currentTimeMillis();
System.out.println("开始时间:"+startTime);
for (OnduplicateKey onduplicateKey : onduplicateKeys)
//判断数据是否存在
OnduplicateKey onduplicateKey1 = onduplicateKeyMapper.selectById(onduplicateKey.getId());
//插入
if (onduplicateKey1==null)
int insert = onduplicateKeyMapper.insert(onduplicateKey);
else //更新
int i = onduplicateKeyMapper.updateById(onduplicateKey);
System.out.println("耗时:"+(System.currentTimeMillis()-startTime));
/**
* 数据准备
* @return
*/
private List<OnduplicateKey> prepareData()
List<OnduplicateKey> data=new ArrayList<>();
for (int i = 1; i <= 1000; i++)
OnduplicateKey onduplicateKey=new OnduplicateKey();
if (i<10)
onduplicateKey.setId((long)i);
onduplicateKey.setName("lisi_"+i);
onduplicateKey.setEmail(i+"@qq.com");
onduplicateKey.setAddress("北京"+i);
onduplicateKey.setCreateTime(LocalDateTime.now());
onduplicateKey.setUpdateTime(LocalDateTime.now());
data.add(onduplicateKey);
return data;
3、使用MyBatis提供的BATCH模式
/**
* @Description:
* 当使用MyBatis时,有两种批量插入的方式,一种是动态SQL中foreach拼接字符串,一种是使用MyBatis提供的BATCH模式。
* 两种方式的区别是啥?
* 1、batch模式: 重复使用已经预处理的语句,并且批量执行所有更新语句.
* 预编译sql(1次)==》设置参数=500次==》执行commit(1次)
* 预编译:PreparedStatement ps = conn.prepareStatement(sql);
* ps对象包含语句 insert into t_customer values(?,?,?,?),它已发送给DBMS,并为执行作好了准备。
* 原理:其实底层就是使用jdbc的批量插入操作。
* 1、预编译PreparedStatement ps = conn.prepareStatement(sql)
* 2、for循环,使用ps设置n条数据,每设置一条就ps.addBatch();
* 当积攒的数据量到了指定的值时,就执行一次ps.executeBatch并清空缓存
* 非batch模式,就是不积攒,有一条数据就,编译一次,然后设置参数,执行一次
* 1千条数据耗时: 1222 1241 1253
* 1万条数据耗时: 19221 15559 13910 14664
*
* 2、mybatis中直接使用foreach插入数据,就相当于将所有的sql预先拼接到一起,然后一起提交,但是数据量过大,会引起,栈内存溢出了,mysql对语句的长度有限制,默认是 4M
* 动态sql,通过foreach标签进行了sql拼接出一个大sql,交由数据库执行,只需一次调用。
* 批量插入通过foreach 标签,将多条数据拼接在sql 语句后,一次执行只获取一次session,提交一条sql语句。减少了程序和数据库交互的准备时间。
*
*
* @DateTime: 2021/3/3 13:30
* @Params:`
* @Return
* @Author: liuzhihui
*/
@Test
void saveOrUpdateBatch()
//获取数据
List<OnduplicateKey> onduplicateKeys = prepareData();
long startTime = System.currentTimeMillis();
System.out.println("开始时间:"+startTime);
boolean b = onduplicateKeyService.saveOrUpdateBatch(onduplicateKeys, 1000);
System.out.println("耗时:"+b+" "+(System.currentTimeMillis()-startTime));
4、mybatis中直接使用foreach插入数据
说是说OnduplicateKey
模式,在mysql
中才有OnduplicateKey
,在postgrep
中,是ON conflict
,有什么用呢,就是数据的批处理中,有些数据可能是新增,有些数据可能是更新,这时ON conflict
,就能用上了。
在这里使用ON conflict
,就是新增的时候,使用foreach
在insert
语句中,拼接多个value
,来提高插入数据。
最后的测试结果,也是相当不错的。
/**
* @Description: OnduplicateKey模式: 应该具备回滚的功能,中间某条数据插入出现问题,要回滚
* @DateTime: 2021/3/4 10:38
* @Params: 1千条数据耗时:1393 1395 1365
* @Return 1万条数据耗时:4358 4056 4322
* 1万条多线程批处理数据耗时: 908 1238 861
* @Author: liuzhihui
*/
@Transactional
@Test
void saveOrUpdateOnDuplicateKey()
List<OnduplicateKey> onduplicateKeys = prepareData();
TableInfo tableInfo = TableInfoHelper.getTableInfo(OnduplicateKey.class);
//拆分集合的阈值
int count =1000;
long startTime = System.currentTimeMillis();
System.out.println("开始时间:"+startTime);
//集合按照count,均等拆分为多个集合
List<List<OnduplicateKey>> collect = Lists.partition(onduplicateKeys, count);
//多线程处理
collect.stream().parallel().forEach(onduplicateKeyList->
onduplicateKeyMapper.saveOrUpdateOnDuplicateKey(onduplicateKeyList,tableInfo.getTableName());
);
System.out.println("耗时:"+(System.currentTimeMillis()-startTime));
5、mybatis如何开启batch模式
没有使用mybatis-plus
,怎么开启batch
模式呢?
一般来说,对于SSM
项目工程来说,mybatis
的ExectoryType
默认是simple
,那么又如何能动态使用batch
模式呢?
直接上源码实现干货:
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.function.BiConsumer;
import static org.mybatis.spring.SqlSessionUtils.closeSqlSession;
/**
* 启动mybatis的Batch模式的批量新增、更新
*
* @author sunchangtan
*/
@Component
public class MybatisBatchExecutor
/**
* 拼写成SQL的最大数据量
* 比如: 如果insert,把batchCount条数的数据拼写成一个大SQL
* 如果update,把batchCount条数的数据拼写成case when方式的大SQL
*/
private static final int batchCountToSQL = 100;
/**
* 每多少次时开始commit
*/
private static final int batchCountToSubmit = 100;
@Resource
private SqlSessionTemplate sqlSessionTemplate;
/**
* 批量更新、新增
* @param dbList
* @param mapperClass
* @param func
* @param <T>
* @param <M>
*/
public <T, M> void insertOrUpdateBatch(List<T> dbList, Class<M> mapperClass, BiConsumer<M, List<T>> func)
int batchLastIndex = batchCountToSQL;
int batchLastIndexToSubmit = 0;
int total = dbList.size();
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH, false);
M modelMapper = sqlSession.getMapper(mapperClass);
try
if (total > batchCountToSQL)
for (int index = 0; index < total; )
if (batchLastIndex > total)
List<T> list = dbList.subList(index, total);
func.accept(modelMapper, list);
sqlSession.flushStatements();
sqlSession.commit();
break;
else
List<T> list = dbList.subList(index, batchLastIndex);
func.accept(modelMapper, list);
if (batchLastIndexToSubmit++ >= batchCountToSubmit)
//如果可以批量提交,则提交
sqlSession.flushStatements();
sqlSession.commit();
batchLastIndexToSubmit = 0;
index = batchLastIndex;// 设置下一批下标
batchLastIndex = index + (batchCountToSQL - 1);
else
func.accept(modelMapper, dbList);
sqlSession.commit();
finally
closeSqlSession(sqlSession, sqlSessionFactory);
使用例子:
//批量插入
mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)->
mapper.insertList(data);
);
//批量更新
mybatisBatchExecutor.insertOrUpdateBatch(list, BatchTestMapper.class, (mapper, data)->
mapper.updateBatchByPrimaryKeySelective(data);
);
以上是关于mybatis以及mybatisplus批量插入问题的主要内容,如果未能解决你的问题,请参考以下文章
mybatis和mybatisPlus中解决实体类字段与数据库关键字冲突问题
mybatis和mybatisPlus中解决实体类字段与数据库关键字冲突问题
mybatis-plus 自定义basemapper支持批量增删改操作