线程进程和协程

Posted charliedaifu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程进程和协程相关的知识,希望对你有一定的参考价值。

目录

一、队列(queue)

二、线程(threading

三、进程(multiprocessing)

四、协程(gevent / greenlet

 

 

 

一、队列(queue)

1、队列分类

  • queue.Queue 先进先出队列;

  • q = queue.LifoQueue #后进先出队列,继承Queue;

  • q = queue.PriorityQueue #优先级队列,继承Queue;

  • 注意:这些队列都是在Python内存中创建的,Python进程结束,队列就自动清空;
#需要添加一个元组(级别,内容),先取出级别小的内容
q = queue.PriorityQueue()
q.put((4,"a"))
q.put((2,"a"))
q.put((3,"a"))
print(q.get())
print(q.get())
print(q.get())
# (2, ‘a‘)
# (3, ‘a‘)
# (4, ‘a‘)
  • q =collections.deque #双向队列,支持从任意一端增加删除元素。deque是线程安全的,内存高效的队列,它被设计为从两端追加和弹出都非常快。
from collections import deque
d = deque()
d.append(11)
d.append(22)
d.appendleft(33)#从左边加数据
#数据顺序33,11,22
print(d.pop())#取数据
print(d.popleft())#从左边取数据

2、queue.Queue 先进先出队列

  • put(数据内容,block,timeout):放数据,block为False,不阻塞,默认阻塞,timeout设置等待时间,超时报错,默认一直等待

  • get(数据内容,block,timeout):取数据,block为False,不阻塞,默认阻塞,timeout设置等待时间,超时报错,默认一直等待

import queue
q = queue.Queue(5)#队列最大长度为2,默认可以无限个
q.put(11)
q.put(22)
# q.put(33,block=False)#直接报错
# q.put(33,timeout=2)#等待2秒后报错
print(q.qsize())#队列长度
print(q.get())
q.task_done()
print(q.get())
q.task_done()#每执行一次取的任务,就执行一次,表示本次任务结束
print(q.empty())#检查队列是否为空
print(q.full())#检查队列是否已满
q.join() #如果每次没有设置task_done,程序就不会结束,一直等待

3、队列和线程应用实例

#生产者和消费者队列、线程结合应用实例
import queue
import threading
import time
q = queue.Queue(20)#创建一个队列,存放三个厨师生产的包子
def productor(arg):
    ‘‘‘生产者‘‘‘
    while True:
        q.put(str(arg) + " - 包子")
def consumer(arg):
    ‘‘‘消费者‘‘‘
    while True:
        print(arg,q.get())
        time.sleep(2)
for i in range(3):
    ‘‘‘三个厨师‘‘‘
    t = threading.Thread(target=productor,args=(i,))
    t.start()
for n in range(100):
    ‘‘‘100个消费者‘‘‘
    t = threading.Thread(target=consumer,args=(n,))
    t.start()

二、线程(threading

1、创建线程有两种方法:第一种利用系统自带的类threading创建,尽量用系统自带的类创建;

import threading
import time
  
def show(arg):
    time.sleep(1)
    print thread+str(arg)
  
for i in range(10):
    t = threading.Thread(target=show, args=(i,))
    t.start()
  
print main thread stop

上述代码创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。

2、第二种方法:自定义threading类

#自定义一个继承threading的类和一个run方法
import threading
import time
class MyThread(threading.Thread):
    def __init__(self, func,args):
        self.func = func
        self.args = args
        super(MyThread,self).__init__()
    def run(self):  # 定义每个线程要运行的函数
        self.func(self.args)
        time.sleep(2)
def f(arg):
    print(arg)
t1 = MyThread(f,123)
t2 = MyThread(f,123)
t1.start()#123
t2.start()#123

3、threading类的基本方法

  • start            线程准备就绪,等待CPU调度

  • setName     为线程设置名称

  • getName     获取线程名称

  • setDaemon   设置为后台线程或前台线程(默认);如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止

import threading
def f(x):
    print(x)
t = threading.Thread(target=f,args=(100,))
t.setDaemon(True)  #True表示主线程不等该子线程
t.start() #不代表当前线程会被立即执行
print(end)
  • join(n)   逐个执行每个线程,执行完毕后继续往下执行,n表示最多等几秒,该方法使得多线程变得无意义

  • run       线程被cpu调度后自动执行线程对象的run方法

4、总结

  • 一个应用程序可以有多进程、多线程;默认是单进程、单线程;

  • 每一个进程有一个全局解释器锁(GIL),每一次CPU调用只能调用一条线程;

  • 如果是IO操作,不占用CPU,单进程,多线程可以提高并发;

  • 如果是计算型操作,占用CPU,多进程提高并发;

  • Threading用于提供线程相关的操作,线程是应用程序中工作的最小单元。

5、线程锁(Lock、RLock)

  • 由于线程之间是进行随机调度,当多个线程同时修改同一条数据时可能会出现脏数据,所以出现了线程锁 - 同一时刻只允许一个线程执行操作。

import threading
import time
NUM = 10
lock1 = threading.Lock()#单次锁,不能重复使用
def f1():
    global NUM
    lock1.acquire()#上锁
    NUM -= 1
    time.sleep(2)
    print(NUM)
    lock1.release()#解锁
for i in range(10):
    t = threading.Thread(target=f1)
    t.start()
lock2 = threading.RLock()#连环锁,可以递归使用
def f2():
    global NUM
    lock2.acquire()#上锁
    NUM -= 1
    lock2.acquire()
    time.sleep(2)
    lock2.release()
    print(NUM)
    lock2.release()#解锁
for i in range(10):
    t = threading.Thread(target=f2)
    t.start()

6、信号量(Semaphore)

  • 同时允许一定数量的线程更改数据
import threading, time
# 最多允许5个线程同时运行,相当于一个线程锁
semaphore = threading.BoundedSemaphore(5)
def run(n):
    semaphore.acquire()
    time.sleep(3)
    print(n +1)
    semaphore.release()
for i in range(20):
    t = threading.Thread(target=run, args=(i,))
    t.start()

7、事件(event)

  • python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

  • 事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法便不再阻塞。

  • clear:将“Flag”设置为False
  • set:将“Flag”设置为True
import threading
event = threading.Event()
def func(i):
    print(str(i) + "- wait")
    event.wait()#检测是什么灯,默认是红灯
    print(str(i) + " - start")
for i in range(5):
    t = threading.Thread(target=func,args=(i,))
    t.start()
event.clear()#主动设置成红灯,可以不写
inp = input("
>>>:")
if inp == "1":
    event.set()#设置为绿灯

8、条件(Condition)

  • 使得线程等待,只有满足某条件时,才释放n个线程,下面详细说明实现的两种方法:
#notify(n)n代表释放几个线程
import threading 
def run(n):
    con.acquire()
    con.wait()
    print(n+10)
    con.release() 
if __name__ == __main__:
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start() 
    while True:
        inp = input(>>>)
        if inp == q:
            break
        con.acquire()
        con.notify(int(inp))#传入几就释放几个线程
        con.release()
#wait_for(func)等待函数的返回结果,如果为True,就释放一个
imoort threading
def func():
    ret = False
    inp = input(>>>)
    if inp == 1:
        ret = True
    return ret
def run(n):
    con.acquire()
    con.wait_for(func)
    print("run the thread: %s" %n)
    con.release()

if __name__ == __main__:
    con = threading.Condition()
    for i in range(10):
        t = threading.Thread(target=run, args=(i,))
        t.start()

9、Timer

  • 定时器,指定n秒后执行某操作
#1秒后释放
from threading import Timer
def hello():
    print("hello, world")
t = Timer(1, hello)
t.start()

10、自定义线程池

#第一种方法,比较low
import threading
import queue
import time
class ThreadingPool:
    def __init__(self,maxsize):
        self.maxsize = maxsize#线程池的最大个数
        self.q = queue.Queue(maxsize)
        for i in range(maxsize):
            #将线程池加满
            self.q.put(threading.Thread)
    def get_thread(self):
        #从线程池取线程
        return self.q.get()
    def add_thread(self):
        #往线程池加线程
        self.q.put(threading.Thread)
pool = ThreadingPool(5)
def func(arg,p):
    print(arg)
    time.sleep(2)
    #每取一次就添加一个线程
    p.add_thread()
for i in range(100):
    thread = pool.get_thread()#获取线程池里的类
    t = thread(target=func,args=(i,pool,))
    t.start()
技术分享图片
import queue
import threading
import contextlib
import time

StopEvent = object()


class ThreadPool(object):

    def __init__(self, max_num, max_task_num = None):
        if max_task_num:
            self.q = queue.Queue(max_task_num)
        else:
            self.q = queue.Queue()
        self.max_num = max_num
        self.cancel = False
        self.terminal = False
        self.generate_list = []
        self.free_list = []

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
        :return: 如果线程池已经终止,则返回True否则None
        """
        if self.cancel:
            return
        if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
            self.generate_thread()
        w = (func, args, callback,)
        self.q.put(w)

    def generate_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.start()

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.currentThread()
        self.generate_list.append(current_thread)

        event = self.q.get()
        while event != StopEvent:

            func, arguments, callback = event
            try:
                result = func(*arguments)
                success = True
            except Exception as e:
                success = False
                result = None

            if callback is not None:
                try:
                    callback(success, result)
                except Exception as e:
                    pass

            with self.worker_state(self.free_list, current_thread):
                if self.terminal:
                    event = StopEvent
                else:
                    event = self.q.get()
        else:

            self.generate_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        self.cancel = True
        full_size = len(self.generate_list)
        while full_size:
            self.q.put(StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.generate_list:
            self.q.put(StopEvent)

        self.q.queue.clear()

    @contextlib.contextmanager
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)



# How to use


pool = ThreadPool(5)

def callback(status, result):
    # status, execute action status
    # result, execute action return value
    pass


def action(i):
    print(i)

for i in range(30):
    ret = pool.run(action, (i,), callback)

time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
高级版本自定义线程池

三、进程(multiprocessing)

1、创建进程:由于进程之间的数据需要各自持有一份,所以创建进程需要的非常大的开销

from multiprocessing import Process
def foo(i):
    print(hi, i)
#在Windows中不能运行,只能加这句用来调试进程,正式项目不能用
if __name__ == __main__:
    for i in range(10):
        p = Process(target=foo, args=(i,))
        p.start()

2、进程数据共享

# 方法一,Array
from multiprocessing import Process, Array
def Foo(i,arg):
    arg[i] = 100 + i
    for item in arg:
        print(i, ---->, item)
    print("========")
if __name__ == __main__:
    temp = Array(i, 5)#数组必须指定类型和大小
    for i in range(5):
        p = Process(target=Foo, args=(i,temp,))
        p.start()
#方法二:manage.dict()共享数据
from multiprocessing import Process, Manager
def Foo(i,arg):
    arg[i] = 100 + i
    print(arg.values())
if __name__ == __main__:
    manage = Manager()
    dic = manage.dict()
    for i in range(5):
        p = Process(target=Foo, args=(i,dic,))
        p.start()
# 一定要加上,否则主进程执行完毕就断开连接,子进程执行完毕后找不到连接了
        p.join()
#Array类型对应表
# ‘c‘: ctypes.c_char,  ‘u‘: ctypes.c_wchar,
# ‘b‘: ctypes.c_byte,  ‘B‘: ctypes.c_ubyte,
# ‘h‘: ctypes.c_short, ‘H‘: ctypes.c_ushort,
# ‘i‘: ctypes.c_int,   ‘I‘: ctypes.c_uint,
# ‘l‘: ctypes.c_long,  ‘L‘: ctypes.c_ulong,
# ‘f‘: ctypes.c_float, ‘d‘: ctypes.c_double

3、进程锁

#进程里的单程锁,连环锁,事件,信号量,和线程里的用法一样
from multiprocessing import Lock,RLock,Event,BoundedSemaphore

4、进程池

  • 进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

  • 进程池中有两个方法:apply()、apply_async()

from multiprocessing import Pool
import time
def Foo(i):
    time.sleep(1)
    print(i+1)
if __name__ == "__main__":
    pool = Pool(5)
    for i in range(30):
        #pool.apply(Foo,(i,))#所有进程串行执行,一个接一个
        pool.apply_async(func=Foo, args=(i,))#异步执行
    #pool.terminate()#无论当前进程是否执行完毕,直接关闭
    pool.close()# 所有进程执行完毕后再关闭
    pool.join() #进程池的join前面必须加上close和terminate其中一个

5、总结:IO密集型使用多线程,因为不调用CPU;计算密集型使用多进程,需要调用CPU;

四、协程(gevent / greenlet

1、线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员

2、协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

3、协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

4、greenlet是python的一个C扩展,来源于Stackless python,旨在提供可自行调度的‘微线程’, 即协程;greenlet用switch来表示协程的切换,从一个协程切换到另一个协程需要显式指定。

from greenlet import greenlet
def test1():
    print(12)
    gr2.switch()
    print(34)
    gr2.switch()
def test2():
    print(56)
    gr1.switch()
    print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
#12,56,34,78

5、gevent是第三方库,通过greenlet实现协程,其基本思想是:

  • greenlet可以实现协程,不过每一次都要人为的去指向下一个该执行的协程,显得太过麻烦。python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent

  • 当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

  • 由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
def f(url):
    print(GET:%s%url)
    response = requests.get(url)
    data = response.text
    print("%d bytes received from %s."%(len(data),url))
gevent.joinall([
    gevent.spawn(f,"http://www.baidu.com/"),
    gevent.spawn(f,"http://www.yahoo.com/"),
    gevent.spawn(f,"http://www.python.org")
])
# GET:http://www.baidu.com/
# GET:http://www.yahoo.com/
# GET:http://www.python.org
# 2381 bytes received from http://www.baidu.com/.
# 48970 bytes received from http://www.python.org.
# 529733 bytes received from http://www.yahoo.com/.

 

以上是关于线程进程和协程的主要内容,如果未能解决你的问题,请参考以下文章

进程线程和协程区别

python之线程进程和协程

python线程进程和协程

进程线程和协程的区别(转)

进程线程和协程的理解

[转帖]进程线程和协程之间的区别和联系