使用线程池时,多线程之间上下文参数传递失效解决办法
Posted 抓手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用线程池时,多线程之间上下文参数传递失效解决办法相关的知识,希望对你有一定的参考价值。
问题描述
在执行业务代码时,有时候需要异步处理一些业务,在异步线程中获取当前登录的用户信息失败。因为用户信息是在过滤器或拦截器中解析出来并设置到ThreadLocal中的,子线程是无法获取到ThreadLocal中的值的。
虽然可以使用InheritableThreadLocal替代ThreadLocal,但是只能在新创建的线程中使用。对于线程池也会失效,因为线程池中的线程是由线程池创建好并可重复利用,这时InheritableThreadLocal的值传递就失去意义了。
解决办法
自定义线程池和线程池执行器,重写submit方法,抓取主线程中的用户信息然后在子线程中回放,在子线程执行完业务逻辑后,清除用户信息。
定义当前登录用户信息类
import lombok.Data;
/**
* @author 向振华
* @date 2023/01/12 10:40
*/
public class CurrentUser
private CurrentUser()
public static final ThreadLocal<UserInfo> CURRENT_USER = new ThreadLocal<>();
public static UserInfo get()
return CURRENT_USER.get();
public static void set(UserInfo userInfo)
CURRENT_USER.set(userInfo);
public static void clear()
CURRENT_USER.remove();
@Data
public static class UserInfo
private Long userId;
private String name;
private String mobile;
定义可传递上下文信息的ThreadPoolExecutor
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* @author 向振华
* @date 2023/01/12 10:40
*/
@Slf4j
public class TransmittableThreadPoolExecutor extends ThreadPoolExecutor
public TransmittableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
public TransmittableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
public TransmittableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
public TransmittableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
@Override
public void execute(Runnable task)
super.execute(this.wrapRunnable(task));
@Override
public <T> Future<T> submit(Runnable task, T result)
return super.submit(this.wrapRunnable(task), result);
@Override
public <T> Future<T> submit(Callable<T> task)
return super.submit(this.wrapCallable(task));
@Override
public Future<?> submit(Runnable task)
return super.submit(this.wrapRunnable(task));
public Runnable wrapRunnable(final Runnable runnable)
final CurrentUser.UserInfo userInfo = CurrentUser.get();
return () ->
CurrentUser.set(userInfo);
try
runnable.run();
catch (Exception e)
log.error("异常 ", e);
throw e;
finally
CurrentUser.clear();
;
public <T> Callable<T> wrapCallable(final Callable<T> callable)
final CurrentUser.UserInfo userInfo = CurrentUser.get();
return () ->
CurrentUser.set(userInfo);
try
return callable.call();
catch (Exception e)
log.error("异常 ", e);
throw e;
finally
CurrentUser.clear();
;
定义线程池获取工具类
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author 向振华
* @date 2023/01/12 10:40
*/
@Component
public class ExecutorUtils implements ApplicationListener<ContextRefreshedEvent>
private static ExecutorService asyncExecutorService;
private ExecutorUtils()
public static ExecutorService getAsyncExecutorService()
return asyncExecutorService;
/**
* 初始化线程池
*/
private static void init()
if (asyncExecutorService == null)
AtomicInteger threadNumber = new AtomicInteger(0);
asyncExecutorService = new TransmittableThreadPoolExecutor(
3,
3,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(16),
new ThreadFactory()
@Override
public Thread newThread(Runnable r)
return new Thread(r, "async-exec-" + threadNumber.getAndIncrement());
,
new RejectedExecutionHandler()
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
if (!executor.isShutdown())
try
//尝试阻塞式加入任务队列
executor.getQueue().put(r);
catch (InterruptedException e)
//保持线程的中端状态
Thread.currentThread().interrupt();
);
@Override
public void onApplicationEvent(ContextRefreshedEvent event)
init();
使用方法
开启异步调用:
ExecutorUtils.getAsyncExecutorService().submit(() -> testService.doSomething());
注意:需要提前在过滤器或拦截器中解析出用户信息并设置到CurrentUser类中,或者提前在主线程中设置用户信息。然后可以在主线程、子线程直接获取用户信息:
CurrentUser.UserInfo userInfo = CurrentUser.get();
其他解决办法
使用阿里的开源工具:transmittable-thread-local
以上是关于使用线程池时,多线程之间上下文参数传递失效解决办法的主要内容,如果未能解决你的问题,请参考以下文章