进程通信和线程通信
进程间通信:
进程是拥有独立空间的,如果开多个进程对某个数据进行处理,由于进程是独立空间,那么两个进程如何通信拥有共同
空间呢?我们可以在父进程里启动一个服务器进程开辟一个公共空间。开启之后在子进程处理数据,父进程也会出现相应的
效果。Manager 是专门用来做共享的!
1 from multiprocessing import Process, Manager
2 mgr = Manager()
3 d = mgr.dict()
4 def func(d):
5 d[‘a‘] = ‘a‘
6 print(d)
7 p = Process(target=func, args=(d,))
8 p.start()
9 p.join()
10 print(d)
11 # 运行结果:
12 {‘a‘: ‘a‘}
13 {‘a‘: ‘a‘}
使用Manager之后会失效资源共享,那么出现资源竞争怎么办?(参照线程资源竞争)我们可以使用队列,它默认使用了
锁的功能!使用会非常方便。
1 from multiprocessing import Process, Queue
2 q = Queue()
3 q.put(1)
4
5
6 def func(q):
7 print(q.get())
8 q.put(2)
9
10
11 p = Process(target=func, args=(q,))
12 p.start()
13 p.join()
14 print(q.get())
15 # 返回结果:(表明队列是共有资源,可以共享)
16 1
17 2
如果直接用 import queue 发现资源并没有共享!
线程间通信:
线程间是共享空间的,每个线程对数据的修改都会起作用。如果两个线程同时修改,那么如果线程一还没有修改完,线程
二就开始修改,这样就会出问题,这个时候就需要线程锁来解决,类似于数据库里的原子操作(最小的操作单元,必须执行完才
能执行其他的)。
1 from threading import Thread
2 a = 0
3 n = 10000000
4 def incr(n):
5 global a
6 for i in range(n):
7 a += 1
8 def dncr(n):
9 global a
10 for i in range(n):
11 a -= 1
12 t1 = Thread(target=incr, args=(n,))
13 t2 = Thread(target=dncr, args=(n,))
14 t1.start()
15 t2.start()
16 t1.join()
17 t2.join()
18 print(a)
19 # 运行多次的结果:
20 -288397 -122765 -1761997
可以看出上面的代码出现了资源竞争的情况导致错误执行,我们使用锁来控制资源共享:
1 from threading import Thread, Lock
2
3 a = 0
4 n = 10000000
5 lock = Lock() # 创建一把锁
6
7 def incr(n):
8 global a
9 for i in range(n):
10 lock.acquire() # 获取一把锁
11 a += 1
12 lock.release() # 释放锁
13
14
15 def dncr(n):
16 global a
17 for i in range(n):
18 lock.acquire() # 获取一把锁
19 a -= 1
20 lock.release() # 释放锁
21
22
23 t1 = Thread(target=incr, args=(n,))
24 t2 = Thread(target=dncr, args=(n,))
25 t1.start()
26 t2.start()
27 t1.join()
28 t2.join()
29 print(a)
创建锁也可以使用with lock: 部分代码如下:运行结果都为0:
1 def incr(n):
2 global a
3 for i in range(n):
4 with lock:
5 a += 1
消费者模式和生产者模式:
生产者只关心队列是否已经满了,没有就生产,往队列里添加
消费者只关心队列是否为空,为空就阻塞
1 import threading
2 import random
3 import queue
4
5
6 class Producer(threading.Thread):
7
8 def __init__(self, queue):
9 super().__init__()
10 self.queue = queue
11
12 def run(self):
13 for i in range(10):
14 r = random.randint(0, 9)
15 if self.queue.qsize() < 3:
16 self.queue.put(r)
17 print(‘往队列里添加一个数据{}‘.format(r))
18
19
20 class Consumer(threading.Thread):
21
22 def __init__(self, queue):
23 super().__init__()
24 self.queue = queue
25
26 def run(self):
27 for i in range(10):
28 r = random.randint(0, 9)
29 if self.queue.empty():
30 data = self.queue.get()
31 print(‘从队列里get一个数据{}‘.format(data))
32
33
34 if __name__ == ‘__main__‘:
35 q = queue.Queue()
36 p1 = Producer(q)
37 c1 = Consumer(q)
38 p1.start()
39 c1.start()
40 p1.join()
41 c1.join()
42 q.join()
这里代码会阻塞,因为我们无法知道两个线程谁先执行,是没有规律的,线程p1几乎是瞬间将循环执行完,所以线程p1
只put了3个数据进去。线程c1在get数据的时候也只能取3个了,然后队列为空,会进去阻塞状态!可以对代码进行改进:
1 import threading
2 import random
3 import queue
4 import time
5
6
7 class Producer(threading.Thread):
8
9 def __init__(self, queue):
10 super().__init__()
11 self.queue = queue
12
13 def run(self):
14 while True:
15 r = random.randint(0, 9)
16 if not self.queue.full():
17 self.queue.put(r)
18 print(‘往队列里添加一个数据{}‘.format(r))
19 time.sleep(1)
20 else:
21 print(‘队列满了‘)
22
23
24 class Consumer(threading.Thread):
25
26 def __init__(self, queue):
27 super().__init__()
28 self.queue = queue
29
30 def run(self):
31 while True:
32 data = self.queue.get()
33 print(‘从队列里get一个数据{}‘.format(data))
34 time.sleep(1)
35
36
37 if __name__ == ‘__main__‘:
38 q = queue.Queue(5)
39 p1 = Producer(q)
40 c1 = Consumer(q)
41 p1.start()
42 c1.start()
43 p1.join()
44 c1.join()
45 q.join()
46
47 # 代码运行结果:
48 往队列里添加一个数据4
49 从队列里get一个数据4
50 往队列里添加一个数据9
51 从队列里get一个数据9
52 ^CTraceback (most recent call last):
53 File "/home/pyvip/tz_spider/通信/xianchengtongxin.py", line 73, in <module>
54 p1.join()
55 File "/usr/lib/python3.5/threading.py", line 1054, in join
56 self._wait_for_tstate_lock()
57 File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
58 elif lock.acquire(block, timeout):
59 KeyboardInterrupt
60 往队列里添加一个数据3
61 从队列里get一个数据3
62 往队列里添加一个数据7
63 从队列里get一个数据7
64 往队列里添加一个数据9
65 从队列里get一个数据9
66 往队列里添加一个数据9
67 从队列里get一个数据9
68 往队列里添加一个数据4
69 从队列里get一个数据4
70
71 Process finished with exit code -1
上面的代码是会一直运行的,这里是手动停止的结果!