concurrent.futures模块

Posted sunshinekimi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了concurrent.futures模块相关的知识,希望对你有一定的参考价值。

class concurrent.futures.Executor

Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。

我们可以将相应的tasks直接放入线程池/进程池,不需要维护Queue来操心死锁的问题,线程池/进程池会自动帮我们调度。

Future可以把它理解为一个在未来完成的操作,这是异步编程的基础,传统编程模式下比如我们操作queue.get的时候,在等待返回结果之前会产生阻塞,cpu不能让出来做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

1.多线程ThreadPoolExecutor

from concurrent.futures  import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
import requests
import time
urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
def load_url(url, timeout):
    print(f"{url} start 时间 %s"%time.asctime())
    s=requests.request("get",url, timeout=timeout)
    print(f"{url} end时间 %s"%time.asctime())
    return str(s.json())
wokers=ThreadPoolExecutor(max_workers=5)
tasks=[wokers.submit(load_url,i,timeout=10) for i in  urls]

for task in as_completed(tasks):
    print(task.result())

 excutor对象互相引用引发死锁

ThreadPoolExecutorExecutor使用线程池异步执行调用子类。

当与之关联的可调用对象Future等待另一个对象的结果时,就会发生死锁Future例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:excutor 调用woker不足引发无法返回

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

 2.as_completed

as_completed()方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。

参数是任务(submit的返回值)列表

例子见上。

3.

wait

wait方法可以让主线程阻塞,直到满足设定的要求。

wait方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件return_when默认为ALL_COMPLETED,表明要等待所有的任务都结束。等待条件还可以设置为FIRST_COMPLETED,表示第一个任务完成就停止等待。

from concurrent.futures  import ThreadPoolExecutor,ALL_COMPLETED,ProcessPoolExecutor,wait,as_completed,FIRST_COMPLETED
import requests
import time
urls = ["http://127.0.0.1:8000/index", "http://127.0.0.1:8000/stuTable/"]
def load_url(url, timeout):
    print(f"{url} start 时间 %s"%time.asctime())
    s=requests.request("get",url, timeout=timeout)
    print(f"{url} end时间 %s"%time.asctime())
    return str(s.json())
wokers=ThreadPoolExecutor(max_workers=5)
tasks=[wokers.submit(load_url,i,timeout=10) for i in  urls]

# for task in as_completed(tasks):
#     print(task.result())

res=wait(tasks,timeout=10,return_when=FIRST_COMPLETED)
print(res.done)
for i in tasks:
    print(i.result(),i.done(),i.cancelled())

  

结果分析:

return_when=FIRST_COMPLETED

http://127.0.0.1:8000/index start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/stuTable/ start 时间 Sat Dec 14 12:17:48 2019
http://127.0.0.1:8000/index end时间 Sat Dec 14 12:17:48 2019
{<Future at 0x33a67b0 state=finished returned str>}
{‘user‘: ‘test001‘, ‘msg‘: ‘this is test index view ‘} True False
http://127.0.0.1:8000/stuTable/ end时间 Sat Dec 14 12:17:51 2019
{‘A‘: 888, ‘NN‘: 899} True False

第一个完成后就直接返回了完成的对象,即使后面通过打印获取到后完成的task的结果,

concurrent.futures.waitfstimeout = Nonereturn_when = ALL_COMPLETED 

等待fs给定Future实例(可能由其他Executor实例创建 完成。返回一组命名的2元组。第一组名为,包含在等待完成之前完成的期货(完成或取消的期货)。第二组名为,包含未完成的期货(待定或正在运行的期货)。donenot_done

超时可用于控制返回之前等待的最大秒数。 超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

return_when指示该函数何时应返回。它必须是以下常量之一:

不变

描述

FIRST_COMPLETED

以后完成或取消操作时,该函数将返回。

FIRST_EXCEPTION

当将来通过引发异常结束时,该函数将返回。如果没有未来引发例外,则等同于 ALL_COMPLETED

ALL_COMPLETED

当所有期货结束或被取消时,该函数将返回。

concurrent.futures.as_completedfstimeout = None 

返回fs给定Future实例(可能由不同的Executor实例创建的迭代器,该迭代器将在完成时生成期货(完成或取消的期货)。fs给定的任何重复的期货将被退回一次。之前完成的任何期货 都将首先产生。返回的迭代器引发if 调用,并且从原始调用到超时后,结果不可用。 超时可以是int或float。如果 未指定timeout,则等待时间没有限制。as_completed()concurrent.futures.TimeoutError__next__()as_completed()None

 

4.ProcessPoolExecutor对象

ProcessPoolExecutor类是Executor使用的过程池异步执行调用子类。 ProcessPoolExecutor使用该multiprocessing模块,它可以避开全局解释器锁定,但也意味着只能执行和返回可拾取对象。

__main__模块必须可由工作程序子进程导入。这意味着ProcessPoolExecutor在交互式解释器中将不起作用。

从可调用对象提交到的调用ExecutorFuture方法ProcessPoolExecutor将导致死锁。

concurrent.futures.ProcessPoolExecutormax_workers = Nonemp_context = Noneinitializer = Noneinitargs =()

Executor使用最多max_workers进程池异步执行调用子类如果max_workersNone或者没有给出,将默认为机器上的处理器数量。如果max_workers小于或等于0,则将ValueError 引发a。在Windows上,max_workers必须等于或小于61如果不是,ValueError则将被引发。如果max_workersNone,那么61即使有更多处理器可用,默认选择也将是最多。 mp_context可以是多处理上下文,也可以是“无”。它将用来发动工人。如果mp_contextNone 如果未指定,则使用默认的多处理上下文。

初始化程序是一个可选的可调用对象,它在每个工作进程开始时被调用;initargs是传递给初始化程序的参数的元组。如果初始化器引发异常,则所有当前暂挂的作业都会引发BrokenProcessPool,以及任何尝试向池中提交更多作业的尝试。

在版本3.3中进行了更改:当其中一个工作进程突然终止时, BrokenProcessPool现在会引发错误。以前,行为是不确定的,但是对执行器或其期货的操作通常会冻结或死锁。

from multiprocessing import Process,Lock

def  func_mutiprocess(i):
     def ss():
         l = []
         for i in range(1000000000000000000):
             l.append(i)
     loc=Lock()
     loc.acquire()
     ss()
     print(f"this process {i},{time.asctime()}")
     loc.release()

def process_input():
    pool=[]
    pools=[Process(target=func_mutiprocess,args=(i,) ) for i in range(multiprocessing.cpu_count())]
    for p  in pools:
        p.start()
        pool.append(p)
    for j in pool:
        j.join()

if __name__ == ‘__main__‘:

   while True:
        try:
         process_input()
        except Exception as e:
            pass

  这个一个多进程引发内存占用100%爆掉的反面案例,原因是利用多进程创建超大列表容器

关于Future:

所述Future类封装一个可调用的异步执行。 Future实例由创建Executor.submit()

concurrent.futures.Future

封装可调用对象的异步执行。 Future 实例是由Executor.submit()测试人员创建的,除了测试外,不应直接创建。

cancel

尝试取消呼叫。如果该调用当前正在执行或正在运行,并且无法取消,则该方法将返回 False,否则,该调用将被取消并且该方法将返回True

cancelled

True如果呼叫已成功取消,则返回

running

True如果当前正在执行该调用且无法取消该调用,则返回

done

返回True如果调用成功取消或结束运行。

resulttimeout = None 

返回调用返回的值。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将 concurrent.futures.TimeoutError引发a。超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

如果在完成之前取消了未来,CancelledError 则将被提出。

如果调用引发,则此方法将引发相同的异常。

exceptiontimeout = None 

返回调用引发的异常。如果呼叫尚未完成,则此方法将等待超时秒数。如果呼叫未在超时秒内完成,则将 concurrent.futures.TimeoutError引发a。 超时可以是int或float。如果未指定timeoutNone,则等待时间没有限制。

如果在完成之前取消了未来,CancelledError 则将被提出。

如果呼叫完成而没有加注,None则返回。

add_done_callbackfn 

将可调用的fn附加到将来。 当取消未来或完成运行时,将调用fn,并将future作为唯一参数。

添加的可调用对象按添加顺序被调用,并且始终在属于添加它们的进程的线程中调用。如果可调用对象引发Exception子类,则将其记录并忽略。如果callable引发BaseException子类,则该行为未定义。

如果将来已经完成或被取消,则将立即调用fn

以下Future方法适用于单元测试和 Executor实现。

set_running_or_notify_cancel

Executor在执行与Future和单元测试相关的工作之前实现应调用此方法

如果方法返回FalseFuture则取消,Future.cancel()即被调用并返回True等待Future完成的所有线程(即通过 as_completed()wait())都将被唤醒。

如果该方法返回,True则该Future不会被取消并已处于运行状态,即对的调用 Future.running()将返回True

该方法只能被调用一次,不能在调用之后 Future.set_result()Future.set_exception()已经被调用。

set_result结果

设置与Futureto 结果关联的工作结果

此方法仅应由Executor实现和单元测试使用。

在版本3.8中更改:concurrent.futures.InvalidStateError如果Future已经完成引发此方法 

set_exception例外

设置与相关的工作结果Future的 异常Exception

此方法仅应由Executor实现和单元测试使用。

在版本3.8中更改:concurrent.futures.InvalidStateError如果Future已经完成引发此方

以上是关于concurrent.futures模块的主要内容,如果未能解决你的问题,请参考以下文章

concurrent.futures模块

为啥我不能在类方法中使用 python 模块 concurrent.futures?

Python并发编程之线程池/进程池--concurrent.futures模块

python并发之concurrent.futures

线程-线程池-concurrent.futures模块

线程-线程池-concurrent.futures模块