如何从多个线程中获取第一个结果并取消剩余的
Posted
技术标签:
【中文标题】如何从多个线程中获取第一个结果并取消剩余的【英文标题】:How to get the first result from multiple threads and cancel remaining 【发布时间】:2016-05-27 11:15:54 【问题描述】:我有请求和多个线程,它们以不同的方式寻找结果,每个线程都应该在某个时候得到一些结果。我需要采取 第一个完成的线程的结果,返回此结果并杀死所有剩余线程。当我返回一些默认结果时,我也有超时..
我能想到两种解决方案,但在我看来,没有一个是“正确的”..
1) 遍历任务,询问是否完成,休眠一会,返回找到的第一个完成的任务..
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.log4j.LogManager;
public class App
public static final ExecutorService executors = Executors.newCachedThreadPool();
public static void main(String[] args)
ArrayList<Future<String>> taskList = new ArrayList<>();
taskList.add(executors.submit(new Task1()));
taskList.add(executors.submit(new Task2()));
String result = null;
long start = System.currentTimeMillis();
long timeout = 1000;
while ((start + timeout) < System.currentTimeMillis())
try
for (Future<String> future : taskList)
if (future.isDone())
result = future.get();
break;
if (result != null)
break;
Thread.sleep(10);
catch (InterruptedException | ExecutionException e)
e.printStackTrace();
if (result == null)
result = "default..";
class Task1 implements Callable<String>
@Override
public String call() throws Exception
// find result in one way
return new String("result..");
class Task2 implements Callable<String>
@Override
public String call() throws Exception
// find result in some other way
return new String("result..");
2) 通过调用future.get(timeout, TimeUnit.MILLISECONDS);
使用另一个线程监控每个任务,然后第一个完成的线程将为所有其他线程调用future.cancel(true);
...
第一个解决方案在我看来是浪费处理器时间,第二个在我看来是浪费线程..
最后,问题是:有没有更好的解决方案?
提前谢谢你
编辑:谢谢大家的回答,我已经用“John H”的回答解决了这个问题:
有一个内置函数可以做到这一点:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAny%28java.util.Collection,%20long,%20java.util.concurrent.TimeUnit%29
这将调用你给它的所有任务,并等待第一个返回答案直到时间限制。如果它们都没有及时返回结果,您可以捕获 TimeoutException 并返回默认值。否则,您可以使用它返回的第一个值,它将负责取消其余任务。
【问题讨论】:
不要使用new String("result");
创建String
。
使用 CompletionService:When should I use a CompletionService over an ExecutorService?
【参考方案1】:
有一个内置函数可以做到这一点:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAny%28java.util.Collection,%20long,%20java.util.concurrent.TimeUnit%29
这将调用你给它的所有任务,并等待第一个返回答案直到时间限制。如果它们都没有及时返回结果,您可以捕获 TimeoutException 并返回默认值。否则,您可以使用它返回的第一个值,它将负责取消其余任务。
【讨论】:
【参考方案2】:共享CountDownLatch
怎么样?
您的主线程将在cdl.await(30, TimeUnit.SECONDS);
等待,工作线程将在完成后调用cdl.countDown();
。
另一个选择是使用共享的Exchanger
,这样您就可以轻松地检索结果。尽管使用交换器的两个工作线程之间交换结果的可能性很小,但一个简单的解决方法是检查工作线程是否检索“dummyString”,因此知道它是获得结果的主线程。
主线程:
myExchanger.exchange("dummyString", 30, TimeUnit.SECONDS);
.
工作线程:
while(!myExchanger.exchange(result).equals("dummyString"));
【讨论】:
【参考方案3】:你可以考虑SynchronousQueue。
主线程启动所有线程,为每个线程提供对队列的引用,然后在队列上执行take
(这将阻塞直到工作线程执行put
)。发现结果的第一个线程发布到队列,释放主线程。
然后主线程遍历所有取消它们的工人。
可以通过只等待超时和put
s 默认值的线程来实现默认值。
这也适用于Runnable
s。
示例代码 - 似乎有效,但 cancel
无效。
public static void main(String[] args)
try
ArrayList<Future<String>> taskList = new ArrayList<>();
BlockingQueue q = new SynchronousQueue();
taskList.add(executors.submit(new Task1(q)));
taskList.add(executors.submit(new Task2(q)));
Object took = q.take();
for (Future<String> task : taskList)
if (!task.isDone())
task.cancel(true);
System.out.println("Got " + took);
catch (InterruptedException ex)
Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex);
看来cancel
不够用。请参阅here 了解替代方案。
【讨论】:
以上是关于如何从多个线程中获取第一个结果并取消剩余的的主要内容,如果未能解决你的问题,请参考以下文章