Java 线程池和可运行对象创建可运行对象
Posted
技术标签:
【中文标题】Java 线程池和可运行对象创建可运行对象【英文标题】:Java threadpools and runnables creating runnables 【发布时间】:2013-08-07 21:20:13 【问题描述】:请耐心等待,因为我在多线程编程方面并不是很精通......
我目前正在构建一个系统,该系统将 ThreadPool ExecutorService 用于各种可运行对象。这很简单。但是,我正在考虑让可运行对象本身根据原始可运行对象中发生的情况产生额外的可运行对象的可能性(即,如果成功,则执行此操作,如果失败,则执行此操作等,因为某些任务必须先于其他任务完成执行)。需要注意的是,不需要将这些任务的结果通知主线程,尽管它对于处理异常可能很方便,即,如果无法联系到外部服务并且所有线程都因此抛出异常,那么停止提交任务并定期检查外部服务,直到它恢复。这不是完全必要的,但会很好。
即,提交任务 A。任务 A 做了一些事情。如果一切顺利,任务 A 将执行任务 B。如果某些事情不正常或抛出异常,则执行任务 C。每个子任务也可能有额外的任务,但只有几个层次。我宁愿在单个任务中做这样的事情,也不愿在单个任务中使用大而复杂的条件,因为这种方法可以提供更大的灵活性。
但是,我不确定这会如何影响线程池。我假设从池中的线程内创建的任何其他线程都将存在于池之外,因为它们本身并未直接提交到池中。这是一个正确的假设吗?如果是这样,这可能是一个坏主意(好吧,如果不是,它可能无论如何都不是一个好主意),因为它可能会导致原始线程完成时产生更多线程,并且在线程从较早的任务仍在进行中(并且可能比其他任务持续时间更长)。
我还考虑将这些实现为 Callables,并将响应对象放在返回的 Future 中,然后根据响应将适当的 Callable 添加到线程池中。但是,这会将所有操作绑定到主线程,这似乎是一个不必要的瓶颈。我想我可以将一个 Runnable 放入池中,该池本身处理 Callable 和后续操作的执行,但是我得到的线程数是原来的两倍。
我是在正确的轨道上还是完全脱离了轨道?
【问题讨论】:
【参考方案1】:我从未使用过这个,但它对你很有用:http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
【讨论】:
非常漂亮,但不是我想要的。将工作负载分成更小的块并不是真正需要的,在某种意义上,它更像是一种在池中进行条件任务链接的模式。【参考方案2】:有很多方法可以做你想做的事。您需要小心,不要最终创建太多线程。
以下是一个示例,您可以使用 ExecutorCompletionService 提高效率,也可以使用 Runnable 的。
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadsMakeThreads
public static void main(String[] args)
new ThreadsMakeThreads().start();
public void start()
//Create resources
ExecutorService threadPool = Executors.newCachedThreadPool();
Random random = new Random(System.currentTimeMillis());
int numberOfThreads = 5;
//Prepare threads
ArrayList<Leader> leaders = new ArrayList<Leader>();
for(int i=0; i < numberOfThreads; i++)
leaders.add(new Leader(threadPool, random));
//Get the results
try
List<Future<Integer>> results = threadPool.invokeAll(leaders);
for(Future<Integer> result : results)
System.out.println("Result is " + result.get());
catch (InterruptedException e)
e.printStackTrace();
catch (ExecutionException e)
e.printStackTrace();
threadPool.shutdown();
class Leader implements Callable<Integer>
private ExecutorService threadPool;
private Random random;
public Leader(ExecutorService threadPool, Random random)
this.threadPool = threadPool;
this.random = random;
@Override
public Integer call() throws Exception
int numberOfWorkers = random.nextInt(10);
ArrayList<Worker> workers = new ArrayList<Worker>();
for(int i=0; i < numberOfWorkers; i++)
workers.add(new Worker(random));
List<Future<Integer>> tasks = threadPool.invokeAll(workers);
int result = 0;
for(Future<Integer> task : tasks)
result += task.get();
return result;
class Worker implements Callable<Integer>
private Random random;
public Worker(Random random)
this.random = random;
@Override
public Integer call() throws Exception
return random.nextInt(100);
【讨论】:
【参考方案3】:将任务从其他任务提交到线程池是非常有意义的想法。但恐怕你会想到在单独的线程上运行新任务,那真的会吃掉所有的内存。只需在创建池时设置线程数限制,然后向该线程池提交新任务。
这种方法可以在不同的方向进一步阐述。首先,将任务视为具有接口方法的普通对象,并让这些方法决定是否要将这个对象提交给线程池。这要求每个任务都知道它的线程池——在创建时将其作为参数传递。更方便的是,保持对线程池的引用作为线程局部变量。
您可以轻松地模拟函数式编程:一个对象代表一个函数调用,并且对于每个参数,它都有对应的 set 方法。设置好所有参数后,将对象提交到线程池。
另一个方向是actor编程:任务类只有一个set方法,但可以多次调用,如果前面的参数还没有处理,set方法不会将任务提交到线程池,而是简单的存储它的队列中的参数。 run() 方法处理队列中所有可用的参数,然后返回。
所有这些功能都在数据流库https://github.com/rfqu/df4j 中实现。我故意写它来支持基于任务的并行性。
【讨论】:
在另一个任务中直接执行的任务不是任务,它只是一个方法。改写您的问题,以便任务意味着线程池(可运行或可调用)的一项工作,然后,我可能会理解您想要什么。以上是关于Java 线程池和可运行对象创建可运行对象的主要内容,如果未能解决你的问题,请参考以下文章