线程Queue定时器进程池和线程池同步异步

Posted zhuangyl23

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程Queue定时器进程池和线程池同步异步相关的知识,希望对你有一定的参考价值。

线程Queue、定时器、进程池和线程池、多线程socket通信

一、Queue队列实现线程通信

queue模块下提供了几个阻塞队列,这些队列主要用于实现线程通信。在queue模块下主要提供了三个类,分别代表三种队列,它们的主要区别就在于进队列、出队列的不同。

关于这三个队列类的简单介绍如下:

  1. queue.Queue(maxsize=0):代表先进先出(FIFO)的常规队列,maxsize 可以限制队列的大小。如果队列的大小达到队列的上限,就会加锁,再次加入元素时就会被阻塞,直到队列中的元素被消费。如果将 maxsize 设置为 0 或负数,则该队列的大小就是无限制的。
  2. queue.LifoQueue(maxsize=0):代表后进先出(LIFO)的队列,与 Queue 的区别就是出队列的顺序不同。
  3. queue.PriorityQueue(maxsize=0):代表优先级队列,优先级最小的元素先出队列。

这三个队列类的属性和方法基本相同, 它们都提供了如下属性和方法:

  • Queue.qsize():返回队列的实际大小,也就是该队列中包含几个元素。
  • Queue.empty():判断队列是否为空。
  • Queue.full():判断队列是否已满。
  • Queue.put(item, block=True, timeout=None):向队列中放入元素。如果队列己满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到该队列的元素被消费;如果队列己满,且 block 参数为 False(不阻塞),则直接引发 queue.FULL 异常。
  • Queue.put_nowait(item):向队列中放入元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。
  • Queue.get(item, block=True, timeout=None):从队列中取出元素(消费元素)。如果队列已满,且 block 参数为 True(阻塞),当前线程被阻塞,timeout 指定阻塞时间,如果将 timeout 设置为 None,则代表一直阻塞,直到有元素被放入队列中; 如果队列己空,且 block 参数为 False(不阻塞),则直接引发 queue.EMPTY 异常。
  • Queue.get_nowait(item):从队列中取出元素,不阻塞。相当于在上一个方法中将 block 参数设置为 False。

下面看具体代码

# 先进先出
import queue
q = queue.Queue()
q.put('123')
q.put('qweqwe')
print(q.get())  # 先拿到的是123
print(q.get())
# print(q.get()) # 两个值都取光了,队列为空,这里再取就会阻塞,直到队列中有值
q.task_done() # 结束任务
# q.join()
# 先进后出
import queue
q = queue.LifoQueue()
q.put('杨蓬蓬吃饭')
q.put('杨蓬蓬上厕所')
q.put('杨蓬蓬睡觉')
print(q.get())  # 后进先出,先取到杨蓬蓬睡觉
print(q.get())
print(q.get())  # 先进后出,最后取到杨蓬蓬吃饭
# 优先级
import queue
q = queue.PriorityQueue() # 可以根据优先级取数据
# 通常这个元组的第一个值是int类型,第一个值越小,优先级越高
q.put((50,'吃饭'))
q.put((80,'睡觉'))
q.put((1,'敲代码'))
print(q.get())  # 1最小,先取到敲代码 (1, '敲代码')
print(q.get())  # (50, '吃饭')
print(q.get())  # (80, '睡觉')

二、线程定时器(Timer)

Thread 类有一个 Timer子类,该子类可用于控制指定函数在特定时间内执行一次。例如如下程序:

from threading import Thread,Timer
import time

def task():
    print('线程执行了')
    time.sleep(2)
    print('线程结束了')

t = Timer(4,task)  # 指定4s后开启一个线程执行task
t.start()

需要说明的是,Timer 只能控制函数在指定时间内执行一次,如果要使用 Timer 控制函数多次重复执行,则需要再执行下一次调度。

三、进程池和线程池

用池的功能限制进程数或线程数。

为什么要限制:当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量,就应该考虑去限制进程数或线程数,从而保证服务器不崩。

系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互。在这种情形下,使用线程池可以很好的提升性能,尤其是当程序中需要创建大量生存期很短暂的线程时,更应该考虑使用线程池。

线程池在系统启动时即创建大量空闲的线程,程序只要将一个函数提交给线程池,线程池就会启动一个空闲的线程来执行它。当该函数执行结束后,该线程并不会死亡,而是再次返回到线程池中变成空闲状态,等待执行下一个函数。

此外,使用线程池可以有效地控制系统中并发线程的数量。当系统中包含有大量的并发线程时,会导致系统性能急剧下降,甚至导致 Python 解释器崩溃,而线程池的最大线程数参数可以控制系统中并发线程的数量不超过此数。

线程池的基类是 concurrent.futures 模块中的 Executor,Executor 提供了两个子类,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用于创建线程池,而 ProcessPoolExecutor 用于创建进程池。

如果使用线程池/进程池来管理并发编程,那么只要将相应的 task 函数提交给线程池/进程池,剩下的事情就由线程池/进程池来搞定。

Exectuor 提供了如下常用方法:

  • submit(fn, *args, **kwargs):将 fn 函数提交给线程池。args 代表传给 fn 函数的参数,kwargs 代表以关键字参数的形式为 fn 函数传入参数。
  • map(func, iterables, timeout=None, chunksize=1):该函数类似于全局函数 map(func, iterables),只是该函数将会启动多个线程,以异步方式立即对 iterables 执行 map 处理。
  • shutdown(wait=True):关闭线程池。

程序将 task 函数提交(submit)给线程池后,submit 方法会返回一个 Future 对象,Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行,因此,线程执行的函数相当于一个“将来完成”的任务,所以 Python 使用 Future 来代表。

Future 提供了如下方法:

  • cancel():取消该 Future 代表的线程任务。如果该任务正在执行,不可取消,则该方法返回 False;否则,程序会取消该任务,并返回 True。
  • cancelled():返回 Future 代表的线程任务是否被成功取消。
  • running():如果该 Future 代表的线程任务正在执行、不可被取消,该方法返回 True。
  • done():如果该 Funture 代表的线程任务被成功取消或执行完成,则该方法返回 True。
  • result(timeout=None):获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成,该方法将会阻塞当前线程,其中 timeout 参数指定最多阻塞多少秒。
  • exception(timeout=None):获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成,没有异常,则该方法返回 None。
  • add_done_callback(fn):为该 Future 代表的线程任务注册一个“回调函数”,当该任务成功完成时,程序会自动触发该 fn 函数。

使用线程池来执行线程任务:

from concurrent.futures import ThreadPoolExecutor
from threading import currentThread
import time

def task(i):
    time.sleep(1.5)
    print(f"currentThread().name在执行任务i+1")
    return i**2

if __name__ == '__main__':
    fu_list = []
    pool = ThreadPoolExecutor(4)  #规定线程池有4个线程
    for i in range(20):  #模拟20个线程,task要做20次,4个线程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        fu_list.append(future)   #先把提交的数据意义放到这个列表里面
    pool.shutdown()   # 关闭池的入口,不让你往里面再放东西
    for fu in fu_list:  #依次循环列表里面的值
        print(fu.result())  #打印返回值
        
-----------------------------------------------------------------------------
ThreadPoolExecutor-0_0在执行任务1
ThreadPoolExecutor-0_2在执行任务3
ThreadPoolExecutor-0_1在执行任务2
ThreadPoolExecutor-0_3在执行任务4
ThreadPoolExecutor-0_0在执行任务5
ThreadPoolExecutor-0_1在执行任务7
ThreadPoolExecutor-0_2在执行任务6
ThreadPoolExecutor-0_3在执行任务8
ThreadPoolExecutor-0_0在执行任务9
ThreadPoolExecutor-0_2在执行任务11
ThreadPoolExecutor-0_1在执行任务10
ThreadPoolExecutor-0_3在执行任务12
ThreadPoolExecutor-0_0在执行任务13
ThreadPoolExecutor-0_2在执行任务14
ThreadPoolExecutor-0_1在执行任务15
ThreadPoolExecutor-0_3在执行任务16
ThreadPoolExecutor-0_0在执行任务17
ThreadPoolExecutor-0_2在执行任务18
ThreadPoolExecutor-0_1在执行任务19
ThreadPoolExecutor-0_3在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361

上述代码就是只有0,1,2,3四个线程来执行20次任务,同一时间只能有四个任务执行,只有能一个线程执行完一个任务,空出来了,才能执行下一个任务。

使用进程池来执行进程任务

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time

def task(i):
    time.sleep(1)
    print(f"current_process().name在执行任务i+1")
    time.sleep(1)
    return i**2

if __name__ == '__main__':
    fu_list = []
    pool = ProcessPoolExecutor(4)  #规定进程池有是个线程
    for i in range(20):  #模拟20个进程,task要做20次,4个进程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        fu_list.append(future)   #先把提交的数据意义放到这个列表里面
    pool.shutdown()   # 关闭池的入口,不让你往里面再放东西
    for fu in fu_list:
        print(fu.result())
        
-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
Process-1在执行任务5
Process-2在执行任务6
Process-3在执行任务7
Process-4在执行任务8
Process-1在执行任务9
Process-2在执行任务10
Process-3在执行任务11
Process-4在执行任务12
Process-1在执行任务13
Process-2在执行任务14
Process-3在执行任务15
Process-4在执行任务16
Process-1在执行任务17
Process-2在执行任务18
Process-3在执行任务19
Process-4在执行任务20
0
1
4
9
16
25
36
49
64
81
100
121
144
169
196
225
256
289
324
361

不要和信号量用混了,线程池里面始终没有产生新的线程,比如ThreadPoolExecutor(4),所有的任务始终是由这4个线程去执行。

四、同步和异步

是提交任务的两种方式

4.1、同步

提交了一个任务,必须等任务执行完 (拿到返回值)才能执行下一行代码

import os
import time
import random
from multiprocessing import Process

def work(n):
    print(f'n: os.getpid() is running' )
    time.sleep(random.randint(1,3))
    print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
    for i in range(3):  #这种就是同步了
        p=Process(target=work,args=(i,))
        p.start()

-----------------------------------------------------------------------------
1: 7504 is running
0: 15736 is running
2: 17896 is running
2:17896 is done
1:7504 is done
0:15736 is done

4.2 、异步

提交了一个任务,不要等任务执行完,可以直接执行下一行代码

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
import time

def task(i):
    time.sleep(1)
    print(f"current_process().name在执行任务i+1")
    time.sleep(1)
    return i**2

def parse(future):
    print(future.result())

if __name__ == '__main__':
    fu_list = []
    pool = ProcessPoolExecutor(4)  #规定进程池有是个线程
    for i in range(20):  #模拟20个进程,task要做20次,4个进程负责做这个事情
        future = pool.submit(task,i)   #for循环一次,提交一次
        future.add_done_callback(parse)
        # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数,
        # 会把future对象作为参数传给函数
        # 这个称之为回调函数,处理完了回来就调用这个函数.
        
-----------------------------------------------------------------------------
Process-1在执行任务1
Process-2在执行任务2
Process-3在执行任务3
Process-4在执行任务4
1
0
4
9
Process-2在执行任务5
Process-1在执行任务6
Process-3在执行任务7
Process-4在执行任务8
16
25
36
49
Process-2在执行任务9
Process-1在执行任务10
Process-3在执行任务11
Process-4在执行任务12
64
81
100
121
Process-2在执行任务13
Process-1在执行任务14
Process-3在执行任务15
Process-4在执行任务16
144
169
196
225
Process-2在执行任务17
Process-1在执行任务18
Process-3在执行任务19
Process-4在执行任务20
256
289
324
361

五、多线程socket升级

服务端

import socket
from threading import Thread

def talk(conn):
    while True:
        try:
            msg = conn.recv(1024)
            if len(msg) == 0: break
            conn.send(msg.upper())
        except connectionResetError
            print('客户端关闭了一个链接')
            break
    conn.close()   
    
def serve_demo():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('192.168.11.199', 8010))
    server.listen(5)
    
    while True:
        conn, addr = server.accept()
        t = Thread(target=talk, args(conn,))
        t.start()
        
if __name__ == '__main__':
    server_demo()

客户端

import socket
from threading import Thread

def client_demo():
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    client.connect(('192.168.11.199', 8010))
    while True:
        msg = f'currentThread().name'
        if len(msg) == 0: break
        client.send(msg.encode('utf-8'))
        feedback = client.recv(1024)
        print(feedback.decode('utf-8'))
     
    client.close()
    
if __name__ == '__main__':
    for i in range(5):
        t = Thread(target=client_demo)
        t.start()

以上是关于线程Queue定时器进程池和线程池同步异步的主要内容,如果未能解决你的问题,请参考以下文章

python 之 并发编程(进程池与线程池同步异步阻塞非阻塞线程queue)

Okhttp的线程池和高并发

回炉重造之重读Windows核心编程-011-线程池和其他异步方式

GIL定时器线程queue进程池和线程池

GIL 线程池 进程池 同步 异步

学习日记0910线程池与进程池 同步调用与异步调用 函数回调