python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点

Posted andyzhang-

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点相关的知识,希望对你有一定的参考价值。

python 闯关之路四(下)(并发编程与数据库编程)

 

并发编程重点:

1
2
3
4
5
6
7
并发编程:线程、进程、队列、IO多路模型
 
操作系统工作原理介绍、线程、进程演化史、特点、区别、互斥锁、信号、
事件、join、GIL、进程间通信、管道、队列。
 
生产者消息者模型、异步模型、IO多路复用模型、select\\poll\\epoll 高性
能IO模型源码实例解析、高并发FTP server开发

1、请写一个包含10个线程的程序,主线程必须等待每一个子线程执行完成之后才结束执行,每一个子线程执行的时候都需要打印当前线程名、当前活跃线程数量;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from threading import Thread,currentThread,activeCount
import time
def task(n):
    print(\'线程名:%s----%s\'%(currentThread().name,n))
    time.sleep(1)
    print(\'数量:%s\'%activeCount())
     
if __name__ == "__main__":
    t_li = []
    for in range(10):
        = Thread(target=task,args=(i,))
        t.start()
        t_li.append(t)
    for in t_li:
        t.join()
         
    print(\'主,end----\')

2、请写一个包含10个线程的程序,并给每一个子线程都创建名为"name"的线程私有变量,变量值为“james”;

1
2
3
4
5
6
7
8
9
10
11
from threading import Thread
def task(name):
    print(\'%s is running\'%name)
    print(\'end ---\')
     
if __name__ == "__main__":
    for in range(10):
        = Thread(target=task,args=(\'james_%s\'%i,))
        t.start()
         
    print(\'主 end ---\')

3、请使用协程写一个消费者生产者模型;

协程知识点:https://www.cnblogs.com/wj-1314/p/9040121.html

协程:单线程下,无法利用多核,可以是一个程序开多个进程,每个进程开启多个线程,每隔线程开启协程;

  协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
def consumer():
    while True:
        = yield
        print(\'消费:\', x)
         
def producter():
    = consumer()
    next(c)
    for in range(10):
        print(\'生产:\', i)
        c.send(i)
         
producter()

4、写一个程序,包含十个线程,子线程必须等待主线程sleep 10秒钟之后才执行,并打印当前时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from threading import Thread,Event
import time
import datetime
def task():
    # while not event.is_set():
    #     print(\'...\')
    print(\'...\')
    event.wait(10)
    print(\'time:\',datetime.datetime.now())
     
if __name__ == \'__main__\':
    event = Event()
    for in range(10):
        = Thread(target=task)
        t.start()
         
    time.sleep(10)
    event.set()

5、写一个程序,包含十个线程,同时只能有五个子线程并行执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from threading import Thread,Semaphore,currentThread
import time
def task(n):
    sm.acquire()
    print(\'%s---\'%n,currentThread().name)
    time.sleep(1)
    print(\'end----\')
    sm.release()
     
if __name__ == \'__main__\':
    sm = Semaphore(5)
    for in range(10):
        = Thread(target=task,args=(i,))
        t.start()

6、写一个程序 ,包含一个名为hello的函数,函数的功能是打印字符串“Hello, World!”,该函数必须在程序执行30秒之后才开始执行(不能使用time.sleep());

1
2
3
4
5
6
7
from threading import Timer
def hello(name):
    print(\'%s say \'%name,\'Hello World!\')
     
if __name__ == "__main__":
    = Timer(5,hello,args=(\'james\',))
    t.start()

7、写一个程序,利用queue实现进程间通信;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
from multiprocessing import Process,Queue,current_process
import time
def consumer(q):
    while True:
        res = q.get()
        if not res:break
        print(\'消费了:\',res,\'--\',current_process().name)
         
def producter(q):
    for in range(5):
        print(\'生产:\',i)
        time.sleep(1)
        q.put(i)
         
if __name__ == "__main__":
    = Queue()
    p1 = Process(target=producter,args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    p1.start()
    c1.start()
    c2.start()
   
    p1.join()
    q.put(None)
    q.put(None)
    print(\'主\')
     
# JoinableQueue
from multiprocessing import Process,JoinableQueue,current_process
import time
def consumer(q):
    while True:
        res = q.get()
        print(\'消费了:\',res,\'--\',current_process().name)
        q.task_done()
         
def producter(q):
    for in range(5):
        print(\'生产:\',i,\'--\',current_process().name)
        time.sleep(1)
        q.put(i)
    q.join()
     
if __name__ == "__main__":
    = JoinableQueue()
    p1 = Process(target=producter,args=(q,))
    p2 = Process(target=producter, args=(q,))
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
     
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()
     
    p1.join()
    p2.join()
    print(\'主\')

8、写一个程序,利用pipe实现进程间通信;

1
2
3
4
5
6
7
8
9
10
11
from multiprocessing import Process,Pipe
def task(conn):
    conn.send(\'hello world\')
    conn.close()
     
if __name__ == "__main__":
    parent_conn,child_conn = Pipe()
    = Process(target=task,args=(child_conn,))
    p.start()
    p.join()
    print(parent_conn.recv())

9、使用selectors模块创建一个处理客户端消息的服务器程序;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# server  blocking IO 
import socket
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.bind((\'127.0.0.1\',8080))
server.listen(5)
while True:
    conn,addr = server.accept()
    print(addr)
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except Exception as e:
            print(e)
            break
             
# server  IO多路复用 selectors 会根据操作系统选择select poll epoll          
import socket
import selectors
sel = selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr = server_fileobj.accept()
    print(addr)
    sel.register(conn,selectors.EVENT_READ,read)
     
def read(conn,mask):
    try:
        data = conn.recv(1024)
        if not data:
            print(\'closing..\',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper())
    except Exception:
        print(\'closeing...\',conn)
        sel.unregister(conn)
        conn.close()
         
server_fileobj = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server_fileobj.bind((\'127.0.0.1\',8080))
server_fileobj.listen(5)
server_fileobj.setblocking(False)
sel.register(server_fileobj,selectors.EVENT_READ,accept)
while True:
    events = sel.select()
    for sel_obj,mask in events:
        callback = sel_obj.data
        callback(sel_obj.fileobj,mask)
         
# client
# -*- coding:utf-8 -*-
import socket
client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
client.connect((\'127.0.0.1\',8080))
while True:
    msg = input(\'>>>:\').strip()
    if not msg:continue
    client.send(msg.encode(\'utf-8\'))
    data = client.recv(1024)
    print(data.decode(\'utf-8\'))
 

10、使用socketserver创建服务器程序时,如果使用fork或者线程服务器,一个潜在的问题是,恶意的程序可能会发送大量的请求导致服务器崩溃,请写一个程序,避免此类问题;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# server socketserver 模块内部使用IO

以上是关于python 闯关之路四(下)(并发编程与数据库编程) 并发编程重点的主要内容,如果未能解决你的问题,请参考以下文章

python 闯关之路一(语法基础)

百万年薪python之路 -- 并发编程之 多线程 二

Python之路--Python基础12--并发编程之协程

python之路 -- 并发编程之线程

百万年薪python之路 -- 并发编程之 多线程 一

Python之路--Python基础

(c)2006-2024 SYSTEM All Rights Reserved IT常识