The concurrent.futures module in Python


Preface: this article was compiled by the editors of cha138.com. It introduces Python's concurrent.futures module, and we hope you find it useful.

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

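Concurrent execution can be performed with threads, using ThreadPoolExecutor, or with separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract class Executor.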
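Because both executors share the Executor interface, code can be written once against that interface. Below is a minimal sketch that assumes a toy square() function and a helper squares_with(), both hypothetical and not from the original. It runs with ThreadPoolExecutor. A ProcessPoolExecutor could be passed in the same way, inside an `if __name__ == "__main__":` guard.

```python
from concurrent.futures import Executor, ThreadPoolExecutor

def square(x):
    # Module-level function, so it would also be picklable for ProcessPoolExecutor.
    return x * x

def squares_with(executor: Executor, values):
    # Hypothetical helper: relies only on the abstract Executor interface
    # (context manager + map), so any Executor subclass can be passed in.
    with executor:
        return list(executor.map(square, values))

thread_results = squares_with(ThreadPoolExecutor(max_workers=4), range(5))
print(thread_results)
```

Executor.map() returns results in input order, not completion order.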

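concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Waits for the Future instances in fs (possibly created by different Executor instances) to complete. It returns a named 2-tuple of sets. The first set, named done, contains the futures that completed (finished or were cancelled) before the wait returned. The second set, named not_done, contains the futures that did not complete.
return_when indicates when the function should return. It must be one of the following constants defined in concurrent.futures: FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED.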
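The following sketch contrasts FIRST_COMPLETED with ALL_COMPLETED. The sleep-based task sleepy_square() and its delays are illustrative assumptions. The fast task should land in done on the first call, and the slow one in not_done.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED, ALL_COMPLETED

def sleepy_square(x, delay):
    # Hypothetical task: simulate work of a given duration.
    time.sleep(delay)
    return x * x

with ThreadPoolExecutor(max_workers=2) as pool:
    fast = pool.submit(sleepy_square, 2, 0.01)
    slow = pool.submit(sleepy_square, 3, 0.5)

    # Return as soon as at least one future has finished.
    first = wait([fast, slow], return_when=FIRST_COMPLETED)
    print(len(first.done), len(first.not_done))

    # Block until every future has finished (the default behaviour).
    done, not_done = wait([fast, slow], return_when=ALL_COMPLETED)
    print(sorted(f.result() for f in done), not_done)
```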

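concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances in fs that yields futures as they complete (finished or cancelled). If fs contains duplicates, each future is yielded only once. Any futures that completed before as_completed() was called are yielded first.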
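The next sketch shows that as_completed() hands back results in completion order rather than submission order, and that a duplicated future is yielded only once. The delayed_echo() task and its delays are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def delayed_echo(value, delay):
    # Hypothetical task that returns its input after a delay.
    time.sleep(delay)
    return value

jobs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(delayed_echo, v, d) for v, d in jobs]
    # Futures are yielded as they finish, not in the order they were submitted.
    completion_order = [f.result() for f in as_completed(futures)]

    # The same future listed twice is yielded only once.
    repeated = list(as_completed([futures[0], futures[0]]))

print(completion_order, len(repeated))
```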

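class concurrent.futures.Future

The Future class encapsulates the asynchronous execution of a callable. Future instances are created by Executor.submit() and should not be created directly except for testing. A Future object lets you interact with the asynchronously executing task: you can query its state, wait for its result, or cancel it.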
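Here is a small sketch of interacting with a Future returned by submit(). It assumes a toy divide() function, which is hypothetical. It uses result(), exception(), done() and add_done_callback(). The callback runs in the worker thread, or immediately if the future has already finished.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

callback_log = []

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 10, 2)
    bad = pool.submit(divide, 1, 0)
    # Called once the future finishes (immediately if it already has).
    ok.add_done_callback(lambda f: callback_log.append(f.result()))
    value = ok.result()        # blocks until the result is ready
    error = bad.exception()    # blocks, then returns the exception instead of raising it

# Leaving the with block joins the workers, so the callback has run by now.
print(value, type(error).__name__, ok.done(), callback_log)
```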

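1. Executor is an abstract class that provides methods to execute calls asynchronously. It should not be used directly, but through its concrete subclasses.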

2. Methods

shutdown(wait=True)
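Signals the executor that it should free any resources it is using once the currently pending futures are done executing. Calling submit() or map() after shutdown raises RuntimeError. If you use the executor in a with statement, you do not need to call this method explicitly.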
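The sketch below illustrates both points, using the built-in pow() as a stand-in task. After an explicit shutdown(), a further submit() raises RuntimeError, and the with form shuts the pool down automatically on exit.

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
first = pool.submit(pow, 2, 10)
pool.shutdown(wait=True)   # waits for pending futures, then frees the threads

try:
    pool.submit(pow, 2, 3)  # not allowed after shutdown
    error_raised = False
except RuntimeError:
    error_raised = True

# The with statement calls shutdown(wait=True) on exit.
with ThreadPoolExecutor(max_workers=2) as pool2:
    squares = list(pool2.map(lambda x: x * x, range(5)))

print(first.result(), error_raised, squares)
```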

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.

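Used incorrectly, it can deadlock, typically when a submitted task waits on the result of another Future from the same pool. Tasks passed to submit() should therefore avoid calling into the executor or waiting on its futures.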
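The following sketch shows the classic self-deadlock with a single-worker pool. The functions inner() and outer() are hypothetical. outer() occupies the only thread while waiting for inner(), which therefore can never start. A timeout is added here only so that the hang becomes observable instead of permanent.

```python
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)

def inner():
    return "inner done"

def outer():
    # inner() is queued behind outer(), which holds the only worker thread.
    fut = pool.submit(inner)
    try:
        return fut.result(timeout=0.5)  # without a timeout this would hang forever
    except concurrent.futures.TimeoutError:
        return "would deadlock"

outcome = pool.submit(outer).result()
pool.shutdown(wait=True)  # inner() finally runs once outer() has returned
print(outcome)
```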

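In Python 3.5 to 3.7 the default max_workers is the number of processors on the machine multiplied by 5. Since Python 3.8 it is min(32, os.cpu_count() + 4).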

ProcessPoolExecutor uses the multiprocessing module, which lets it side-step the GIL. It also means that only picklable objects can be executed and returned.

The __main__ module must be importable by the worker subprocesses. This means that ProcessPoolExecutor will not work in the interactive interpreter.

Calling Executor or Future methods from within a callable submitted to a ProcessPoolExecutor will result in deadlock.

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

max_workers defaults to the number of processors on the machine.
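The next sketch illustrates these constraints. The CPU-bound task cpu_bound() and the helpers main() and is_picklable() are hypothetical. The worker function is defined at module level so it can be pickled by reference, and a lambda cannot be pickled. The pool is only created under the `__main__` guard, so worker processes that import the main module do not start pools of their own.

```python
import pickle
from concurrent.futures import ProcessPoolExecutor

def cpu_bound(n):
    # Module-level function: picklable by reference, importable by workers.
    return sum(i * i for i in range(n))

def is_picklable(obj):
    # Hypothetical helper: ProcessPoolExecutor needs to pickle callables and results.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

def main():
    # max_workers=None (the default) would use the number of processors.
    with ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(cpu_bound, [10, 100, 1000]))

if __name__ == "__main__":
    # The guard keeps worker processes from re-running this part on import.
    print(main())
    print(is_picklable(lambda x: x))  # a lambda could not be sent to a worker
```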

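exception concurrent.futures.CancelledError
Raised when a future is cancelled, for example when result() is called on a cancelled future.

exception concurrent.futures.TimeoutError
Raised when a future operation exceeds the given timeout. Since Python 3.11 it is an alias of the built-in TimeoutError.

exception concurrent.futures.process.BrokenProcessPool
Raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).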
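Below is a sketch that triggers the first two exceptions with a single-worker pool. A threading.Event (an illustrative choice) keeps the only worker busy. That gives us a queued future to cancel and a running future to time out on. BrokenProcessPool is not demonstrated here.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, CancelledError
from concurrent.futures import TimeoutError as FuturesTimeoutError

gate = threading.Event()

with ThreadPoolExecutor(max_workers=1) as pool:
    blocker = pool.submit(gate.wait)      # occupies the only worker until gate is set
    queued = pool.submit(sum, [1, 2, 3])  # waits in the queue behind blocker
    cancelled = queued.cancel()           # succeeds because it has not started yet
    try:
        blocker.result(timeout=0.05)
        timed_out = False
    except FuturesTimeoutError:
        timed_out = True
    gate.set()                            # release the worker so shutdown can finish

try:
    queued.result()
    got_cancelled_error = False
except CancelledError:
    got_cancelled_error = True

print(cancelled, timed_out, got_cancelled_error)
```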

java.util.concurrent.Future Basics

With this article I am starting a series about the future concept in programming languages (also known as promises or delays), with the working title Back to the Future. Futures are a very important abstraction, even more so these days than ever, due to the growing demand for asynchronous, event-driven, parallel and scalable systems. In this first article we'll discover the most basic java.util.concurrent.Future<T> interface. Later on we will jump into other frameworks, libraries and even languages. Future<T> is pretty limited, but essential to understand, ekhm, future parts.

In a single-threaded application, when you call a method it returns only when the computation is done (IOUtils.toString() comes from Apache Commons IO):

 
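import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import org.apache.commons.io.IOUtils;

public String downloadContents(URL url) throws IOException {
    try (InputStream input = url.openStream()) {
        return IOUtils.toString(input, StandardCharsets.UTF_8);
    }
}

//...

final String contents = downloadContents(new URL("http://www.example.com"));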
 

downloadContents() looks harmless, but it can take an arbitrarily long time to complete. Moreover, to reduce latency you might want to do other, independent processing in the meantime, while waiting for the results. In the old days you would start a new Thread and somehow wait for the results (shared memory, locks, the dreadful wait()/notify() pair, etc.). With Future<T> it's much more pleasant:

 
public static Future<String> startDownloading(URL url) {
    //...
}

final Future<String> contentsFuture = startDownloading(new URL("http://www.example.com"));
//other computation
final String contents = contentsFuture.get();
 

We will implement startDownloading() soon. For now it's important that you understand the principles. startDownloading() does not block waiting for the external website. Instead, it returns immediately with a lightweight Future<String> object. This object is a promise that a String will be available in the future. You don't know when, but if you keep this reference, you'll be able to retrieve the value with Future.get() once it's there. In other words, Future is a proxy or wrapper around an object that does not exist yet. Once the asynchronous computation is done, you can extract the result. So what API does Future provide?

Future.get() is the most important method. It blocks and waits until the promised result is available (resolved). So if we really need that String, we just call get() and wait. There is an overloaded version that accepts a timeout, so you won't wait forever if something goes wrong; TimeoutException is thrown if waiting takes too long.
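Python's concurrent.futures was heavily influenced by java.util.concurrent, so the same idea can be sketched with the API from the first part of this page. In this rough counterpart, Future.result() plays the role of get(), and result(timeout=...) raises TimeoutError. fake_download() is a hypothetical stand-in that sleeps instead of touching the network.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError

def fake_download(url, delay):
    # Hypothetical stand-in for a real HTTP download; no network access.
    time.sleep(delay)
    return f"<html>contents of {url}</html>"

with ThreadPoolExecutor(max_workers=2) as pool:
    contents_future = pool.submit(fake_download, "http://www.example.com", 0.05)
    # ... other computation could happen here ...
    contents = contents_future.result()        # blocks, like Future.get()

    slow_future = pool.submit(fake_download, "http://www.example.com", 0.5)
    try:
        slow_future.result(timeout=0.1)        # like get(timeout, unit)
        timed_out = False
    except FuturesTimeoutError:
        timed_out = True                       # the task itself keeps running

print(contents, timed_out)
```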

In some use cases you might want to peek at the Future and carry on if the result is not yet available. This is possible with isDone(). Imagine a situation where your user waits for some asynchronous computation, and you'd like to let them know that you are still waiting while doing some other computation in the meantime:

 
final Future<String> contentsFuture = startDownloading(new URL("http://www.example.com"));
while (!contentsFuture.isDone()) {
    askUserToWait();
    doSomeComputationInTheMeantime();
}
contentsFuture.get();
 

The last call to contentsFuture.get() is guaranteed to return immediately without blocking, because Future.isDone() returned true. If you follow the pattern above, make sure you are not busy waiting by calling isDone() millions of times per second.
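In Python the same polling pattern uses Future.done(). Here is a minimal sketch with a short sleep between checks to avoid busy waiting. fake_download() is again a hypothetical slow task, and the polls counter stands in for "ask the user to wait / do other work".

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    # Hypothetical stand-in for a slow download.
    time.sleep(0.2)
    return "contents of " + url

polls = 0
with ThreadPoolExecutor(max_workers=1) as pool:
    contents_future = pool.submit(fake_download, "http://www.example.com")
    while not contents_future.done():       # Python's counterpart of isDone()
        polls += 1                           # e.g. tell the user to wait, do other work
        time.sleep(0.05)                     # back off instead of spinning
    contents = contents_future.result()      # returns immediately: the future is done

print(contents, polls)
```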

Cancelling futures is the last aspect we have not covered yet. Imagine you started some asynchronous job and you can only wait for it for a given amount of time. If the result is not there after, say, 2 seconds, we give up and either propagate an error or work around it. However, if you are a good citizen, you should somehow tell this future object: I no longer need you, forget about it. You save processing resources by not running obsolete tasks. The syntax is simple:

 
contentsFuture.cancel(true);    //meh...
 

We all love cryptic boolean parameters, don't we? Cancelling comes in two flavours. By passing false as the mayInterruptIfRunning parameter, we only cancel tasks that have not started yet, that is, when the Future represents the result of a computation that has not even begun. But if our Callable.call() is already in the middle of running, we let it finish. However, if we pass true, Future.cancel() will be more aggressive and will try to interrupt already running jobs as well. How? Think about all the methods that throw the infamous InterruptedException, namely Thread.sleep(), Object.wait(), Condition.await(), and many others (including Future.get()). If you are blocking on any of these methods and someone decides to cancel your Callable, the method will actually throw InterruptedException, signalling that someone is trying to interrupt the currently running task. A task that never blocks on such a method has to check Thread.currentThread().isInterrupted() itself; otherwise cancel(true) cannot stop it.
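Python's Future.cancel() has no mayInterruptIfRunning flag. It only succeeds for a call that has not started yet, and it returns False for a running one. The sketch below shows that difference. To stop a running job, it uses a cooperative threading.Event as a hypothetical substitute for Java's interrupt, checked periodically by the task, much like polling isInterrupted().

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stop_requested = threading.Event()   # hypothetical cooperative "interrupt" flag
started = threading.Event()

def long_job():
    started.set()
    # Check the flag periodically instead of relying on forced interruption.
    while not stop_requested.is_set():
        time.sleep(0.01)
    return "stopped early"

with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(long_job)
    queued = pool.submit(long_job)         # waits behind the running job
    started.wait()                         # make sure the first job is running
    cancelled_running = running.cancel()   # False: already running
    cancelled_queued = queued.cancel()     # True: still in the queue
    stop_requested.set()                   # ask the running job to finish

print(cancelled_running, cancelled_queued, running.result())
```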


So we now understand what Future<T> is: a placeholder for something that you will get in the future. It's like the keys to a car that has not been manufactured yet. But how do you actually obtain an instance of Future<T> in your application? The two most common sources are thread pools and asynchronous methods (which are backed by thread pools for you). Thus our startDownloading() method can be rewritten as:

 
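import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;

private final ExecutorService pool = Executors.newFixedThreadPool(10);

// No IOException here: it is thrown inside call() and surfaces from Future.get() as ExecutionException
public Future<String> startDownloading(final URL url) {
    return pool.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            try (InputStream input = url.openStream()) {
                return IOUtils.toString(input, StandardCharsets.UTF_8);
            }
        }
    });
}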
 

There is a lot of syntax boilerplate, but the basic idea is simple: wrap long-running computations in a Callable<String> and submit() them to a thread pool of 10 threads. Submitting returns some implementation of Future<String>, most likely linked in some way to your task and the thread pool. Your task is not necessarily executed immediately. Instead, it is placed in a queue, which is later (maybe even much later) polled by a thread from the pool. Now it should be clear what the two flavours of cancel() mean: you can always cancel a task that is still waiting in that queue, but cancelling a task that is already running is a bit more complex.
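Here is a Python sketch of the queueing behaviour just described. A pool with a fixed number of threads, roughly like Executors.newFixedThreadPool(), accepts more tasks than it has workers, and the extras wait in its queue. To make the effect easy to see, the pool size is assumed to be 2 rather than 10. The task() function and its peak-concurrency counter are illustrative.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
active = 0
peak = 0

def task(n):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)   # highest number of tasks seen running at once
    time.sleep(0.05)               # simulated work
    with lock:
        active -= 1
    return n

# Six submissions but only two threads: the rest wait in the pool's queue.
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(task, n) for n in range(6)]
    results = [f.result() for f in futures]

print(results, peak)
```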

 

Other places where you can meet Future are Spring and EJB. For example, in the Spring framework you can simply annotate your method with @Async:

 
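import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;
import org.apache.commons.io.IOUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;

@Async
public Future<String> startDownloading(final URL url) throws IOException {
    try (InputStream input = url.openStream()) {
        return new AsyncResult<>(
                IOUtils.toString(input, StandardCharsets.UTF_8)
        );
    }
}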
 

 

Notice that we simply wrap our result in AsyncResult, which implements Future. The method itself does not deal with thread pools or asynchronous processing. Spring will proxy all calls to startDownloading() and run them in a thread pool, provided asynchronous execution is enabled (for example with @EnableAsync). The exact same feature is available through the @Asynchronous annotation in EJB.
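Python has no built-in @Async, but the idea of keeping the thread pool out of the method body can be sketched with a decorator. In the sketch below, run_async and _pool are hypothetical names, not part of any library. Calling the decorated function submits it to a shared pool and returns a concurrent.futures.Future, while the body stays plain synchronous code.

```python
import functools
from concurrent.futures import Future, ThreadPoolExecutor

_pool = ThreadPoolExecutor(max_workers=10)   # hypothetical shared pool

def run_async(fn):
    """Hypothetical decorator loosely mimicking Spring's @Async:
    each call is submitted to _pool and a Future is returned."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs) -> Future:
        return _pool.submit(fn, *args, **kwargs)
    return wrapper

@run_async
def start_downloading(url):
    # The body knows nothing about threads (no real download here).
    return "contents of " + url

contents_future = start_downloading("http://www.example.com")
print(contents_future.result())
```

Unlike Spring, nothing here proxies the object; the decorator simply wraps the function at definition time.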

 

So we have learned a lot about java.util.concurrent.Future. Now it's time to admit that this interface is quite limited, especially when compared to other languages. More on that later.

 










