SpringBoot异步线程池配置

Posted by Java_Chuck

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot异步线程池配置相关的知识,希望对你有一定的参考价值。

 配置线程池:


import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置
 *
 * @author foo
 */
@Slf4j
@EnableAsync 
@Configuration
public class ExecutorConfig 

    @Bean
    @Primary
    public Executor asyncServiceExecutor() 
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutorWrapper();
        //配置核心线程数
        executor.setCorePoolSize(10);
        //配置最大线程数
        executor.setMaxPoolSize(20);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("async-service-");

        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
    

对线程池包装一层,在提交任务时打印线程池状态并传递MDC上下文:


import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * spring线程池封装类
 *
 * @author foo
 */
@Slf4j
public class ThreadPoolTaskExecutorWrapper extends ThreadPoolTaskExecutor 

    private void showThreadPoolInfo(String prefix) 
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        log.info(", ,taskCount [], completedTaskCount [], activeCount [], queueSize []",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    

    @Override
    public void execute(Runnable task) 
        showThreadPoolInfo("1. do execute");
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));

    

    @Override
    public void execute(Runnable task, long startTimeout) 
        showThreadPoolInfo("2. do execute");
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);

    

    @Override
    public Future<?> submit(Runnable task) 
        showThreadPoolInfo("1. do submit");
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    

    @Override
    public <T> Future<T> submit(Callable<T> task) 
        showThreadPoolInfo("2. do submit");
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) 
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) 
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    

附上工具类:


import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.Callable;



/**
 * 线程MDC工具类
 *
 * @author foo
 */
public class ThreadMdcUtil 

    private ThreadMdcUtil () 
        throw new UnsupportedOperationException();
    

    public static void setTraceIdIfAbsent() 
        if (MDC.get(ConstantValue.TRACE_ID) == null) 
            MDC.put(ConstantValue.TRACE_ID, TraceIdUtil.getTraceId());
        
    

    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) 
        return () -> 
            if (context == null) 
                MDC.clear();
             else 
                MDC.setContextMap(context);
            
            setTraceIdIfAbsent();
            try 
                return callable.call();
             finally 
                MDC.clear();
            
        ;
    

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) 
        return () -> 
            if (context == null) 
                MDC.clear();
             else 
                MDC.setContextMap(context);
            
            setTraceIdIfAbsent();
            try 
                runnable.run();
             finally 
                MDC.clear();
            
        ;
    

以上是关于SpringBoot异步线程池配置的主要内容,如果未能解决你的问题,请参考以下文章:

SpringBoot异步线程池配置

#yyds干货盘点# springboot配置@Async异步任务的线程池

springboot+@async异步线程池的配置及应用

springboot隔离@Async异步任务的线程池

SpringBoot 自定义TaskExecutor线程池执行异步操作

SpringBoot - 优雅的实现异步编程