Python并发复习

Posted Geoffrey_one

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发复习相关的知识,希望对你有一定的参考价值。

一、多线程的调用

threading 模块建立在thread 模块之上。thread模块以低级、原始的方式来处理和控制线程,而threading 模块通过对thread进行二次封装,

提供了更方便的api来处理线程。

多线程的调用有两种方式,函数式继承式

import threading
import time
 
def sayhi(num): #定义每个线程要运行的函数
 
    print("running on number:%s" %num)
 
    time.sleep(3)
 
if __name__ == \'__main__\':
 
    t1 = threading.Thread(target=sayhi,args=(1,)) #生成一个线程实例
    t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一个线程实例
 
    t1.start() #启动线程
    t2.start() #启动另一个线程

 

import threading
import time


class MyThread(threading.Thread):
    def __init__(self,num):
        super().__init__
        self.num = num

    def run(self):#定义每个线程要运行的函数

        print("running on number:%s" %self.num)

        time.sleep(3)

if __name__ == \'__main__\':

    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

 二、 阻塞线程和守护线程

join():

  在子线程完成运行之前,这个子线程的父线程将一直被阻塞。

setDaemon(True):

         将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。

         当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成

         想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程

         完成了,不管子线程是否完成,都要和主线程一起退出,

 1 #!/usr/bin/env python
 2 # encoding: utf-8
 3 
 4 """
 5 @version: python37
 6 @author: Geoffrey
 7 @file: 多线程复习.py
 8 @time: 18-10-27 上午9:48
 9 """
10 
11 import threading
12 from time import ctime,sleep
13 
14 
15 def func(name, i):
16         print (f"开始第{i}个线程 --- {name} {ctime()}")
17         sleep(3)
18         print(f"停止 第{i}个线程 {ctime}")
19 
20 
21 threads = []
22 
23 t0 = threading.Thread(target=func,args=(\'Julia\',0))
24 t1 = threading.Thread(target=func,args=(\'Python\',1))
25 t2 = threading.Thread(target=func,args=(\'C++\',2))
26 t3 = threading.Thread(target=func,args=(\'Java\',3))
27 
28 threads.append(t0)
29 threads.append(t1)
30 threads.append(t2)
31 threads.append(t3)
32 
33 if __name__ == \'__main__\':
34 
35     for i,t in enumerate(threads):
36 37         t.setDaemon(bool(i))# 将线程声明为守护线程,必须在start() 方法调用之前设置,如果不设置为守护线程程序会被无限挂起。
38         t.start()
39         # t.join() # 设置为阻塞线程
40 
41     print (" --------- 主线程执行完成 %s" %ctime())

不设置守护线程,默认是非守护线程。

如果t1,t2,t3设置设置为守护线程,执行结果为在主线程执行完毕后,他们是否执行完成不能保证,随着主进程结束而结束。

 

 如果设置全部为阻塞线程,结果为轮流执行,相当于单线程。

三、互斥锁,GIL

1、Cpython的GIL解释器锁的工作机制   

         在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。Python同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。CPython是Python的一种,GIL不是Python语言的缺陷。所以明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

1.2 GIL的介绍

         GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test1.py,python test2.py,python test3.py会产生3个不同的python进程。

 

1.3 GIL原理   


 

第一点:
所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
第二点:
所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
   
总结:

解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,即GIL,保证python解释器同一时间只能执行一个任务的代码。

 

 

1.4 GIL和Lock 

锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据。保护不同的数据就应该加不同的锁。所以,解释就是GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock。如下图:


1、100个线程去抢GIL锁,即抢执行权限

2、肯定有一个线程先抢到GIL(暂且称为线程1),然后开始执行,一旦执行就会拿到lock.acquire()

3、极有可能线程1还未运行完毕,就有另外一个线程2抢到GIL,然后开始运行,但线程2发现互斥锁lock还未被线程1释放,于是阻塞,被迫交出执行权限,即释放GIL

4、直到线程1重新抢到GIL,开始从上次暂停的位置继续执行,直到正常释放互斥锁lock,然后其他的线程再重复2 3 4的过程

 

四 、线程死锁和递归锁 

如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源都正在使用,所有这两个线程在无外力作用下将一直等待下去。

 1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"获得锁A",time.ctime())
 7         time.sleep(3)
 8         lockB.acquire()
 9         print(self.name,"获得锁B",time.ctime())
10         lockB.release()
11         print(self.name,\'释放锁B\')
12         lockA.release()
13         print(self.name,\'释放锁A\')
14 
15     def doB(self):
16         lockB.acquire()
17         print(self.name,"获得锁B",time.ctime())
18         time.sleep(2)
19         lockA.acquire()
20         print(self.name,"获得锁A",time.ctime())
21         lockA.release()
22         print(self.name,\'释放锁A\')
23         lockB.release()
24         print(self.name,\'释放锁B\')
25 
26     def run(self):
27         self.doA()
28         self.doB()
29 if __name__=="__main__":
30 
31     lockA=threading.Lock()
32     lockB=threading.Lock()
33     threads=[]
34 
35     for i in range(5):
36         threads.append(myThread())
37     for t in threads:
38         t.start()
39     for t in threads:
40         t.join()

 

解决办法:使用递归锁RLOCK。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次acquire。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

 五、生产者消费者模型

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而是交给前台,而客户去饭菜也不需要不找厨师,直接去前台领取即可,这也是一个解耦的过程。

使用队列:

 1 #!/usr/bin/env python
 2 # encoding: utf-8
 3 
 4 """
 5 @version: python37
 6 @author: Geoffrey
 7 @file: 生产者消费者模型.py
 8 @time: 18-10-27 下午7:45
 9 """
10 
11 import time,random
12 import queue,threading
13 
14 def Producer(name):
15   count = 0
16   while count <10:
17     # print("生产中........")
18     time.sleep(random.randrange(3))
19     q.put(count)
20     print(f\'生产者 {name} 生产了 {count} 号包子...\')
21     count += 1
22 
23 def Consumer(name):
24   count = 0
25   while count <10:
26     time.sleep(random.randrange(4))
27     if not q.empty():
28         data = q.get()
29         print(f\'\\033[32;1m消费者 {name} 吃掉了地 {data} 号包子...\\033[0m\' )
30     else:
31         print("-----没有包子了----")
32     count +=1
33 
34 if __name__ == \'__main__\':
35     q = queue.Queue()
36 
37     p1 = threading.Thread(target=Producer, args=(\'A\',))
38     c1 = threading.Thread(target=Consumer, args=(\'B\',))
39     c2 = threading.Thread(target=Consumer, args=(\'C\',))
40     c3 = threading.Thread(target=Consumer, args=(\'D\',))
41     p1.start()
42     c1.start()
43     c2.start()
44     c3.start()

不使用队列:

  1 #!/usr/bin/env python
  2 # encoding: utf-8
  3 
  4 """
  5 @version: python37
  6 @author: Geoffrey
  7 @file: 生产者消费者_condition.py
  8 @time: 18-10-27 下午8:03
  9 """
 10 
 11 import threading
 12 import time
 13 
 14 # 商品
 15 product = None
 16 # 条件变量
 17 con = threading.Condition()
 18 
 19 # 生产者方法
 20 def produce():
 21     global product
 22     product = 0
 23 
 24     if con.acquire():
 25         while True:
 26             if product < 5:
 27                 product += 1
 28                 print (f\'生产出商品{product}\')
 29 
 30             else:
 31                 # 通知消费者,商品已经生产
 32                 con.notify()
 33                 # 等待通知
 34                 con.wait()
 35             time.sleep(0.5)
 36 
 37 # 消费者方法
 38 def consume():
 39     global product
 40     product = 0
 41 
 42     if con.acquire():
 43         while True:
 44             if product > 0:
 45                 print (f\'消费了第{product}商品\')
 46                 product -= 1
 47             else:
 48                 # 通知生产者,商品已经没了
 49                 con.notify()
 50 
 51                 # 等待通知
 52                 con.wait()
 53             time.sleep(0.5)
 54 
 55 t1 = threading.Thread(target=produce)
 56 t2 = threading.Thread(target=consume)
 57 
 58 t1.start()
 59 t2.start()

 

以上是关于Python并发复习的主要内容,如果未能解决你的问题,请参考以下文章

python 复习—并发编程实战——并发编程总结

Python并发复习3 - 多进程模块 multiprocessing

python 复习—并发编程——线程锁threading.local线程池生产者消费者模型线程安全

Python并发复习4- concurrent.futures模块(线程池和进程池)

python 复习—并发编程系统并发线程和进程协程GIL锁CPU/IO密集型计算

python 复习—并发编程——IO多路复用协程