Python中级精华-并发之启动和停止线程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python中级精华-并发之启动和停止线程相关的知识,希望对你有一定的参考价值。
参考技术A 为了让代码能够并发执行,向创建线程并在核实的时候销毁它。由于目的比较单纯,只是讲解基础的线程创建方法,所以可以直接使用threading库中的Thread类来实例化一个线程对象。
例子,用户输入两个数字,并且求其两个数字的四则运算的结果:
除了以上的一些功能以外,在python线程
中没有其他的诸如给线程发信号、设置线程调度属性、执行任何其他高级操作的功能了,如果需要这些功能,就需要手工编写了。
另外,需要注意的是,由于GIL(全局解释器锁)的存在,限制了在python解释器当中只允许运行一个线程。基于这个原因,不停该使用python线程来处理计算密集型的任务,因为在这种任务重我们希望在多个CPU核心上实现并行处理。Python线程更适合于IO处理以及设计阻塞操作的并发执行任务(即等待IO响应或等待数据库取出结果等)。
如何判断线程是否已经启动?
目的:我们加载了一个线程,但是想要知道这个线程什么时候才会开始运行?
方法:
线程的核心特征我认为就是不确定性,因为其什么时候开始运行,什么时候被打断,什么时候恢复执行,这不是程序员能够控制的,而是有系统调度
来完成的。如果遇到像某个线程的运行依托于其他某个线程运行到某个状态时该线程才能开始运行,那么这就是线程同步
问题,同样这个问题非常棘手。要解决这类问题我们要借助threading中的Event对象。
Event其实和条件标记类似,匀速线程
等待某个时间发生。初始状态时事件被设置成0。如果事件没有被设置而线程正在等待该事件,那么线程就会被阻塞,直到事件被设置位置,当有线程设置了这个事件之后,那么就会唤醒正在等待事件的线程,如果线程等待的事件已经设置了,那么线程会继续执行。
一个例子:
如上能够确定的是,主线程会在线程t运行结束时再运行。
Python并发编程
1.启动与停止线程
import time
def countdown(n):
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
from threading import Thread
t = Thread(target=countdown, args=(10,)) #创建一个线程
t.start() #线程启动
if t.is_alive(): #查询一个线程对象的状态,看它是否还在执行
print('Still running')
else:
print('Completed')
t.join() #主线程等待t线程执行结束
注意:创建好一个线程对象后,该对象并不会立即执行,除非调用它的 start()
方法(当调用 start()
方法时,它会调用传递进来的函数,并把传递进来的参数传递给该函数)
2.判断线程是否已经启动
Event
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置为假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待这个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行。
from threading import Thread, Event
import time
# Code to execute in an independent thread
def countdown(n, started_evt):
print('countdown starting')
started_evt.set()
while n > 0:
print('T-minus', n)
n -= 1
time.sleep(5)
# Create the event object that will be used to signal startup
started_evt = Event()
# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()
# Wait for the thread to start
started_evt.wait()
print('countdown is running')
执行这段代码,“countdown is running” 总是显示在 “countdown starting” 之后显示。这是由于使用 event 来协调线程,使得主线程要等到 countdown()
函数输出启动信息后,才能继续执行
3.线程间通信
创建一个被多个线程共享的 Queue
对象,这些线程通过使用 put()
和 get()
操作来向队列中添加或者删除元素
from queue import Queue #Queue 对象已经包含了必要的锁,所以可以通过它在多个线程间多安全地共享数据
from threading import Thread
# A thread that produces data
def producer(out_q):
while True:
# Produce some data
...
out_q.put(data)
# A thread that consumes data
def consumer(in_q):
while True:
# Get some data
data = in_q.get()
# Process the data
...
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。
4.给关键部分加锁
需要对多线程程序中的临界区加锁以避免竞争条件
(1)显式加锁释放
import threading
class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock() #初始化锁
def incr(self,delta=1):
'''
Increment the counter with locking
'''
self._value_lock.acquire() #加锁
self._value += delta
self._value_lock.release() #释放锁
def decr(self,delta=1):
'''
Decrement the counter with locking
'''
self._value_lock.acquire() #加锁
self._value -= delta
self._value_lock.release() #释放锁
(2)隐式加锁释放
import threading
class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
def __init__(self, initial_value = 0):
self._value = initial_value
self._value_lock = threading.Lock()
def incr(self,delta=1):
'''
Increment the counter with locking
'''
with self._value_lock: #with 语句会在这个代码块执行前自动获取锁,在执行结束后自动释放锁
self._value += delta
def decr(self,delta=1):
'''
Decrement the counter with locking
'''
with self._value_lock: #每次只有一个线程可以执行 with 语句包含的代码块
self._value -= delta
5.防止死锁的加锁机制
解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁
import threading
from contextlib import contextmanager
# Thread-local state to stored information on locks already acquired
_local = threading.local()
@contextmanager
def acquire(*locks):
# 通过排序,使得不管用户以什么样的顺序来请求锁,这些锁都会按照固定的顺序被获取
locks = sorted(locks, key=lambda x: id(x))
# Make sure lock order of previously acquired locks is not violated
acquired = getattr(_local,'acquired',[])
if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
raise RuntimeError('Lock Order Violation')
# Acquire all of the locks
acquired.extend(locks)
_local.acquired = acquired
try:
for lock in locks:
lock.acquire()
yield
finally:
# Release locks in reverse order of acquisition
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()
def thread_1():
while True:
with acquire(x_lock, y_lock):
print('Thread-1')
def thread_2():
while True:
with acquire(y_lock, x_lock):
print('Thread-2')
t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()
t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
6.创建一个线程池
concurrent.futures
函数库有一个 ThreadPoolExecutor
类可以被用来完成这个任务。 下面是一个简单的TCP服务器,使用了一个线程池来响应客户端:
from socket import AF_INET, SOCK_STREAM, socket
from concurrent.futures import ThreadPoolExecutor
def echo_client(sock, client_addr):
'''
Handle a client connection
'''
print('Got connection from', client_addr)
while True:
msg = sock.recv(65536)
if not msg:
break
sock.sendall(msg)
print('Client closed connection')
sock.close()
def echo_server(addr):
pool = ThreadPoolExecutor(128) #线程池 最大线程上限128个
sock = socket(AF_INET, SOCK_STREAM)
sock.bind(addr)
sock.listen(5)
while True:
client_sock, client_addr = sock.accept()
pool.submit(echo_client, client_sock, client_addr)
echo_server(('',15000))
如果人试图通过创建大量线程让你服务器资源枯竭而崩溃的攻击行为。 通过使用预先初始化的线程池,可以设置同时运行线程的上限数量。
7.简单的并行编程
(1)创建并激活进程池
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as pool:
...
do work in parallel using pool
...
一个 ProcessPoolExecutor
创建N个独立的Python解释器, N是系统上面可用CPU的个数。你可以通过提供可选参数给 ProcessPoolExecutor(N)
来修改 处理器数量。这个处理池会一直运行到with块中最后一个语句执行完成, 然后处理池被关闭。
(2)提交任务
# Some function
def work(x):
...
return result
with ProcessPoolExecutor() as pool:
...
# Example of submitting work to the pool
future_result = pool.submit(work, arg)
# Obtaining the result (blocks until done)
r = future_result.result() #要获取最终结果,需要调用它的 result() 方法。 它会阻塞进程直到结果被返回来
...
def when_done(r): #callback回调函数
print('Got:', r.result())
with ProcessPoolExecutor() as pool:
future_result = pool.submit(work, arg)
future_result.add_done_callback(when_done) #可以使用一个回调函数 不会阻塞进程
8.定义一个Actor任务
简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。
from queue import Queue
from threading import Thread, Event
# Sentinel used for shutdown
class ActorExit(Exception):
pass
class Actor:
def __init__(self):
self._mailbox = Queue()
def send(self, msg):
'''
使用actor实例的 send() 方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程
'''
self._mailbox.put(msg)
def recv(self):
'''
Receive an incoming message
'''
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg
def close(self):
'''
close() 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor
'''
self.send(ActorExit)
def start(self):
'''
Start concurrent execution
'''
self._terminated = Event()
t = Thread(target=self._bootstrap)
t.daemon = True
t.start()
def join(self):
self._terminated.wait()
def run(self):
'''
Run method to be implemented by the user
'''
while True:
msg = self.recv()
# Sample ActorTask
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)
# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()
9.实现消息发布/订阅模型
要实现发布/订阅的消息通信模式, 不直接将消息从一个任务发送到另一个,而是将其发送给交换机, 然后由交换机将它发送给一个或多个被关联任务。
from collections import defaultdict
class Exchange: #交换机
def __init__(self):
self._subscribers = set()
def attach(self, task): #订阅任务
self._subscribers.add(task)
def detach(self, task): #取消订阅任务
self._subscribers.remove(task)
def send(self, msg):
for subscriber in self._subscribers:
subscriber.send(msg)
# Dictionary of all created exchanges
_exchanges = defaultdict(Exchange)
# Return the Exchange instance associated with a given name
def get_exchange(name):
return _exchanges[name]
class Task:
...
def send(self, msg):
...
task_a = Task()
task_b = Task()
# Example of getting an exchange
exc = get_exchange('name')
# 订阅
exc.attach(task_a)
exc.attach(task_b)
# 发布信息
exc.send('msg1')
exc.send('msg2')
# 取消订阅
exc.detach(task_a)
exc.detach(task_b)
以上是关于Python中级精华-并发之启动和停止线程的主要内容,如果未能解决你的问题,请参考以下文章