并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制

Posted Alice的小屋

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制相关的知识,希望对你有一定的参考价值。

1.线程queue :会有锁
q=queue.Queue(3)
q.get()
q.put()

先进先出 队列
后进先出 堆栈
优先级队列
 1 """先进先出 队列"""
 2 import queue
 3 q=queue.Queue(3) #先进先出->队列
 4 
 5 q.put(\'first\')
 6 q.put(2)
 7 # q.put(\'third\')
 8 # q.put(4)
 9 q.put(4,block=False) #q.put_nowait(4)
10 # q.put_nowait(4)
11 # q.put(4,block=True)  # True 阻塞 False 不阻塞 直接告诉你 队列满了
12 # q.put(4,block=True,timeout=3) # 阻塞等待3秒 还没有拿走数据就抛异常
13 #
14 print(q.get())
15 print(q.get())
16 print(q.get())
17 print(q.get(block=True,timeout=2))    # false 不阻塞没有数据就抛异常  默认是阻塞 block=True
18 print(q.get_nowait()) # 相当于block=false
19 # def get(self, block=True, timeout=None):
20 
21 
22 """后进先出 堆栈"""
23 import queue
24 q=queue.LifoQueue(3)  #后进先出->堆栈
25 q.put(\'first\')
26 q.put(2)
27 q.put(\'third\')
28 
29 print(q.get())
30 print(q.get())
31 print(q.get())
32 
33 """优先级队列 """
34 import queue
35 q=queue.PriorityQueue(3) #优先级队列
36 
37 q.put((10,{\'alice\':12}))  # 数字越小 优先级越高 优先拿出来
38 q.put((40,\'two\'))
39 q.put((30,\'three\'))
40 
41 print(q.get())
42 print(q.get())
43 print(q.get())
2.线程池进程池:
client server 是IO 操作应该用多线程
计算密集型: 用多进程
io密集型:用多线程

池:对数目加以限制,保证机器正常运行
 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 import os,time,random
 3 
 4 def task(name):
 5     print(\'name:%s pid:%s run\' %(name,os.getpid()))
 6     time.sleep(random.randint(1,3))
 7 
 8 
 9 if __name__ == \'__main__\':
10     pool=ProcessPoolExecutor(4)  # 不指定 默认是cpu的核数
11     # pool=ThreadPoolExecutor(5)
12 
13     for i in range(10):
14         pool.submit(task,\'egon%s\' %i)  # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
15 
16     pool.shutdown(wait=True)  # 类似join  代表往池子里面丢任务的入口封死了 计数器-1
17 
18 
19     print(\'\')
20 """
21 主                         # # 异步调用池子收了10个任务,但同一时间只有4个任务在进行
22 name:egon0 pid:60056 run     # 只有4个pid
23 name:egon1 pid:64700 run
24 name:egon2 pid:59940 run
25 name:egon3 pid:60888 run
26 
27 name:egon4 pid:60888 run
28 
29 name:egon5 pid:60056 run
30 name:egon6 pid:60888 run
31 
32 name:egon7 pid:60056 run
33 name:egon8 pid:64700 run
34 name:egon9 pid:59940 run 
35 """
36 # pool.shutdown(wait=True) # 代表往池子里面丢任务的入口封死了 计数器-1
37 """
38 name:egon0 pid:57124 run
39 name:egon1 pid:62252 run
40 name:egon2 pid:55736 run
41 name:egon3 pid:62060 run
42 name:egon4 pid:57124 run
43 name:egon5 pid:62252 run
44 name:egon6 pid:55736 run
45 name:egon7 pid:55736 run
46 name:egon8 pid:62060 run
47 name:egon9 pid:55736 run
48 49 """
50 
51 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
52 from threading import currentThread
53 import os,time,random
54 
55 def task():
56     print(\'name:%s pid:%s run\' %(currentThread().getName(),os.getpid()))
57     time.sleep(random.randint(1,3))
58 
59 
60 if __name__ == \'__main__\':
61     pool=ThreadPoolExecutor(5)
62 
63     for i in range(10):
64         pool.submit(task)
65 
66     pool.shutdown(wait=True)
67 
68 
69     print(\'\')
70 """
71 name:ThreadPoolExecutor-0_0 pid:61508 run
72 name:ThreadPoolExecutor-0_1 pid:61508 run
73 name:ThreadPoolExecutor-0_2 pid:61508 run
74 name:ThreadPoolExecutor-0_3 pid:61508 run
75 name:ThreadPoolExecutor-0_4 pid:61508 run
76 name:ThreadPoolExecutor-0_2 pid:61508 run
77 name:ThreadPoolExecutor-0_4 pid:61508 run
78 name:ThreadPoolExecutor-0_0 pid:61508 run
79 name:ThreadPoolExecutor-0_3 pid:61508 run
80 name:ThreadPoolExecutor-0_1 pid:61508 run
81 82 """
3.异步调用与回调机制:
提交任务的两种方式:
同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行,效率低
异步调用:提交完任务后,不等待任务执行完毕。异步调用+回调机制 自动触发叫回调
 1 """同步调用"""
 2 from concurrent.futures import ThreadPoolExecutor
 3 import time
 4 import random
 5 
 6 def la(name):
 7     print(\'%s is laing\' %name)
 8     time.sleep(random.randint(3,5))
 9     res=random.randint(7,13)*\'#\'
10     return {\'name\':name,\'res\':res}
11 
12 def weigh(shit):
13     name=shit[\'name\']
14     size=len(shit[\'res\'])
15     print(\'%s 拉了 《%s》kg\' %(name,size))
16 
17 
18 if __name__ == \'__main__\':
19     pool=ThreadPoolExecutor(13)
20 
21     shit1=pool.submit(la,\'alex\').result()
22     weigh(shit1)
23 
24     shit2=pool.submit(la,\'wupeiqi\').result()
25     weigh(shit2)
26 
27     shit3=pool.submit(la,\'yuanhao\').result()
28     weigh(shit3)
29 
30 
31 """异步调用 + 回调机制  自动触发叫回调"""
32 from concurrent.futures import ThreadPoolExecutor
33 import time
34 import random
35 
36 def la(name):
37     print(\'%s is laing\' %name)
38     time.sleep(random.randint(3,5))
39     res=random.randint(7,13)*\'#\'
40     return {\'name\':name,\'res\':res}
41     # weigh({\'name\':name,\'res\':res})  # 这样写不好  所有功能 写在一起了
42 
43 
44 def weigh(shit):
45     shit=shit.result()  # 拿到是 对象 需要result()
46     name=shit[\'name\']
47     size=len(shit[\'res\'])
48     print(\'%s 拉了 《%s》kg\' %(name,size))
49 
50 
51 if __name__ == \'__main__\':
52     pool=ThreadPoolExecutor(13)
53 
54     # pool.submit(la, \'alex\')
55     # pool.submit(la, \'wupeiqi\')
56     # pool.submit(la, \'yuanhao\')
57 
58     pool.submit(la,\'alex\').add_done_callback(weigh) # 实现了程序的解耦合
59     pool.submit(la,\'wupeiqi\').add_done_callback(weigh)
60     pool.submit(la,\'yuanhao\').add_done_callback(weigh)
4.异步调用与回调机制应用:
pip3 install requests
requests

异步调用+回调机制的 应用场景:
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def get(url):   # io操作  基于线程 数目有限 用线程池
    print(\'GET %s\' %url)
    response=requests.get(url)
    time.sleep(3)
    return {\'url\':url,\'content\':response.text}


def parse(res):
    res=res.result()
    print(\'%s parse res is %s\' %(res[\'url\'],len(res[\'content\'])))


if __name__ == \'__main__\':
    urls=[
        \'http://www.cnblogs.com/linhaifeng\',
        \'https://www.python.org\',
        \'https://www.openstack.org\',
    ]

    pool=ThreadPoolExecutor(2)

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)

以上是关于并发编程 - 线程 - 1.线程queue/2.线程池进程池/3.异步调用与回调机制的主要内容,如果未能解决你的问题,请参考以下文章

并发编程多线程基础

C++并发编程----实现线无锁线程安全的数据结构(《C++ Concurrency in Action》 读书笔记)

java并发编程之美-阅读记录1

Java多线程编程——对象及变量的并发访问

Java多线程编程——对象及变量的并发访问

Java 并发编程线程简介 ( 并发类型 | 线程状态 | CPU 数据缓存 )