并发编程重点:
1
2
3
4
5
6
7
|
并发编程:线程、进程、队列、IO多路模型 操作系统工作原理介绍、线程、进程演化史、特点、区别、互斥锁、信号、 事件、join、GIL、进程间通信、管道、队列。 生产者消息者模型、异步模型、IO多路复用模型、select\\poll\\epoll 高性 能IO模型源码实例解析、高并发FTP server开发 |
1、请写一个包含10个线程的程序,主线程必须等待每一个子线程执行完成之后才结束执行,每一个子线程执行的时候都需要打印当前线程名、当前活跃线程数量;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
from threading import Thread,currentThread,activeCount import time def task(n): print ( \'线程名:%s----%s\' % (currentThread().name,n)) time.sleep( 1 ) print ( \'数量:%s\' % activeCount()) if __name__ = = "__main__" : t_li = [] for i in range ( 10 ): t = Thread(target = task,args = (i,)) t.start() t_li.append(t) for t in t_li: t.join() print ( \'主,end----\' ) |
2、请写一个包含10个线程的程序,并给每一个子线程都创建名为"name"的线程私有变量,变量值为“james”;
1
2
3
4
5
6
7
8
9
10
11
|
from threading import Thread def task(name): print ( \'%s is running\' % name) print ( \'end ---\' ) if __name__ = = "__main__" : for i in range ( 10 ): t = Thread(target = task,args = ( \'james_%s\' % i,)) t.start() print ( \'主 end ---\' ) |
3、请使用协程写一个消费者生产者模型;
协程知识点:https://www.cnblogs.com/wj-1314/p/9040121.html
协程:单线程下,无法利用多核,可以是一个程序开多个进程,每个进程开启多个线程,每隔线程开启协程;
协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
def consumer(): while True : x = yield print ( \'消费:\' , x) def producter(): c = consumer() next (c) for i in range ( 10 ): print ( \'生产:\' , i) c.send(i) producter() |
4、写一个程序,包含十个线程,子线程必须等待主线程sleep 10秒钟之后才执行,并打印当前时间;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from threading import Thread,Event import time import datetime def task(): # while not event.is_set(): # print(\'...\') print ( \'...\' ) event.wait( 10 ) print ( \'time:\' ,datetime.datetime.now()) if __name__ = = \'__main__\' : event = Event() for i in range ( 10 ): t = Thread(target = task) t.start() time.sleep( 10 ) event. set () |
5、写一个程序,包含十个线程,同时只能有五个子线程并行执行;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
from threading import Thread,Semaphore,currentThread import time def task(n): sm.acquire() print ( \'%s---\' % n,currentThread().name) time.sleep( 1 ) print ( \'end----\' ) sm.release() if __name__ = = \'__main__\' : sm = Semaphore( 5 ) for i in range ( 10 ): t = Thread(target = task,args = (i,)) t.start() |
6、写一个程序 ,包含一个名为hello的函数,函数的功能是打印字符串“Hello, World!”,该函数必须在程序执行30秒之后才开始执行(不能使用time.sleep());
1
2
3
4
5
6
7
|
from threading import Timer def hello(name): print ( \'%s say \' % name, \'Hello World!\' ) if __name__ = = "__main__" : t = Timer( 5 ,hello,args = ( \'james\' ,)) t.start() |
7、写一个程序,利用queue实现进程间通信;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
from multiprocessing import Process,Queue,current_process import time def consumer(q): while True : res = q.get() if not res: break print ( \'消费了:\' ,res, \'--\' ,current_process().name) def producter(q): for i in range ( 5 ): print ( \'生产:\' ,i) time.sleep( 1 ) q.put(i) if __name__ = = "__main__" : q = Queue() p1 = Process(target = producter,args = (q,)) c1 = Process(target = consumer,args = (q,)) c2 = Process(target = consumer,args = (q,)) p1.start() c1.start() c2.start() p1.join() q.put( None ) q.put( None ) print ( \'主\' ) # JoinableQueue from multiprocessing import Process,JoinableQueue,current_process import time def consumer(q): while True : res = q.get() print ( \'消费了:\' ,res, \'--\' ,current_process().name) q.task_done() def producter(q): for i in range ( 5 ): print ( \'生产:\' ,i, \'--\' ,current_process().name) time.sleep( 1 ) q.put(i) q.join() if __name__ = = "__main__" : q = JoinableQueue() p1 = Process(target = producter,args = (q,)) p2 = Process(target = producter, args = (q,)) c1 = Process(target = consumer,args = (q,)) c2 = Process(target = consumer,args = (q,)) p1.start() p2.start() c1.daemon = True c2.daemon = True c1.start() c2.start() p1.join() p2.join() print ( \'主\' ) |
8、写一个程序,利用pipe实现进程间通信;
1
2
3
4
5
6
7
8
9
10
11
|
from multiprocessing import Process,Pipe def task(conn): conn.send( \'hello world\' ) conn.close() if __name__ = = "__main__" : parent_conn,child_conn = Pipe() p = Process(target = task,args = (child_conn,)) p.start() p.join() print (parent_conn.recv()) |
9、使用selectors模块创建一个处理客户端消息的服务器程序;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
# server blocking IO import socket server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.bind(( \'127.0.0.1\' , 8080 )) server.listen( 5 ) while True : conn,addr = server.accept() print (addr) while True : try : data = conn.recv( 1024 ) if not data: break conn.send(data.upper()) except Exception as e: print (e) break # server IO多路复用 selectors 会根据操作系统选择select poll epoll import socket import selectors sel = selectors.DefaultSelector() def accept(server_fileobj,mask): conn,addr = server_fileobj.accept() print (addr) sel.register(conn,selectors.EVENT_READ,read) def read(conn,mask): try : data = conn.recv( 1024 ) if not data: print ( \'closing..\' ,conn) sel.unregister(conn) conn.close() return conn.send(data.upper()) except Exception: print ( \'closeing...\' ,conn) sel.unregister(conn) conn.close() server_fileobj = socket.socket(socket.AF_INET,socket.SOCK_STREAM) server_fileobj.bind(( \'127.0.0.1\' , 8080 )) server_fileobj.listen( 5 ) server_fileobj.setblocking( False ) sel.register(server_fileobj,selectors.EVENT_READ,accept) while True : events = sel.select() for sel_obj,mask in events: callback = sel_obj.data callback(sel_obj.fileobj,mask) # client # -*- coding:utf-8 -*- import socket client = socket.socket(socket.AF_INET,socket.SOCK_STREAM) client.connect(( \'127.0.0.1\' , 8080 )) while True : msg = input ( \'>>>:\' ).strip() if not msg: continue client.send(msg.encode( \'utf-8\' )) data = client.recv( 1024 ) print (data.decode( \'utf-8\' )) |
10、使用socketserver创建服务器程序时,如果使用fork或者线程服务器,一个潜在的问题是,恶意的程序可能会发送大量的请求导致服务器崩溃,请写一个程序,避免此类问题;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
# server socketserver 模块内部使用IO |