Java基于线程池的独立任务并发执行器
Posted lyloou
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java基于线程池的独立任务并发执行器相关的知识,希望对你有一定的参考价值。
目的:
对于多个独立的任务,可以以并发的方式执行任务,以提高 CPU 利用率,提高处理效率。
思路
在一个线程池中,开启指定数量的线程,每个线程从任务队列中获取任务执行。
执行的过程中,判断当前线程是否在执行任务的状态,如果没有执行任务,取一条任务执行,如果正在执行,则跳过,下轮再判断。
在所有任务执行完后,关闭线程池。
需要注意的是数据结构的选择,须选择并发类的数据结构,不然可能出现阻塞,死锁等情况。
(具体逻辑参考源码)
示例
/**
* 并发执行器示例
*/
public class ConcurrentExecutorTest
/**
* 测试
*/
public static void main(String[] args)
for (int i = 0; i < 100; i++)
test();
private static void test()
Map<String, String> paramMap = new LinkedHashMap<>();
for (int i = 0; i < 10; i++)
paramMap.put("key:" + i, "value:" + i);
final ConcurrentExecutor<String, String, Integer> executor = new ConcurrentExecutor<>(5, paramMap,
(k, v) ->
ThreadUtil.sleep(10);
System.out.println(Thread.currentThread().getName() + "-" + v);
final int abs = Math.abs(Objects.hash(v));
if (abs % 3 == 0)
int i = 1 / 0;
return abs;
);
executor.execute();
System.out.println("success result: " + executor.getSuccessResultMap());
System.out.println("error result: " + executor.getErrorResultMap());
测试结果
pool-1-thread-1-value:0
pool-1-thread-2-value:1
pool-1-thread-4-value:3
pool-1-thread-3-value:2
pool-1-thread-3-value:8
pool-1-thread-1-value:5
pool-1-thread-5-value:4
pool-1-thread-2-value:6
pool-1-thread-4-value:7
pool-1-thread-3-value:9
success result: key:2=231604360, key:0=231604358, key:6=231604364, key:5=231604363, key:3=231604361, key:9=231604367, key:8=231604366
error result: key:1=java.lang.ArithmeticException: / by zero, key:4=java.lang.ArithmeticException: / by zero, key:7=java.lang.ArithmeticException: / by zero
源码
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.BooleanUtil;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiFunction;
/**
* 并发执行器
* <p>
* 适用场景:每个任务是独立的,不耦合的
*
* @author lilou
* @since 2022/6/9 9:05
*/
public class ConcurrentExecutor<K, V, R>
private final Map<K, V> paramMap;
private final Map<K, R> successResultMap;
private final Map<K, Throwable> errorResultMap;
private final Set<K> runningKeySet;
private final Queue<K> candidateKeyQueue;
private int maxThreadNum;
private final ExecutorService executorService;
private final BiFunction<K, V, R> consumer;
private final Map<Integer, Boolean> currentIndexThreadRunningStatusMap;
public ConcurrentExecutor(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> consumer)
Assert.notNull(paramMap, "paramMap不可为空");
Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1");
final int paramSize = paramMap.size();
// 如果需要处理的数据小于 maxThreadNum 时,用这个小数
this.maxThreadNum = Math.min(maxThreadNum, paramSize);
// 线程池数量不能比1小
this.maxThreadNum = Math.max(this.maxThreadNum, 1);
// tips: 如果错误地使用 this.paramMap = paramMap; 多测试几遍会发现,偶尔会陷入了阻塞
this.paramMap = Collections.synchronizedMap(paramMap);
this.candidateKeyQueue = new ConcurrentLinkedQueue<>(paramMap.keySet());
this.runningKeySet = new ConcurrentHashSet<>(paramSize);
this.consumer = consumer;
this.executorService = Executors.newWorkStealingPool(this.maxThreadNum);
this.currentIndexThreadRunningStatusMap = new ConcurrentHashMap<>(this.maxThreadNum);
this.successResultMap = Collections.synchronizedMap(new LinkedHashMap<>(this.paramMap.size()));
this.errorResultMap = Collections.synchronizedMap(new LinkedHashMap<>());
public void execute()
while (CollUtil.isNotEmpty(paramMap))
// 最多同时有 maxRunningThreadNumber 同时消费 taskMap 中的数据
for (int i = 0; i < this.maxThreadNum; i++)
int currentIndex = i;
// 没执行完,不要执行
final Boolean isRunning = currentIndexThreadRunningStatusMap.getOrDefault(currentIndex, false);
if (BooleanUtil.isTrue(isRunning))
continue;
// 每个线程只处理和自己相关的
final K candidateKey = pickCandidateKey();
// 当前没有对应key的任务
if (Objects.isNull(candidateKey))
continue;
executorService.submit(() ->
try
currentIndexThreadRunningStatusMap.put(currentIndex, true);
final V data = paramMap.get(candidateKey);
final R result = consumer.apply(candidateKey, data);
successResultMap.put(candidateKey, result);
catch (Exception e)
errorResultMap.put(candidateKey, e);
finally
paramMap.remove(candidateKey);
candidateKeyQueue.remove(candidateKey);
currentIndexThreadRunningStatusMap.remove(currentIndex);
);
executorService.shutdown();
private K pickCandidateKey()
for (K candidateKey : candidateKeyQueue)
if (!runningKeySet.contains(candidateKey))
runningKeySet.add(candidateKey);
return candidateKey;
return null;
public Map<K, R> getSuccessResultMap()
return successResultMap;
public Map<K, Throwable> getErrorResultMap()
return errorResultMap;
以上是关于Java基于线程池的独立任务并发执行器的主要内容,如果未能解决你的问题,请参考以下文章