如何从python中的线程获取返回值?
Posted
技术标签:
【中文标题】如何从python中的线程获取返回值?【英文标题】:How to get the return value from a thread in python? 【发布时间】:2011-10-17 03:54:05 【问题描述】:下面的函数foo
返回一个字符串'foo'
。如何获取线程目标返回的值'foo'
?
from threading import Thread
def foo(bar):
print('hello '.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
上面显示的“一种明显的方法”不起作用:thread.join()
返回None
。
【问题讨论】:
【参考方案1】:我见过的一种方法是将可变对象(例如列表或字典)与索引或其他某种标识符一起传递给线程的构造函数。然后线程可以将其结果存储在该对象的专用槽中。例如:
def foo(bar, result, index):
print 'hello 0'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
如果您真的希望join()
返回被调用函数的返回值,您可以使用Thread
子类来执行此操作,如下所示:
from threading import Thread
def foo(bar):
print 'hello 0'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
由于一些名称修改,这有点麻烦,它访问特定于Thread
实现的“私有”数据结构......但它可以工作。
对于python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
print(type(self._target))
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
【讨论】:
酷,谢谢你的例子!我想知道为什么 Thread 一开始就没有通过处理返回值来实现,这似乎是一个显而易见的事情来支持。 我认为这应该是公认的答案 - OP 要求threading
,而不是尝试不同的库,加上池大小限制引入了一个额外的潜在问题,这发生在我的案例中。
在 python3 上返回 TypeError: __init__() takes from 1 to 6 positional arguments but 7 were given
。有什么办法解决吗?
join
有一个应该传递的超时参数
警告任何想要做第二个的人(_Thread__target
的事情)。你会让任何试图将你的代码移植到 python 3 的人讨厌你,直到他们弄清楚你做了什么(因为使用了在 2 和 3 之间更改的未记录的特性)。好好记录你的代码。【参考方案2】:
FWIW,multiprocessing
模块使用Pool
类为此提供了一个很好的接口。如果你想坚持使用线程而不是进程,你可以使用 multiprocessing.pool.ThreadPool
类作为替代品。
def foo(bar, baz):
print 'hello 0'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
【讨论】:
@JakeBiesinger 我的意思是,我一直在寻找答案,如何从 Thread 获得响应,来到这里,并且接受的答案没有回答所述问题。我区分线程和进程。我知道全局解释器锁,但是我正在处理 I/O 绑定问题,所以线程没问题,我不需要进程。这里的其他答案更好地回答了问题。 @omikron 但是python中的线程不会返回响应,除非您使用启用此功能的子类。在可能的子类中,ThreadPools 是一个不错的选择(选择线程数,使用 map/apply w/sync/async)。尽管是从multiprocess
导入的,但它们与进程无关。
@JakeBiesinger 哦,我是盲人。对不起,我不必要的 cmets。你说的对。我只是假设多处理 = 进程。
如果您有更多线程,请不要忘记将processes=1
设置为多个!
多处理和线程池的问题在于,与基本线程库相比,设置和启动线程要慢得多。它非常适合启动长时间运行的线程,但在需要启动大量短时间运行的线程时会失败。在我看来,此处其他答案中记录的使用“线程”和“队列”的解决方案是后一种用例的更好选择。【参考方案3】:
在 Python 3.2+ 中,stdlib concurrent.futures
模块为 threading
提供了更高级别的 API,包括将工作线程的返回值或异常传递回主线程:
import concurrent.futures
def foo(bar):
print('hello '.format(bar))
return 'foo'
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(foo, 'world!')
return_value = future.result()
print(return_value)
【讨论】:
对于那些想知道这可以通过线程列表来完成的人。futures = [executor.submit(foo, param) for param in param_list]
将保持顺序,退出with
将允许收集结果。 [f.result() for f in futures]
@jayreed1 评论应该有自己的答案,或者应该包含在答案中。非常有用。
哇.. 感谢您的回答,正在为我的代码寻找多处理解决方案,但这有助于我以如此简单的方式做到这一点,@jayreed1 评论让它锦上添花,谢谢大家...
非常感谢,这帮助我解决了在一些非线程安全库中发现的问题。我喜欢你的回答。我的问答:***.com/questions/68982519/…
我以前从未使用过这个库。我是否必须以某种方式关闭线程以便它不会“松动”,或者如果我只使用此处显示的代码,执行器会自动为我处理吗?【参考方案4】:
Jake 的回答很好,但是如果您不想使用线程池(您不知道需要多少线程,但根据需要创建它们),那么在线程之间传输信息的好方法是内置 Queue.Queue 类,因为它提供线程安全。
我创建了以下装饰器以使其以类似于线程池的方式运行:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
那么你只需将它用作:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
装饰函数每次调用时都会创建一个新线程,并返回一个 Thread 对象,该对象包含将接收结果的队列。
更新
自从我发布此答案以来已经有一段时间了,但它仍然获得了意见,所以我想我会更新它以反映我在较新版本的 Python 中执行此操作的方式:
concurrent.futures
模块中添加了 Python 3.2,该模块为并行任务提供了高级接口。它提供了ThreadPoolExecutor
和ProcessPoolExecutor
,所以你可以使用同一个api的线程或进程池。
这个 api 的一个好处是向Executor
提交任务会返回一个Future
对象,该对象将以您提交的可调用对象的返回值完成。
这使得附加queue
对象变得不必要,这大大简化了装饰器:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
如果没有传入,这将使用默认的模块线程池执行器。
用法和之前很相似:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
如果您使用的是 Python 3.4+,那么使用此方法(以及一般的 Future 对象)的一个非常好的特性是可以将返回的 future 包装成带有 asyncio.wrap_future
的 asyncio.Future
。这使得它可以轻松地与协程一起工作:
result = await asyncio.wrap_future(long_task(10))
如果您不需要访问底层的concurrent.Future
对象,您可以在装饰器中包含包装:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
然后,当您需要将 cpu 密集型或阻塞代码从事件循环线程中推送出去时,您可以将其放在一个装饰函数中:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
【讨论】:
我似乎无法让它工作;我收到一条错误消息,指出AttributeError: 'module' object has no attribute 'Lock'
这似乎来自y = long_task(10)
... 想法?
代码没有明确使用 Lock,所以问题可能出在代码的其他地方。您可能想发布一个关于它的新 SO 问题
为什么 result_queue 是实例属性?如果它是一个类属性,这样用户在使用不明确和不明确的@threaded 时不必知道调用 result_queue 会更好吗?
@t88,不确定您的意思,您需要某种方式来访问结果,这意味着您需要知道要调用什么。如果你想让它成为别的东西,你可以继承 Thread 并做你想做的事(这是一个简单的解决方案)。队列需要附加到线程的原因是多个调用/函数有自己的队列
@LeonardoRick 它在 functools 模块中:docs.python.org/3/library/functools.html#functools.wraps【参考方案5】:
另一个不需要更改现有代码的解决方案:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello 0'.format(bar) # Python 2.x
#print('hello 0'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result # Python 2.x
#print(result) # Python 2.x
它也可以很容易地调整为多线程环境:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello 0'.format(bar) # Python 2.x
#print('hello 0'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread's return value
while not que.empty():
result = que.get()
print result # Python 2.x
#print(result) # Python 3.x
【讨论】:
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!')) whats q.put 在这里做什么? Queue.Queue() 确实 que = Queue.Queue() - 创建队列 q.put(foo) - 将 foo() 插入队列 对于Python3,需要改成from queue import Queue
。
这似乎是破坏性最小的方法(无需大幅重构原始代码库)以允许返回值返回到主线程。
@DaniyalWarraich 我刚刚用 Python 3 运行了这两个示例,它们都像魅力一样工作。确保您注释/取消注释相关行。【参考方案6】:
Parris / kindall 的 answer join
/return
答案移植到 Python 3:
from threading import Thread
def foo(bar):
print('hello 0'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
注意,Thread
类在 Python 3 中的实现方式不同。
【讨论】:
join 需要一个超时参数,应该一起传递 文档指出,唯一需要覆盖的方法应该是:__init__() 和 run() docs.python.org/3/library/threading.html#thread-objects【参考方案7】:我发现的大多数答案都很长,需要熟悉其他模块或高级 python 功能,除非他们已经熟悉答案所涉及的所有内容,否则会让人感到困惑。
简化方法的工作代码:
import threading
class ThreadWithResult(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=, *, daemon=None):
def function():
self.result = target(*args, **kwargs)
super().__init__(group=group, target=function, name=name, daemon=daemon)
示例代码:
import time, random
def function_to_thread(n):
count = 0
while count < 3:
print(f'still running thread n')
count +=1
time.sleep(3)
result = random.random()
print(f'Return value of thread n should be: result')
return result
def main():
thread1 = ThreadWithResult(target=function_to_thread, args=(1,))
thread2 = ThreadWithResult(target=function_to_thread, args=(2,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(thread1.result)
print(thread2.result)
main()
解释:
我想显着简化事情,所以我创建了一个ThreadWithResult
类并让它继承自threading.Thread
。 __init__
中的嵌套函数function
调用我们要保存其值的线程函数,并在线程执行完毕后将该嵌套函数的结果保存为实例属性self.result
。
创建 this 的实例与创建 threading.Thread
的实例相同。将要在新线程上运行的函数传递给 target
参数,并将函数可能需要的任何参数传递给 args
参数,并将任何关键字参数传递给 kwargs
参数。
例如
my_thread = ThreadWithResult(target=my_function, args=(arg1, arg2, arg3))
我认为这比绝大多数答案更容易理解,而且这种方法不需要额外的导入!我包含了time
和random
模块来模拟线程的行为,但它们不是实现original question 中要求的功能所必需的。
我知道我是在提出问题后才回答这个问题的,但我希望这可以帮助更多的人!
编辑:我创建了save-thread-result
PyPI package 以允许您访问上面的相同代码并在项目之间重用它(GitHub code is here)。 PyPI 包完全扩展了threading.Thread
类,因此您也可以在ThreadWithResult
类上设置您在threading.thread
上设置的任何属性!
上面的原始答案涵盖了这个子类背后的主要思想,但有关更多信息,请参阅more detailed explanation (from the module docstring) here。
快速使用示例:
pip3 install -U save-thread-result # MacOS/Linux
pip install -U save-thread-result # Windows
python3 # MacOS/Linux
python # Windows
from save_thread_result import ThreadWithResult
# As of Release 0.0.3, you can also specify values for
#`group`, `name`, and `daemon` if you want to set those
# values manually.
thread = ThreadWithResult(
target = my_function,
args = (my_function_arg1, my_function_arg2, ...)
kwargs = my_function_kwarg1: kwarg1_value, my_function_kwarg2: kwarg2_value, ...
)
thread.start()
thread.join()
if getattr(thread, 'result', None):
print(thread.result)
else:
# thread.result attribute not set - something caused
# the thread to terminate BEFORE the thread finished
# executing the function passed in through the
# `target` argument
print('ERROR! Something went wrong while executing this thread, and the function you passed in did NOT complete!!')
# seeing help about the class and information about the threading.Thread super class methods and attributes available:
help(ThreadWithResult)
【讨论】:
还刚刚编辑了答案以包含指向我为此制作的 PyPI 模块的链接。核心代码可能会保持不变,但我想包含一些更好的使用示例并使 README 更详细一些,所以我将逐步添加它们,然后将包更新为 1.0.0 和Stable
Development Status 之后那!之后我也会在这里更新答案:)【参考方案8】:
我偷了 kindall 的答案,稍微整理了一下。
关键部分是将 *args 和 **kwargs 添加到 join() 以处理超时
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
以下更新答案
这是我最受欢迎的答案,因此我决定使用可在 py2 和 py3 上运行的代码进行更新。
此外,我看到这个问题的许多答案表明对 Thread.join() 缺乏理解。有些完全无法处理timeout
arg。但是还有一个极端情况,当您有 (1) 一个可以返回 None
的目标函数和 (2) 您还将 timeout
arg 传递给 join() 时,您应该注意有关实例的极端情况。请参阅“测试 4”以了解这种极端情况。
适用于 py2 和 py3 的 ThreadWithReturn 类:
import sys
from threading import Thread
from builtins import super # https://***.com/a/30159479
_thread_target_key, _thread_args_key, _thread_kwargs_key = (
('_target', '_args', '_kwargs')
if sys.version_info >= (3, 0) else
('_Thread__target', '_Thread__args', '_Thread__kwargs')
)
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if target is not None:
self._return = target(
*getattr(self, _thread_args_key),
**getattr(self, _thread_kwargs_key)
)
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
一些示例测试如下所示:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs='seconds': 5)
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs='seconds': 5)
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
您能确定我们在 TEST 4 中可能遇到的极端情况吗?
问题是我们希望 giveMe() 返回 None(参见测试 2),但我们也希望 join() 在超时时返回 None。
returned is None
表示:
(1) 这就是 giveMe() 返回的内容,或者
(2) join() 超时
这个例子很简单,因为我们知道 giveMe() 总是返回 None。但在现实世界的实例中(目标可能合法地返回 None 或其他内容),我们希望明确检查发生了什么。
以下是解决这种极端情况的方法:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs='seconds': 5)
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
【讨论】:
你知道 Python3 的 _Thread_target 等价物吗? Python3 中不存在该属性。 我查看了threading.py文件,原来是_target(其他属性名称类似)。 如果将target
、args
和kwargs
参数保存为init中的成员变量,则可以避免访问线程类的私有变量你的班级。
@GreySage 见我的回答,I ported this block to python3 下面
@GreySage answer 现在支持 py2 和 py3【参考方案9】:
使用队列:
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
【讨论】:
真的很喜欢这个解决方案,又短又甜。如果您的函数读取输入队列,并且您添加到out_queue1
,您将需要循环out_queue1.get()
并捕获Queue.Empty 异常:ret = [] ; try: ; while True; ret.append(out_queue1.get(block=False)) ; except Queue.Empty: ; pass
。用分号模拟换行符。【参考方案10】:
我对这个问题的解决方案是将函数和线程包装在一个类中。不需要使用池、队列或 c 类型变量传递。它也是非阻塞的。您改为检查状态。请在代码末尾查看如何使用它的示例。
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
【讨论】:
您将如何处理异常?假设给出了 add 函数和 int 和一个 str。是所有线程都失败还是只有一个线程失败? +1 像我一样思考。说真的 - 这是最少的努力。如果你用 Python 编码 - 你的东西应该自动在一个类中完成,所以这是解决这个问题的最明智的方法。【参考方案11】:我正在使用这个包装器,它可以轻松地将任何函数转换为在 Thread
中运行 - 处理它的返回值或异常。它不会增加Queue
开销。
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
使用示例
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
threading
模块注意事项
线程函数的舒适返回值和异常处理是一种常见的“Pythonic”需求,并且确实应该由threading
模块提供——可能直接在标准Thread
类中提供。 ThreadPool
对简单任务的开销太大 - 3 个管理线程,很多官僚作风。不幸的是,Thread
的布局最初是从 Java 复制而来的——你可以看到例如来自仍然无用的第一个 (!) 构造函数参数 group
。
【讨论】:
第一个构造函数不是没用的,它保留在那里以供将来实现..来自python并行编程食谱 不错的解决方案!只是为了好奇,为什么在“获取”中你不只是简单地提出异常(即 raise ex)?【参考方案12】:根据上面提到的内容,这是适用于 Python3 的更通用的解决方案。
import threading
class ThreadWithReturnValue(threading.Thread):
def __init__(self, *init_args, **init_kwargs):
threading.Thread.__init__(self, *init_args, **init_kwargs)
self._return = None
def run(self):
self._return = self._target(*self._args, **self._kwargs)
def join(self):
threading.Thread.join(self)
return self._return
用法
th = ThreadWithReturnValue(target=requests.get, args=('http://www.google.com',))
th.start()
response = th.join()
response.status_code # => 200
【讨论】:
【参考方案13】:考虑到 @iman 对 @JakeBiesinger 答案的评论,我已将其重新组合为具有不同数量的线程:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello 0'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
【讨论】:
【参考方案14】:join
总是返回None
,我认为你应该继承Thread
来处理返回码等等。
【讨论】:
【参考方案15】:您可以在线程函数的范围之上定义一个可变对象,并将结果添加到其中。 (我还修改了代码以兼容python3)
returns =
def foo(bar):
print('hello 0'.format(bar))
returns[bar] = 'foo'
from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
这将返回'world!': 'foo'
如果您使用函数输入作为结果字典的键,则保证每个唯一输入都会在结果中给出一个条目
【讨论】:
【参考方案16】:将目标定义为
1) 取参数q
2)用q.put(foo); return
替换任何语句return foo
一个函数
def func(a):
ans = a * a
return ans
会变成
def func(a, q):
ans = a * a
q.put(ans)
return
然后你就这样继续
from Queue import Queue
from threading import Thread
ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]
您可以使用函数装饰器/包装器来制作它,这样您就可以将现有函数用作target
而无需修改它们,但请遵循此基本方案。
【讨论】:
应该是results = [ans_q.get() for _ in xrange(len(threads))]
【参考方案17】:
GuySoft 的想法很棒,但我认为对象不一定要从 Thread 继承,并且 start() 可以从接口中删除:
from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()
def join(self):
self._t.join()
return self._que.get()
def foo(bar):
print('hello 0'.format(bar))
return "foo"
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
print(twrv.join()) # prints foo
【讨论】:
【参考方案18】:如前所述,多处理池比基本线程慢得多。使用此处某些答案中提出的队列是一种非常有效的选择。我已经将它与字典一起使用,以便能够运行许多小线程并通过将它们与字典相结合来恢复多个答案:
#!/usr/bin/env python3
import threading
# use Queue for python2
import queue
import random
LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]
NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put(k: result)
threads = list()
q = queue.Queue()
results = dict()
for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())
print(results)
【讨论】:
【参考方案19】:这是我创建的 @Kindall's answer 的版本。
此版本使您只需输入带有参数的命令即可创建新线程。
这是用 Python 3.8 制作的:
from threading import Thread
from typing import Any
def test(plug, plug2, plug3):
print(f"hello plug")
print(f'I am the second plug : plug2')
print(plug3)
return 'I am the return Value!'
def test2(msg):
return f'I am from the second test: msg'
def test3():
print('hello world')
def NewThread(com, Returning: bool, *arguments) -> Any:
"""
Will create a new thread for a function/command.
:param com: Command to be Executed
:param arguments: Arguments to be sent to Command
:param Returning: True/False Will this command need to return anything
"""
class NewThreadWorker(Thread):
def __init__(self, group = None, target = None, name = None, args = (), kwargs = None, *,
daemon = None):
Thread.__init__(self, group, target, name, args, kwargs, daemon = daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
ntw = NewThreadWorker(target = com, args = (*arguments,))
ntw.start()
if Returning:
return ntw.join()
if __name__ == "__main__":
print(NewThread(test, True, 'hi', 'test', test2('hi')))
NewThread(test3, True)
【讨论】:
【参考方案20】:一个常见的解决方案是用一个装饰器来包装你的函数foo
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
那么整个代码可能是这样的
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result
注意
一个重要的问题是返回值可能是无序。
(其实return value
不一定保存到queue
,因为你可以选择任意thread-safe数据结构)
【讨论】:
【参考方案21】:Kindall's answer 在 Python3 中
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None
def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
def join(self,timeout=None):
Thread.join(self,timeout)
return self._return
【讨论】:
【参考方案22】:我知道这个帖子很旧....但我遇到了同样的问题...如果你愿意使用thread.join()
import threading
class test:
def __init__(self):
self.msg=""
def hello(self,bar):
print('hello '.format(bar))
self.msg="foo"
def main(self):
thread = threading.Thread(target=self.hello, args=('world!',))
thread.start()
thread.join()
print(self.msg)
g=test()
g.main()
【讨论】:
以上是关于如何从python中的线程获取返回值?的主要内容,如果未能解决你的问题,请参考以下文章