一个入门级的利用线程池查询优化
Posted 汪小哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一个入门级的利用线程池查询优化相关的知识,希望对你有一定的参考价值。
1.背景
一个简单的查询服务的优化,“并行”和“串行”,将一批查询到的任务进行for each的组装,组装数据会调用各种各样的服务(dubbo、redis、mysql)等等,这种情况下随意业务任务的增多、整体查询服务的时间线性增加,因此想着法子将“串行”的服务变为并行的,全部处理完成后将结果返回即可。
任务信息
/**
* 任务信息
*/
@Data
public static class TaskInfo
private Integer taskId;
// get set ...
public TaskInfo(Integer taskId)
this.taskId = taskId;
/**
* 任务的结果
*/
@Data
public static class TaskResult
private Integer resultId;
// get set ...
public TaskResult(Integer resultId)
this.resultId = resultId;
2、并行转换为串行
2.1 利用parallelStream
直接使用parallelStream转换为并行流、使用 ForkJoinPool commonpool,不用使用等待CountDownLatch等等待线程执行完了之后再返回天然的支持。
public static void taskParallelStream()
List<TaskInfo> taskInfoList = Lists.newArrayList();
for (int i = 0; i < 100; i++)
taskInfoList.add(new TaskInfo(i));
List<TaskResult> results = taskInfoList.parallelStream().map(taskInfo ->
// todo task ....
return new TaskResult(taskInfo.getTaskId());
).collect(Collectors.toList());
感觉不好的地方:
- 使用公用的commonpool 可能被其他的业务占用、不方便管理池化信息.
- 如果没有特殊配置parallelism可能导致优化后没有效果。
可以通过 启动入口程序或者 启动参数增加并行度信息,默认最小为1.
- threadlocal 等信息、httprequest 执行前需要处理。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "72");
private static ForkJoinPool makeCommonPool()
int parallelism = -1;
ForkJoinWorkerThreadFactory factory = null;
UncaughtExceptionHandler handler = null;
try // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
String hp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
if (pp != null)
parallelism = Integer.parseInt(pp);
if (fp != null)
factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
catch (Exception ignore)
if (factory == null)
if (System.getSecurityManager() == null)
factory = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
factory = new InnocuousForkJoinWorkerThreadFactory();
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
2.2 直接使用线程池+CountDownLatch
其实生产过程中 会有一些非常多的线程上下文需要处理,比如httprequest 前端请求携带的信息。threadlocal里面的用户信息、mdc 日志信息等等 不能因为使用线程池化了之后导致信息丢失,要进行携带和传递过去。
2.2.1 定义任务上下文
为什么需要一个任务上下文、定义一个信息的载体、增加一些通用的领域概念,比如统一的处理httprequest、mdc等等信息方便在构造TaskContext之前获取当前执行线程的一些信息。
/**
* 执行任务信息的载体、请求、结果
*
* @param <T>
* @param <R>
*/
@Data
@Slf4j
public static class TaskContext<T, R>
/**
* 任务信息
*/
private T taskInfo;
/**
* 需要执行的函数
*/
private Function<T, R> function;
/**
* 执行存储的结果
*/
private R result;
// other userinfo、tenant info and so on
/**
* request 请求信息
*/
private RequestAttributes requestAttributes;
/**
* 日志mdc
*/
private Map<String, String> mdc;
/**
* 执行完成 减少计数器
*/
private CountDownLatch countDownLatch;
public TaskContext(T taskInfo, Function<T, R> function, CountDownLatch countDownLatch)
this.taskInfo = taskInfo;
this.function = function;
this.requestAttributes = RequestContextHolder.getRequestAttributes();
this.mdc = MDC.getCopyOfContextMap();
this.countDownLatch = countDownLatch;
// mdc 、request 信息 and 其他的 thread local的信息你都可以处理
this.initMdc();
this.initRequestContext();
private void initMdc()
Map<String, String> mdc = MDC.getCopyOfContextMap();
this.mdc = mdc;
private void initRequestContext()
this.requestAttributes = RequestContextHolder.getRequestAttributes();
/**
* 执行前插入
*/
public void insertContext()
MDC.setContextMap(mdc);
RequestContextHolder.setRequestAttributes(this.getRequestAttributes());
/**
* 执行完成后清除 清除请求信息
*/
public void clearContext()
countDownLatch.countDown();
MDC.clear();
RequestContextHolder.resetRequestAttributes();
2.2.2 定义允许线程的载体模板
方便通用化处理任务流转需要处理的一些业务问题,这样信息入口、信息结果、处理函数等等都通过TaskContext传递进来和Runnable 无关化处理。
/**
* 定义运行任务线程的载体 模板
*
* @param <T>
* @param <R>
*/
@Data
@Slf4j
public static class TaskRunnable<T, R> implements Runnable
private TaskContext<T, R> taskContext;
public TaskRunnable(TaskContext<T, R> taskContext)
this.taskContext = taskContext;
@Override
public void run()
try
// 1、插入构造 线程上下文 http信息、mdc 等等
taskContext.insertContext();
// 执行任务
taskContext.setResult(taskContext.getFunction().apply(taskContext.getTaskInfo()));
catch (Exception e)
log.info("task run error", e);
finally
// 2 、清理信息
taskContext.clearContext();
2.4 整体衔接如何操作
CountDownLatch+线程池,“串行”处理为“并行”,至于线程池的大小根据业务自己调整即可。
public static void taskExecutor() throws InterruptedException
List<TaskInfo> taskInfoList = Lists.newArrayList();
for (int i = 0; i < 100; i++)
taskInfoList.add(new TaskInfo(i));
ExecutorService executor = new ThreadPoolExecutor(10, 25, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(200),
(r) -> new Thread(r, "task-search-thread"),
new ThreadPoolExecutor.CallerRunsPolicy());
CountDownLatch latch = new CountDownLatch(taskInfoList.size());
List<TaskContext<TaskInfo, TaskResult>> taskContexts = Lists.newArrayList();
for (int i = 0; i < taskInfoList.size(); i++)
TaskContext<TaskInfo, TaskResult> taskContext = new TaskContext<>(taskInfoList.get(0), (taskInfo) ->
// 执行任务的函数
return new TaskResult(taskInfo.getTaskId());
, latch);
taskContexts.add(taskContext);
executor.submit(new TaskRunnable<TaskInfo, TaskResult>(taskContext));
// 等等任务执行完成
latch.await();
List<TaskResult> taskResult = taskContexts.parallelStream().map(TaskContext::getResult).collect(Collectors.toList());
3、收获
一个简单的优化task查询,加速执行的速度,整体思路比较简单,但是基于上下文threadlocal等信息丢失、清理等等需要值得关注。
以上是关于一个入门级的利用线程池查询优化的主要内容,如果未能解决你的问题,请参考以下文章
172SpringBoot2的一个利用CountDownLatch和线程池优化查询接口执行效率的例子