在java中并行化任务的最简单方法是啥?
Posted
技术标签:
【中文标题】在java中并行化任务的最简单方法是啥?【英文标题】:What is the easiest way to parallelize a task in java?在java中并行化任务的最简单方法是什么? 【发布时间】:2011-01-02 05:33:51 【问题描述】:假设我有这样的任务:
for(Object object: objects)
Result result = compute(object);
list.add(result);
并行化每个 compute() 的最简单方法是什么(假设它们已经可并行化)?
我不需要与上述代码严格匹配的答案,只是一个一般性的答案。但如果您需要更多信息:我的任务是 IO 绑定的,这是针对 Spring Web 应用程序的,这些任务将在 HTTP 请求中执行。
【问题讨论】:
第二行应该是Result result = compute(object);
吗?
【参考方案1】:
我建议您查看ExecutorService。
特别是这样的:
ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects)
Callable<Result> c = new Callable<Result>()
@Override
public Result call() throws Exception
return compute(object);
;
tasks.add(c);
List<Future<Result>> results = EXEC.invokeAll(tasks);
请注意,如果 objects
是一个大列表,则使用 newCachedThreadPool
可能会很糟糕。缓存的线程池可以为每个任务创建一个线程!您可能想要使用newFixedThreadPool(n)
,其中 n 是合理的(例如您拥有的内核数量,假设 compute()
受 CPU 限制)。
以下是实际运行的完整代码:
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorServiceExample
private static final Random PRNG = new Random();
private static class Result
private final int wait;
public Result(int code)
this.wait = code;
public static Result compute(Object obj) throws InterruptedException
int wait = PRNG.nextInt(3000);
Thread.sleep(wait);
return new Result(wait);
public static void main(String[] args) throws InterruptedException,
ExecutionException
List<Object> objects = new ArrayList<Object>();
for (int i = 0; i < 100; i++)
objects.add(new Object());
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object : objects)
Callable<Result> c = new Callable<Result>()
@Override
public Result call() throws Exception
return compute(object);
;
tasks.add(c);
ExecutorService exec = Executors.newCachedThreadPool();
// some other exectuors you could try to see the different behaviours
// ExecutorService exec = Executors.newFixedThreadPool(3);
// ExecutorService exec = Executors.newSingleThreadExecutor();
try
long start = System.currentTimeMillis();
List<Future<Result>> results = exec.invokeAll(tasks);
int sum = 0;
for (Future<Result> fr : results)
sum += fr.get().wait;
System.out.println(String.format("Task waited %d ms",
fr.get().wait));
long elapsed = System.currentTimeMillis() - start;
System.out.println(String.format("Elapsed time: %d ms", elapsed));
System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
finally
exec.shutdown();
【讨论】:
这个有c#版本吗? 还可以查看 Executors,它作为各种类型的 executor 服务的工厂。 @Malfist 在 C# 中有一些任务(对于即将到来的 .net 4 来说很好)使所有这些变得轻而易举:)。并且在 3.5 中有委托/lambdas 和线程、函数、线程启动等来完成 @Malfist,我知道这是一个旧评论,但 C# 现在有 Parallel.ForEach 和 Task Parallels Library - aka TPL。它们非常完整。【参考方案2】:使用 Java8 及更高版本,您可以在集合上使用 parallelStream 来实现此目的:
List<T> objects = ...;
List<Result> result = objects.parallelStream().map(object ->
return compute(object);
).collect(Collectors.toList());
注意:结果列表的顺序可能与对象列表中的顺序不匹配。
此 *** 问题how-many-threads-are-spawned-in-parallelstream-in-java-8 中提供了如何设置正确线程数的详细信息
【讨论】:
在我看来这是代码异味。您正在使用 parallelStream 阻止所有其他代码。在测试或小型应用程序中,我可能没问题,但在大型服务器上,这可能是灾难的根源。 流是为数据并行而不是任务并行而设计的。见***.com/a/23370799/208288。【参考方案3】:可以简单地创建几个线程并得到结果。
Thread t = new Mythread(object);
if (t.done())
// get result
// add result
编辑:我认为其他解决方案更酷。
【讨论】:
【参考方案4】:如需更详细的答案,请阅读Java Concurrency in Practice 并使用java.util.concurrent。
【讨论】:
这应该是一个内容伴侣【参考方案5】:这是我在自己的项目中使用的东西:
public class ParallelTasks
private final Collection<Runnable> tasks = new ArrayList<Runnable>();
public ParallelTasks()
public void add(final Runnable task)
tasks.add(task);
public void go() throws InterruptedException
final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
.availableProcessors());
try
final CountDownLatch latch = new CountDownLatch(tasks.size());
for (final Runnable task : tasks)
threads.execute(new Runnable()
public void run()
try
task.run();
finally
latch.countDown();
);
latch.await();
finally
threads.shutdown();
// ...
public static void main(final String[] args) throws Exception
ParallelTasks tasks = new ParallelTasks();
final Runnable waitOneSecond = new Runnable()
public void run()
try
Thread.sleep(1000);
catch (InterruptedException e)
;
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
tasks.add(waitOneSecond);
final long start = System.currentTimeMillis();
tasks.go();
System.err.println(System.currentTimeMillis() - start);
在我的双核盒子上打印了超过 2000 个。
【讨论】:
【参考方案6】:您可以使用ThreadPoolExecutor。下面是示例代码:http://programmingexamples.wikidot.com/threadpoolexecutor(这里太长了,这里就不放了)
【讨论】:
【参考方案7】:Fork/Join 的并行数组是一种选择
【讨论】:
【参考方案8】:我要提到一个执行器类。下面是一些示例代码,您可以放置在 executor 类中。
private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);
private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();
public void addCallable(Callable<Object> callable)
this.callableList.add(callable);
public void clearCallables()
this.callableList.clear();
public void executeThreads()
try
threadLauncher.invokeAll(this.callableList);
catch (Exception e)
// TODO Auto-generated catch block
e.printStackTrace();
public Object[] getResult()
List<Future<Object>> resultList = null;
Object[] resultArray = null;
try
resultList = threadLauncher.invokeAll(this.callableList);
resultArray = new Object[resultList.size()];
for (int i = 0; i < resultList.size(); i++)
resultArray[i] = resultList.get(i).get();
catch (Exception e)
// TODO Auto-generated catch block
e.printStackTrace();
return resultArray;
然后要使用它,您需要调用执行器类来填充和执行它。
executor.addCallable( some implementation of callable) // do this once for each task
Object[] results = executor.getResult();
【讨论】:
一组作业没有包装类总是让我很恼火【参考方案9】:一种巧妙的方法是利用 ExecutorCompletionService。
假设您有以下代码(如您的示例):
public static void main(String[] args)
List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
List<List<Character>> list = new ArrayList<>();
for (char letter : letters)
List<Character> result = computeLettersBefore(letter);
list.add(result);
System.out.println(list);
private static List<Character> computeLettersBefore(char letter)
return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
现在要并行执行任务,您需要做的就是创建由线程池支持的 ExecutorCompletionService。然后提交任务并阅读结果。由于 ExecutorCompletionService 在后台使用 LinkedBlockingQueue,结果一旦可用就可以获取(如果您运行代码,您会注意到结果的顺序是随机的):
public static void main(String[] args) throws InterruptedException, ExecutionException
final ExecutorService threadPool = Executors.newFixedThreadPool(3);
final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);
final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
List<List<Character>> list = new ArrayList<>();
for (char letter : letters)
completionService.submit(() -> computeLettersBefore(letter));
// NOTE: instead over iterating over letters again number of submitted tasks can be used as a base for loop
for (char letter : letters)
final List<Character> result = completionService.take().get();
list.add(result);
threadPool.shutdownNow(); // NOTE: for safety place it inside finally block
System.out.println(list);
private static List<Character> computeLettersBefore(char letter)
return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
【讨论】:
以上是关于在java中并行化任务的最简单方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
在 Swift 中实例化一个新的 Codable 对象的最简单方法是啥?