SpringBoot 配置 异步 AsyncEventBus
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot 配置 异步 AsyncEventBus相关的知识,希望对你有一定的参考价值。
参考技术A 1-在使用之前,先说明:guava 的 事件操作分类型: 1-同步 EventBus 2-异步:AsyncEventBus
咱们这里说明异步的使用:
2-配置 线程自己控制
@Bean
public AsyncEventBusasyncEventBus()
// 创建一个核心3线程,最大10线程的线程池,配置DiscardPolicy策略,抛弃当前的任务
ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(3, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());
return new AsyncEventBus(threadPoolExecutor);
3 -注册使用:例如:日志的异步记录
@Component
public class AsyncEventListener
private AsyncEventBusasyncEventBus;
AsyncEventListener(AsyncEventBus asyncEventBus,SysLoginLogService sysLoginLogService){
this.asyncEventBus = asyncEventBus;
this.sysLoginLogService = sysLoginLogService;
}
/**
* 注册这个监听器
*/
@PostConstruct
public void register()
asyncEventBus.register(this);
/**
* 添加登录日志信息
* @param sysLoginLog 登录日志
*/
@Subscribe
public void addLoginLog(SysLoginLog sysLoginLog)
sysLoginLogService.save(sysLoginLog);
}
4- 各个接口的调用 使用异步的post
AsyncEventBus asyncEventBus;
asyncEventBus.post(sysLoginLog)
SpringBoot异步线程池配置
配置线程池:
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线程池配置
*
* @author foo
*/
@Slf4j
@EnableAsync
@Configuration
public class ExecutorConfig
@Bean
@Primary
public Executor asyncServiceExecutor()
log.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutorWrapper();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(20);
//配置队列大小
executor.setQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("async-service-");
// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
return executor;
包装一层,处理其他事情:
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/**
* spring线程池封装类
*
* @author foo
*/
@Slf4j
public class ThreadPoolTaskExecutorWrapper extends ThreadPoolTaskExecutor
private void showThreadPoolInfo(String prefix)
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
log.info(", ,taskCount [], completedTaskCount [], activeCount [], queueSize []",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
@Override
public void execute(Runnable task)
showThreadPoolInfo("1. do execute");
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
@Override
public void execute(Runnable task, long startTimeout)
showThreadPoolInfo("2. do execute");
super.execute(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()), startTimeout);
@Override
public Future<?> submit(Runnable task)
showThreadPoolInfo("1. do submit");
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
@Override
public <T> Future<T> submit(Callable<T> task)
showThreadPoolInfo("2. do submit");
return super.submit(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
@Override
public ListenableFuture<?> submitListenable(Runnable task)
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task)
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(ThreadMdcUtil.wrap(task, MDC.getCopyOfContextMap()));
附上工具类:
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.Callable;
/**
* 线程MDC工具类
*
* @author foo
*/
public class ThreadMdcUtil
private ThreadMdcUtil ()
throw new UnsupportedOperationException();
public static void setTraceIdIfAbsent()
if (MDC.get(ConstantValue.TRACE_ID) == null)
MDC.put(ConstantValue.TRACE_ID, TraceIdUtil.getTraceId());
public static <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> context)
return () ->
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
setTraceIdIfAbsent();
try
return callable.call();
finally
MDC.clear();
;
public static Runnable wrap(final Runnable runnable, final Map<String, String> context)
return () ->
if (context == null)
MDC.clear();
else
MDC.setContextMap(context);
setTraceIdIfAbsent();
try
runnable.run();
finally
MDC.clear();
;
以上是关于SpringBoot 配置 异步 AsyncEventBus的主要内容,如果未能解决你的问题,请参考以下文章