它来了!真正的 python 多线程
Posted edisonfish
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了它来了!真正的 python 多线程相关的知识,希望对你有一定的参考价值。
哈喽大家好,我是咸鱼
几天前,IBM 工程师 Martin Heinz 发文表示 python 3.12 版本回引入"Per-Interpreter GIL”,有了这个 Per-Interpreter 全局解释器锁,python 就能实现真正意义上的并行/并发
我们知道,python 的多线程/进程并不是真正意义上的多线程/进程,这是因为 python GIL (Global Interpreter Lock)导致的
而即将发布的 Python 3.12 中引入了名为 "Per-Interpreter GIL" 的新特性,能够实现真正的并发
接下来我们来看下这篇文章,原文链接如下:
https://martinheinz.dev/blog/97
译文
Python 到现在已经 32 岁了,但它到现在还没有实现适当的、真正的并发/并行
由于将在 Python 3.12 (预计 2023 年 10 月发布)中引入 “Per-Interpreter GIL”(全局解释器锁),这种情况将会被改变
虽然距离 python 3.12 的发布还有几个月的时间,但是相关代码已经实现了。所以让我们提前来了解一下如何使用子解释器 API(ub-interpreters API) 来编写出真正的并发Python代码
子解释器(Sub-Interpreters)
我们首先来看下这个 “Per-Interpreter GIL” 是如何解决 Python 缺失适当并发性这个问题的
简单来讲,GIL(全局解释器锁)是一个互斥锁,它只允许一个线程控制 Python 解释器(某个线程想要执行,必须要先拿到 GIL ,在一个 python 解释器里面,GIL 只有一个,拿不到 GIL 的就不允许执行)
这就意味着即使你在 Python 中创建多个线程,也只会有一个线程在运行
随着 “Per-Interpreter GIL” 的引用,单个 python 解释器不再共享同一个 GIL。这种隔离级别允许每个子 python 解释器真正地并发运行
这意味着我们可以通过生成额外的子解释器来绕过 Python 的并发限制,其中每个子解释器都有自己的GIL(拿到一个 GIL 锁)
更详细的说明请参见 PEP 684,该文档描述了此功能/更改:https://peps.python.org/pep-0684/#per-interpreter-state
如何安装
想要使用这个新功能,我们需要安装最新的 python 版本,这需要源码编译安装
# https://devguide.python.org/getting-started/setup-building/#unix-compiling
git clone https://github.com/python/cpython.git
cd cpython
./configure --enable-optimizations --prefix=$(pwd)/python-3.12
make -s -j2
./python
# Python 3.12.0a7+ (heads/main:22f3425c3d, May 10 2023, 12:52:07) [GCC 11.3.0] on linux
# Type "help", "copyright", "credits" or "license" for more information.
C-API 在哪里
现在我们已经安装好了最新版本,那么我们该如何使用子解释器呢?我们可以直接通过 import
来导入吗?不幸的是,还不能
正如 PEP-684 中指出的: ...this is an advanced feature meant for a narrow set of users of the C-API.
Per-Interpreter GIL 的特性目前只能通过 C-API 使用,还没有直接的接口供开发人员使用
接口预计会在 PEP 554中出现,如果大家能够接受,它应该会在 Python 3.13 中出现,在这个版本出现之前,我们必须自己想办法来实现子解释器
虽然还没有相关文档,也没有相关模块可以导入,但 CPython 代码库中有一些代码段向我们展示了如何使用它:
- 方法一:我们可以使用
_xxsubinterpreters
模块(因为是通过 C 实现的,所以命名比较奇怪,而且在 python 中不能够简单地去检查代码) - 方法二:可以使用 CPython 的 test 模块,该模块具有用于测试的示例 Interpreter(和 Channel)类
# Choose one of these:
import _xxsubinterpreters as interpreters
from test.support import interpreters
通常情况下我们一般用上面的第二种方法来实现
我们已经找到了子解释器,但我们还需要通过 test 模块去借用一些辅助函数,以便将代码传递给子解释器,辅助函数如下
from textwrap import dedent
import os
# https://github.com/python/cpython/blob/
# 15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L75
def _captured_script(script):
r, w = os.pipe()
indented = script.replace(\'\\n\', \'\\n \')
wrapped = dedent(f"""
import contextlib
with open(w, \'w\', encoding="utf-8") as spipe:
with contextlib.redirect_stdout(spipe):
indented
""")
return wrapped, open(r, encoding="utf-8")
def _run_output(interp, request, channels=None):
script, rpipe = _captured_script(request)
with rpipe:
interp.run(script, channels=channels)
return rpipe.read()
将 interpreters
模块与上面的辅助函数组合在一起,便可以生成第一个子解释器:
from test.support import interpreters
main = interpreters.get_main()
print(f"Main interpreter ID: main")
# Main interpreter ID: Interpreter(id=0, isolated=None)
interp = interpreters.create()
print(f"Sub-interpreter: interp")
# Sub-interpreter: Interpreter(id=1, isolated=True)
# https://github.com/python/cpython/blob/
# 15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L236
code = dedent("""
from test.support import interpreters
cur = interpreters.get_current()
print(cur.id)
""")
out = _run_output(interp, code)
print(f"All Interpreters: interpreters.list_all()")
# All Interpreters: [Interpreter(id=0, isolated=None), Interpreter(id=1, isolated=None)]
print(f"Output: out") # Result of \'print(cur.id)\'
# Output: 1
生成和运行新解释器的一种方法是使用 create()
函数,然后将解释器与我们想要执行的代码一起传递给 _run_output()
辅助函数
还有一种更简单的方法,如下所示
interp = interpreters.create()
interp.run(code)
直接使用 interpreters
模块的 run
方法。
但如果我们运行上面这两段代码时,会收到以下报错
Fatal Python error: PyInterpreterState_Delete: remaining subinterpreters
Python runtime state: finalizing (tstate=0x000055b5926bf398)
为了避免这个报错,我们还需要清理一些悬挂的解释器:
def cleanup_interpreters():
for i in interpreters.list_all():
if i.id == 0: # main
continue
try:
print(f"Cleaning up interpreter: i")
i.close()
except RuntimeError:
pass # already destroyed
cleanup_interpreters()
# Cleaning up interpreter: Interpreter(id=1, isolated=None)
# Cleaning up interpreter: Interpreter(id=2, isolated=None)
线程
虽然使用上面的辅助函数运行代码是可行的,但在 threading
模块中使用熟悉的接口可能会更方便
import threading
def run_in_thread():
t = threading.Thread(target=interpreters.create)
print(t)
t.start()
print(t)
t.join()
print(t)
run_in_thread()
run_in_thread()
# <Thread(Thread-1 (create), initial)>
# <Thread(Thread-1 (create), started 139772371633728)>
# <Thread(Thread-1 (create), stopped 139772371633728)>
# <Thread(Thread-2 (create), initial)>
# <Thread(Thread-2 (create), started 139772371633728)>
# <Thread(Thread-2 (create), stopped 139772371633728)>
我们通过把 interpreters.create
函数传递给Thread
,它会自动在线程内部生成新的子解释器
我们也可以结合这两种方法,并将辅助函数传递给 threading.Thread
:
import time
def run_in_thread():
interp = interpreters.create(isolated=True)
t = threading.Thread(target=_run_output, args=(interp, dedent("""
import _xxsubinterpreters as _interpreters
cur = _interpreters.get_current()
import time
time.sleep(2)
# Can\'t print from here, won\'t bubble-up to main interpreter
assert isinstance(cur, _interpreters.InterpreterID)
""")))
print(f"Created Thread: t")
t.start()
return t
t1 = run_in_thread()
print(f"First running Thread: t1")
t2 = run_in_thread()
print(f"Second running Thread: t2")
time.sleep(4) # Need to sleep to give Threads time to complete
cleanup_interpreters()
上面的代码中演示了如何使用 _xxsubinterpreters
模块来实现 (方法一)
我们还在每个线程中休眠 2 秒来模拟“工作”状态
请注意,我们甚至不必调用 join() 函数等待线程完成,只需在线程完成时清理解释器即可
Channels
如果我们进一步挖掘 CPython test
模块,我们还会发现 RecvChannel 和 SendChannel 类的实现类似于 Golang 中已知的通道
# https://github.com/python/cpython/blob/
# 15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test_interpreters.py#L583
r, s = interpreters.create_channel()
print(f"Channel: r, s")
# Channel: RecvChannel(id=0), SendChannel(id=0)
orig = b\'spam\'
s.send_nowait(orig)
obj = r.recv()
print(f"Received: obj")
# Received: b\'spam\'
cleanup_interpreters()
# Need clean up, otherwise:
# free(): invalid pointer
# Aborted (core dumped)
上面的例子介绍了如何创建一个接收端通道(r)和发送端通道(s),然后我们使用 send_nowait
方法将数据发送,通过 recv
方法来接收数据
这个通道实际上只是另一个解释器,和以前一样,我们需要在处理完它之后进行清理
Digging Deeper
如果我们想要修改或者调整子解释器的选项(这些选项通常在 C 代码中设置),我们可以使用
test.support
模块中的代码,具体来说是run_in_subinterp_with_config
import test.support
def run_in_thread(script):
test.support.run_in_subinterp_with_config(
script,
use_main_obmalloc=True,
allow_fork=True,
allow_exec=True,
allow_threads=True,
allow_daemon_threads=False,
check_multi_interp_extensions=False,
own_gil=True,
)
code = dedent(f"""
from test.support import interpreters
cur = interpreters.get_current()
print(cur)
""")
run_in_thread(code)
# Interpreter(id=7, isolated=None)
run_in_thread(code)
# Interpreter(id=8, isolated=None)
上面这个run_in_subinterp_with_config
函数是 C 函数的 Python API。它提供了一些子解释器选项,如 own_gil
,指定子解释器是否应该拥有自己的 GIL
python之多线程原理
参考技术A 并发:逻辑上具备同时处理多个任务的能力。并行:物理上在同一时刻执行多个并发任务。
举例:开个QQ,开了一个进程,开了微信,开了一个进程。在QQ这个进程里面,传输文字开一个线程、传输语音开了一个线程、弹出对话框又开了一个线程。
总结:开一个软件,相当于开了一个进程。在这个软件运行的过程里,多个工作同时运转,完成了QQ的运行,那么这个多个工作分别有多个线程。
线程和进程之间的区别:
进程在python中的使用,对模块threading进行操作,调用的这个三方库。可以通过 help(threading) 了解其中的方法、变量使用情况。也可以使用 dir(threading) 查看目录结构。
current_thread_num = threading.active_count() # 返回正在运行的线程数量
run_thread_len = len(threading.enumerate()) # 返回正在运行的线程数量
run_thread_list = threading.enumerate() # 返回当前运行线程的列表
t1=threading.Thread(target=dance) #创建两个子线程,参数传递为函数名
t1.setDaemon(True) # 设置守护进程,守护进程:主线程结束时自动退出子线程。
t1.start() # 启动子线程
t1.join() # 等待进程结束 exit()`# 主线程退出,t1子线程设置了守护进程,会自动退出。其他子线程会继续执行。
以上是关于它来了!真正的 python 多线程的主要内容,如果未能解决你的问题,请参考以下文章