SpringBootspringboot数据使用多线程批量入数据库

Posted 愿做无知一猿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBootspringboot数据使用多线程批量入数据库相关的知识,希望对你有一定的参考价值。

环境

springboot、mybatisPlus、mysql8

mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0)

原始的for循环入库

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService 

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() 
        long start = System.currentTimeMillis();

        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) 
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
						
            //在循环中入库
            baseMapper.insert(entity);
        

        long end = System.currentTimeMillis();

        System.err.println(end - start);

        return end - start;
    

共耗时:180121 ms

批量保存操作

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService 

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() 
        long start = System.currentTimeMillis();

        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) 
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        
				
      	//mybatisPlus提供的批量保存方法,数字代表每几条数据提交一次事务,默认1000
        saveBatch(entityList, 1000);

        long end = System.currentTimeMillis();

        System.err.println(end - start);

        return end - start;
    

耗时时间:87217ms

在批量插入的基础上使用多线程

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService 

    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() throws InterruptedException 
        long start = System.currentTimeMillis();

        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));


        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) 
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        

        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch latch = new CountDownLatch(5);
        partition.forEach(item -> 
            poolExecutor.execute(() -> 
                saveBatch(item, 1000);
                latch.countDown();
            );
        );
        latch.await();
        // 也可以这么写,设定超时时间
        //latch.await(100,TimeUnit.SECONDS);
        long end = System.currentTimeMillis();

        System.err.println(end - start);
        //关闭线程池
        poolExecutor.shutdown();
        return end - start;
    

耗时时间: 28235

可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。

Spring实现事务的原理是通过ThreadLocal把数据库连接绑定到当前线程中,同一个事务中数据库操作使用同一个jdbc connection,新开启的线程获取不到当前jdbc connection。

如下代码:

partition.forEach(item -> 
            poolExecutor.execute(() -> 
                saveBatch(item, 1000);
                latch.countDown();
                //让每个都报错
                int i = 1/0;
            );
        );

控制台打印:

Exception in thread "执行线程5" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程2" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程4" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程1" java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程3" 30179
java.lang.ArithmeticException: / by zero
	at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

可见5个线程都报错了,但是去查询数据库,却可以查询到5000条数据,这是不应该出现的情况。

处理多线程入库的事务问题

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService 

    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Resource
    private TransactionDefinition transactionDefinition;

    @Override
    //此处手动管理事务的提交后,这个注解就可以去掉了
    //    @Transactional(rollbackFor = Exception.class)
    public Object doTest() 
        long start = System.currentTimeMillis();

        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));


        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 50; i++) 
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        

        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch sonLatch = new CountDownLatch(5);
        //主线程的 肯定为1
        CountDownLatch mainLatch = new CountDownLatch(1);
        AtomicBoolean hasError = new AtomicBoolean(false);
        partition.forEach(item -> 
            poolExecutor.execute(() -> 
                doSave(item, sonLatch, hasError, mainLatch);
            );
        );

        try 
            //此处应该是用try catch 包裹着主线程的所有业务代码,以此保证主线程中任何一处报错都可以通知子线程

            //这里加一个是为了调试主线程中的数据入库操作
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) 99999);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE

SpringbootSpringboot整合Jasypt,让配置信息安全最优雅方便的方式

1 简介

在上一篇文章中,介绍了Jasypt及其用法,具体细节可以查看【Java库】如何使用优秀的加密库Jasypt来保护你的敏感信息?。如此利器,用之得当,那将事半功倍。本文将介绍Springboot整合Jasypt,实现配置信息的安全,如数据库连接、账号和密码、接口凭证信息等。

Jasypt可以为Springboot加密的信息很多,主要有:

  • System Property 系统变量
  • Envirnment Property 环境变量
  • Command Line argument 命令行参数
  • Application.properties 应用配置文件
  • Yaml properties 应用配置文件
  • other custom property sources 其它配置文件

经测试,Springboot 2.1.9版本与jasypt-spring-boot最新版本的3.0.0和2.1.2均有问题,本文使用2.1.1成功。

2 如何加入依赖

Jasypt整合到Springboot是另一个开源项目jasypt-spring-boot,主要有三种整合方式:

2.1 jasypt-spring-boot-starter

如果项目使用@SpringBootApplication@EnableAutoConfiguration注解,在pom中加入以下依赖即可对整个Spring的环境的配置信息进行加密解密。

<dependency>
  <groupId>com.github.ulisesbocchio</groupId>
  <artifactId>jasypt-spring-boot-starter</artifactId>
  <version>2.1.1</version>
</dependency>

2.2 jasypt-spring-boot

如果项目不使用@SpringBootApplication@EnableAutoConfiguration注解,我们就使用下面的依赖,然后在配置Java类中加上注解@EnableEncryptableProperties

<dependency>
  <groupId>com.github.ulisesbocchio</groupId>
  <artifactId>jasypt-spring-boot</artifactId>
  <version>2.1.1</version>
</dependency>

配置类如下:

@Configuration
@EnableEncryptableProperties
public class MyApplication {
 
}

2.3 只对特定配置加密解密

如果不想使用以上两种方式对所有配置信息都进行加密解密的话,可以使用注解@EncryptablePropertySource指定配置文件,依赖如下:

<dependency>
  <groupId>com.github.ulisesbocchio</groupId>
  <artifactId>jasypt-spring-boot</artifactId>
  <version>2.1.1</version>
</dependency>

配置类如下:

@Configuration
@EncryptablePropertySource(name = "EncryptedProperties", value = "classpath:encrypted.properties")
public class MyApplication {
	
}

3 生成加密字符

生成加密字符有多种方式,在实践中使用过以下几种方式。

3.1 Java命令行

Jasypt提供了一个类专门用于加密解密,提供了main方法,调用如下:

java -cp ./jasypt-1.9.3.jar org.jasypt.intf.cli.JasyptPBEStringEncryptionCLI password=pkslow algorithm=PBEWithMD5AndTripleDES input=larry

输出为:

----ENVIRONMENT-----------------
Runtime: Oracle Corporation Java HotSpot(TM) 64-Bit Server VM 25.212-b10 

----ARGUMENTS-------------------
input: larry
algorithm: PBEWithMD5AndTripleDES
password: pkslow

----OUTPUT----------------------
SUfiOs8MvmAUjg+oWl/6dQ==

3.2 脚本命令

Jasypt为我们提供了脚本,可以直接用于加密解密,从http://www.jasypt.org/download.html可以下载。下载解压后的文件有:

# 解压后文件
LICENSE.txt NOTICE.txt  README.txt  apidocs     bin         lib
# bin文件夹的文件
decrypt.bat    decrypt.sh
digest.bat    digest.sh
encrypt.bat    encrypt.sh
listAlgorithms.bat listAlgorithms.sh

在bin目录下面,我们可以根据自己的系统选择使用什么脚本来生成密文,使用参数与Java命令一样。其实这些脚本就是封装了一个调用Java类的工具。使用如下:

$ sh encrypt.sh password=pkslow algorithm=PBEWithMD5AndTripleDES input=larry

----ENVIRONMENT-----------------
Runtime: Oracle Corporation Java HotSpot(TM) 64-Bit Server VM 25.212-b10 

----ARGUMENTS-------------------
input: larry
algorithm: PBEWithMD5AndTripleDES
password: pkslow

----OUTPUT----------------------
xRvdeEnk7zgKtX5uVGCIug==

3.3 Java代码

既然是Java的库,那肯定能用Java代码来加密解密了。具体细节可以参考【Java库】如何使用优秀的加密库Jasypt来保护你的敏感信息?

4 配置密文与其它项

4.1 配置密文

生成密文后,就要把密文配置在相应的位置,如下:

username: ENC(SUfiOs8MvmAUjg+oWl/6dQ==)

jasypt:
  encryptor:
    password: pkslow
    algorithm: PBEWithMD5AndTripleDES

配置密文的默认格式:ENC(密文),这个格式可以通过jasypt.encryptor.property.prefixjasypt.encryptor.property.suffix配置,这里不再演示。

4.2 其它配置项

配置信息只有 jasypt.encryptor.password 是必须的,配置项有:

配置项 必须 Default Value
jasypt.encryptor.password True -
jasypt.encryptor.algorithm False PBEWITHHMACSHA512ANDAES_256
jasypt.encryptor.keyObtentionIterations False 1000
jasypt.encryptor.poolSize False 1
jasypt.encryptor.providerName False SunJCE
jasypt.encryptor.providerClassName False null
jasypt.encryptor.saltGeneratorClassname False org.jasypt.salt.RandomSaltGenerator
jasypt.encryptor.ivGeneratorClassname False org.jasypt.iv.RandomIvGenerator
jasypt.encryptor.stringOutputType False base64
jasypt.encryptor.proxyPropertySources False false

5 如何安放你的密钥

密钥是非常重要的信息,放在什么地方,决定着你的密文是否真的安全。可以有以下几类方式:

(1)放在application.properties

这样能获得配置文件的人就能知道密钥,不够安全。但它是一种方便简单的方式。存在密文和密钥放在同一个配置文件的风险。

(2)JVM参数

在启动Java程序时加参数:-Djasypt.encryptor.password=pkslow,这样就不会把密钥放在代码中去了。

(3)服务器的环境变量

把密钥放在linux系统的环境变量中去,只有能拿到服务器访问权限的人,才有可能知道密钥在哪。例如:

# 配置profile文件
export JASYPT_PASSWORD = pkslow

# 生效 
source /etc/profile

# 运行java程序时
java -jar -Djasypt.encryptor.password=${JASYPT_PASSWORD} xxx.jar

(4)使用自定义的Encryptor来存放

以上我们都使用了官方提供的Encryptor,其实我们是可以自定义的,如下:

@Bean("jasyptStringEncryptor")
public StringEncryptor stringEncryptor() {
  PooledPBEStringEncryptor encryptor = new PooledPBEStringEncryptor();
  SimpleStringPBEConfig config = new SimpleStringPBEConfig();
  config.setPassword("password");
  config.setAlgorithm("PBEWITHHMACSHA512ANDAES_256");
  config.setKeyObtentionIterations("1000");
  config.setPoolSize("1");
  config.setProviderName("SunJCE");
  config.setSaltGeneratorClassName("org.jasypt.salt.RandomSaltGenerator");
  config.setIvGeneratorClassName("org.jasypt.iv.RandomIvGenerator");
  config.setStringOutputType("base64");
  encryptor.setConfig(config);
  return encryptor;
}

把密钥写在代码里,只有能获得jar包并反编译的人,才能获得密文。

如果我们把密钥的一部分写在代码里,另一部分通过外部方式来配置,这样就会更加安全。

6 结果测试

我们已经完成了密文的生成,现在我们测试一下是否能正常解密,测试代码如下:

@RestController
@RequestMapping("/jasypt")
public class JasyptController {
    @Value("${username}")
    private String username;

    @GetMapping("/name")
    public Mono<String> sendNormalText() {
        return Mono.just(username);
    }
}

访问该接口,能返回加密前的字符串,整个流程测试成功:

file

7 总结

本文简介了Springboot整合Jasypt实现配置信息的安全化,在实际项目中应用还是很多的。

另外,如果项目中是采用Spring Cloud Config的,它提供了统一的加解密方式,也方便使用。但如果应用配置没有走配置中心,还是应该使用Jasypt。


欢迎关注公众号<南瓜慢说>,将持续为你更新...

file

欢迎加博主微信,做一个点赞之友,哈哈...

file

多读书,多分享;多写作,多整理。

以上是关于SpringBootspringboot数据使用多线程批量入数据库的主要内容,如果未能解决你的问题,请参考以下文章

Springbootspringboot中使用mybatis操作数据库

SpringBootSpringBoot 之 Hystrix服务隔离

SpringbootSpringboot整合Jasypt,让配置信息安全最优雅方便的方式

SpringBootSpringBoot 缓存(十八)

SpringBootspringboot 与 单元测试经验

SpringBootSpringBoot 自动配置原理