Configuring an Asynchronous AsyncEventBus in Spring Boot


1 - Before getting started, a note:

           Guava provides two kinds of event bus:  1 - synchronous: EventBus    2 - asynchronous: AsyncEventBus

 This article covers the asynchronous one.
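To make the synchronous/asynchronous distinction concrete, here is a minimal JDK-only sketch. `TinyBus` and `DispatchDemo` are hypothetical names invented for this illustration; this is not Guava's implementation, only the general idea that the executor handed to a bus decides which thread runs the handlers.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for an event bus (NOT Guava's code).
class TinyBus<E> {
    private final Executor executor;
    private final List<Consumer<E>> handlers = new CopyOnWriteArrayList<>();

    TinyBus(Executor executor) {
        this.executor = executor;
    }

    void register(Consumer<E> handler) {
        handlers.add(handler);
    }

    // Each handler invocation is handed to the executor.
    void post(E event) {
        for (Consumer<E> handler : handlers) {
            executor.execute(() -> handler.accept(event));
        }
    }
}

class DispatchDemo {

    // Returns the name of the thread on which the handler ran.
    static String handlerThread(Executor executor) {
        CompletableFuture<String> seen = new CompletableFuture<>();
        TinyBus<String> bus = new TinyBus<>(executor);
        bus.register(event -> seen.complete(Thread.currentThread().getName()));
        bus.post("login");
        return seen.join();
    }

    // Direct executor: the handler runs inside post(), on the posting thread.
    static boolean syncRunsOnCaller() {
        return handlerThread(Runnable::run).equals(Thread.currentThread().getName());
    }

    // Thread pool: the handler runs on a pool thread, and post() returns at once.
    static boolean asyncRunsOnCaller() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return handlerThread(pool).equals(Thread.currentThread().getName());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync handler on caller thread:  " + syncRunsOnCaller());
        System.out.println("async handler on caller thread: " + asyncRunsOnCaller());
    }
}
```

With asynchronous dispatch the posting thread does not wait for handlers, which is why it suits side work such as writing login logs.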

2 - Configuration: control the thread pool yourself

@Bean
public AsyncEventBus asyncEventBus() {
    // Pool with 3 core threads, at most 10 threads, and a bounded queue of 10.
    // DiscardPolicy silently drops a task that is rejected because the pool and queue are full.
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());
    return new AsyncEventBus(threadPoolExecutor);
}
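Because DiscardPolicy drops rejected tasks without any error, events can be lost under load. The following JDK-only sketch uses the same pool sizing as the bean above to show the effect; `DiscardPolicyDemo` and the blocking tasks are assumptions made for the illustration and are not part of the original configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DiscardPolicyDemo {

    // Submits `submitted` blocking tasks and returns how many actually ran.
    static int countExecuted(int submitted) {
        // Same sizing as the bean: core 3, max 10, queue capacity 10.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 10, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger executed = new AtomicInteger();

        for (int i = 0; i < submitted; i++) {
            // Every task parks until released, so no worker frees up
            // while tasks are still being submitted.
            pool.execute(() -> {
                try {
                    release.await();
                    executed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        release.countDown();
        pool.shutdown(); // already accepted tasks still run after shutdown()
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        // 10 threads + 10 queue slots accept 20 tasks; the other 10 are dropped.
        System.out.println("executed " + countExecuted(30) + " of 30");
    }
}
```

If losing an event is not acceptable (audit logs, for instance), a larger queue or a different rejection policy may be a better fit.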



3 - Register and use: for example, recording logs asynchronously

@Component
public class AsyncEventListener {

    private final AsyncEventBus asyncEventBus;

    private final SysLoginLogService sysLoginLogService;

    AsyncEventListener(AsyncEventBus asyncEventBus, SysLoginLogService sysLoginLogService) {
        this.asyncEventBus = asyncEventBus;
        this.sysLoginLogService = sysLoginLogService;
    }

    /**
     * Register this listener with the event bus.
     */
    @PostConstruct
    public void register() {
        asyncEventBus.register(this);
    }

    /**
     * Save a login log entry.
     *
     * @param sysLoginLog the login log
     */
    @Subscribe
    public void addLoginLog(SysLoginLog sysLoginLog) {
        sysLoginLogService.save(sysLoginLog);
    }
}





4 - In each interface that needs it, publish with the asynchronous post

// Injected by Spring
AsyncEventBus asyncEventBus;

asyncEventBus.post(sysLoginLog);

Spring Boot Async Thread Pool Configuration

 Configure the thread pool:


import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Thread pool configuration
 *
 * @author foo
 */
@Slf4j
@EnableAsync
@Configuration
public class ExecutorConfig {

    @Bean
    @Primary
    public Executor asyncServiceExecutor() {
        log.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutorWrapper();
        // Core pool size
        executor.setCorePoolSize(10);
        // Maximum pool size
        executor.setMaxPoolSize(20);
        // Queue capacity
        executor.setQueueCapacity(99999);
        // Name prefix for the threads in this pool
        executor.setThreadNamePrefix("async-service-");

        // Rejection policy: how to handle a new task once the queue is full and the pool is at max size
        // CallerRunsPolicy: the task runs on the caller's thread instead of a pool thread
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize the executor
        executor.initialize();
        return executor;
    }
}
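A note on how these settings interact: a ThreadPoolExecutor only grows past its core size once the queue is full, so with a queue capacity of 99999 the maximum of 20 threads and the rejection policy come into play only after 99999 tasks are already waiting. The JDK-only sketch below shows what CallerRunsPolicy does at that point; `CallerRunsDemo` and the deliberately tiny pool (1 thread, 1 queue slot) are assumptions chosen to make saturation easy to reach.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    // Saturates a tiny pool, then reports which thread ran the overflow task.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();

        pool.execute(blocker); // occupies the only worker thread
        pool.execute(blocker); // fills the only queue slot
        // Pool and queue are both full, so the rejection handler runs this
        // task synchronously on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread().getName()));

        release.countDown();
        pool.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("overflow task ran on: " + overflowThreadName());
        System.out.println("caller thread:        " + Thread.currentThread().getName());
    }
}
```

The practical consequence is back-pressure: a saturated pool slows the caller down instead of dropping work, so a method intended to be asynchronous can block its caller under heavy load.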
    

Wrap it in one more layer to handle other concerns:


import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Wrapper around Spring's thread pool executor: logs pool statistics and
 * passes the submitter's MDC context on to the worker thread.
 *
 * @author foo
 */
@Slf4j
public class ThreadPoolTaskExecutorWrapper extends ThreadPoolTaskExecutor {

    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }

    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
    }
}
    

The utility class:


import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.Callable;

/**
 * Thread MDC utility class
 *
 * @author foo
 */
public class ThreadMdcUtil {

    private ThreadMdcUtil() {
        throw new UnsupportedOperationException();
    }

    public static void setTraceIdIfAbsent() {
        if (MDC.get(ConstantValue.TRACE_ID) == null) {
            MDC.put(ConstantValue.TRACE_ID, TraceIdUtil.getTraceId());
        }
    }

    // Install the submitter's MDC context on the worker thread, run the task, then clear it.
    public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                return callable.call();
            } finally {
                MDC.clear();
            }
        };
    }

    public static Runnable wrap(final Runnable runnable, final Map<String, String> context) {
        return () -> {
            if (context == null) {
                MDC.clear();
            } else {
                MDC.setContextMap(context);
            }
            setTraceIdIfAbsent();
            try {
                runnable.run();
            } finally {
                MDC.clear();
            }
        };
    }
}
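Why the wrapping is needed: MDC data is stored per thread, so a pool thread does not see the trace id the submitting thread put there. The sketch below reproduces the capture/install/clear pattern using only the JDK. `FakeMdc` is a hypothetical ThreadLocal-based stand-in for org.slf4j.MDC, `MdcPropagationDemo` is an invented name, and the `setTraceIdIfAbsent()` step is left out for brevity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for org.slf4j.MDC so the sketch runs on the JDK alone.
class FakeMdc {
    private static final ThreadLocal<Map<String, String>> CTX =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { CTX.get().put(key, value); }
    static String get(String key) { return CTX.get().get(key); }
    static Map<String, String> getCopyOfContextMap() { return new HashMap<>(CTX.get()); }
    static void setContextMap(Map<String, String> map) { CTX.set(new HashMap<>(map)); }
    static void clear() { CTX.remove(); }
}

class MdcPropagationDemo {

    // Same shape as ThreadMdcUtil.wrap: install the captured context, run, then clear.
    static Runnable wrap(Runnable task, Map<String, String> context) {
        return () -> {
            if (context == null) {
                FakeMdc.clear();
            } else {
                FakeMdc.setContextMap(context);
            }
            try {
                task.run();
            } finally {
                FakeMdc.clear(); // pool threads are reused, so never leak context
            }
        };
    }

    // Returns the traceId a pool thread sees, with or without wrapping.
    static String traceIdSeenByWorker(boolean wrapped) {
        FakeMdc.put("traceId", "abc-123"); // set on the submitting thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = FakeMdc.get("traceId");
            // The context is captured here, on the submitting thread.
            Runnable toRun = wrapped ? wrap(task, FakeMdc.getCopyOfContextMap()) : task;
            pool.submit(toRun).get();
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            FakeMdc.clear();
        }
    }

    public static void main(String[] args) {
        System.out.println("unwrapped: " + traceIdSeenByWorker(false)); // prints "unwrapped: null"
        System.out.println("wrapped:   " + traceIdSeenByWorker(true));  // prints "wrapped:   abc-123"
    }
}
```

The copy must be taken at submission time, which is why the wrapper class calls `MDC.getCopyOfContextMap()` inside `execute` and `submit` and not inside the task itself.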
    
