进程队列补充socket实现服务器并发线程完结

Posted 天天向上的力量

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程队列补充socket实现服务器并发线程完结相关的知识,希望对你有一定的参考价值。

1.队列补充

队列内部是管道+锁(数据在队列中是阻塞的)

2.关于python并发与并行的补充

解释型语言单个进程下多个线程不可以并行,但是向C语言等其他语言中在多核情况下是可以实现并行的,所有语言在单核下都是无法实现并行的,只能并发。

3.TCP服务端实现并发

#服务端
import socket
from threading import Thread



server = socket.socket()
server.bind(('127.0.0.1',6666))
server.listen(5)
def serv(conn,addr):
    while True:
        try:
            print(addr)
            rec_data = conn.recv(1024).decode('utf-8')
            print(rec_data)
            send_data = rec_data.upper()
            conn.send(send_data.encode('utf-8'))
        except Exception as e:
            print(e)
            break

while True:
    conn,addr = server.accept()
    t = Thread(target=serv,args=(conn,addr))
    t.start()
#客户端
import socket
import time

client = socket.socket()
client.connect(('127.0.0.1',6666))
while True:
    client.send(b'hello')
    data = client.recv(1024)
    print(data)
    time.sleep(1)

4.GIL全局解释器锁

在CPython中,全局解释器锁(即GIL)是一个互斥锁,可以防止一个进程中的多个线程同时(并行)执行。 锁定是必要的,主要是因为CPython的内存管理不是线程安全的。GIL的存在就是为了保证线程安全

什么是保证线程安全呢?

同一进程的所有线程都运行在一个进程内,毫无疑问这些线程具有以下几个特点:

1、所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
2、所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
综上:

如果多个线程的target=work,那么执行流程是:

多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行

解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码。

技术图片

GIL与Lock

机智的同学可能会问到这个问题:Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock?

首先,我们需要达成共识:锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据

然后,我们可以得出结论:保护不同的数据就应该加不同的锁。

最后,问题就很明朗了,GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock,如下图

技术图片

注意:多个线程过来执行,一旦遇到IO操作就会立马释放GIL解释器锁,交个下一个先进来的线程。

在纯计算程序中GIL锁起到锁定python解释器的作用,就是一个线程抢到解释器后不会再有其他线程抢到解释器的使用权(直到这个程序遇到IO操作,这时GIL会释放解释器的使用权)。

import time
from threading import Thread,current_thread


number = 100

def task():
    global number
    number2 = number
    number = number2 - 1
    print(number,current_thread().name)

for line in range(100):
    t = Thread(target=task)
    t.start()
    
99 Thread-1
98 Thread-2
97 Thread-3
96 Thread-4
95 Thread-5
94 Thread-6
93 Thread-7
92 Thread-8
91 Thread-9
90 Thread-10
。
。
。

给这个程序加上IO操作看下打印结果:

import time
from threading import Thread,current_thread


number = 100

def task():
    global number
    number2 = number
    #print(number)
    time.sleep(1)
    number = number2 - 1
    #time.sleep(1)#如果sleep放在这里,则会打印结果都是0,这是因为线程间数据是共享的
    print(number,current_thread().name)

for line in range(100):
    t = Thread(target=task)
    t.start()
    
99 Thread-24
99 Thread-23
99 Thread-22
99 Thread-20
99 Thread-18
99 Thread-17
99 Thread-15
99 Thread-21
99 Thread-14
.
.
.

5.验证多线程的作用

什么时候使用多线程,什么时候使用多进程,多线程和多进程各有什么优缺点?

结论:在计算密集型程序中使用多进程,这时能够充分发挥计算机多核的优势;在IO密集型的程序中使用多线程,这时能够充分发挥多线程对CPU高校利用率的优势。高效执行多个进程,内有多个IO密集型程序,要使用多进程+多线程。

对结论的验证:

运算密集型操作验证:

from threading import Thread
from multiprocessing import Process
import os,time

#计算密集型多进程下运行
def work1():
    number = 0
    for line in range(50000000):
        number += 1


if __name__ == '__main__':
    #测试计算密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Process(target=work1)
        list1.append(p)
        p.start()
        #p.join()#join如果放在这里比在列表里执行速度慢,是因为在列表里是等所有的进程都起来之后再告诉系统要加join,而在第一个for循环里面则是每起一个进程都会告诉系统一次,这个过程需要时间。

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#12.24461030960083


#多线程下运行
from threading import Thread
from multiprocessing import Process
import os,time

#计算密集型
def work1():
    number = 0
    for line in range(50000000):
        number += 1
#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试计算密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Thread(target=work1)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#16.305360794067383

这里本人的测试结果是在计算的数据比较大时开启多进程才会有优势,如果运算数据比较小,开启多线程运算速度反而比开启多进程快得多。

IO密集型操作验证:

#多进程测试

from threading import Thread
from multiprocessing import Process
import os,time


#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试IO密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Process(target=work2)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#2.642327070236206


#多线程测试
from threading import Thread
from multiprocessing import Process
import os,time


#IO密集型
def work2():
    time.sleep(1)

if __name__ == '__main__':
    #测试IO密集型
    print(os.cpu_count())#打印CPU有几个内核
    start_time = time.time()
    list1 = []
    for line in range(4):
        p = Thread(target=work2)
        list1.append(p)
        p.start()
        #p.join()#4.407487869262695

    for p in list1:
        p.join()
    end_time = time.time()
    print(f'程序执行的时间{end_time - start_time}')
#1.0189073085784912

可以看出在IO密集型操作中,开相同数量的程要比开相同数量的线程执行速度慢

6.死锁现象

所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁。


from threading import Lock,Thread,current_thread
import time


mutex_a = Lock()#实例化一把锁
mutex_b = Lock()

class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        mutex_b.release()
        print(f'用户{self.name}释放了锁b')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        time.sleep(1)
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')

for line in range(10):
    t = MyThread()
    t.start()
用户Thread-1抢到了锁a
用户Thread-1抢到了锁b
用户Thread-1释放了锁b
用户Thread-1释放了锁a
用户Thread-1抢到了锁b
用户Thread-2抢到了锁a

当线程1抢到了b而线程2抢到了a时,线程1要执行强a的任务,而线程2要执行抢b的任务,而两把锁都没有释放手中的锁,所以就造成了死锁的现象。

7.递归锁

递归锁用于解决死锁问题,递归锁可以被多个线程使用,当第一个线程使用时,遇到几把锁,它的引用计数就为几,只有当它的引用计数为零时才会给第二个线程使用。这样拥有递归锁的那个线程就可以将自己的锁里的内容执行完然后,其他使用递归锁的线程才可以执行。也就是但是第一个使用这把锁的线程会对这把锁加一个引用计数,只有引用计数为零时才能真正释放该锁。

from threading import Lock,Thread,RLock
import time


mutex_a = mutex_b = RLock()#实例化一把递归锁
class MyThread(Thread):
    def run(self):
        self.func1()
        self.func2()

    def func1(self):
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        mutex_b.release()
        print(f'用户{self.name}释放了锁b')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')

    def func2(self):
        mutex_b.acquire()
        print(f'用户{self.name}抢到了锁b')
        time.sleep(0.1)
        mutex_a.acquire()
        print(f'用户{self.name}抢到了锁a')
        mutex_a.release()
        print(f'用户{self.name}释放了锁a')
        mutex_b.release()
        print(f'用户{self.name}释放锁b')

for line in range(10):
    t = MyThread()
    t.start()

8.信号量(了解)

互斥锁比喻成一个家用马桶,同一时间只能一个人用;信号量比喻成一个公厕,同一时间可以有多个人用。

from threading import Semaphore,Lock,current_thread,Thread
import time

sm = Semaphore(5)#5个马桶
#mutex = Lock()#一个马桶
def task():
    sm.acquire()
    print(f'{current_thread().name}执行任务')
    time.sleep(1)
    sm.release()

for line in range(20):
    t = Thread(target=task)
    t.start()
#这段代码的功能是每次让五个线程并发执行

9.线程队列

线程Q:线程队列 FIFO(先进先出)就是队列,面试会问。

queue 是python解释器自带的模块,进程中的Queue是python解释器自带的模块multiprocessing里面的一个类。

普通队列(FIFO):先进先出
特殊队列(LIFO):后进先出

import queue
q = queue.Queue()#先进先出
q.put(1)
q.put(2)
print('q',q.get())

q1 = queue.LifoQueue()#先进后出
q1.put(1)
q1.put(2)
print('q1',q1.get())

q 1
q1 2

优先级队列
优先级根据数字来判断,数字为几优先级就为几,1的优先级最高
若参数中传的是元组,优先级以第一个元素的数字大小为准,如果元组的第一个元素都为字母则以字母的ASCII码为准,如果第一个元素的优先级相同就判断第二个元素的顺序(汉字也会判断顺序)但是如果同一列的元素既有数字又有字母会报错。

import queue
q2 = queue.PriorityQueue()#优先级队列

q2.put((1,2,3))
q2.put((2,2,3))
q2.put((3,2,3))
print(q2.get())

(1, 2, 3)

句柄

句柄(handle),有多种意义,第一种解释:句柄是一种特殊的智能指针。当一个应用程序要引用其他系统(如数据库、操作系统)所管理的内存块或对象时,就要使用句柄。

以上是关于进程队列补充socket实现服务器并发线程完结的主要内容,如果未能解决你的问题,请参考以下文章

进程池线程池 协程

python socket多线程和多进程

Linux-TCP编程流程-Socket编程-单线程实现TCP客户端和服务端交互-多进程实现TCP客户端和服务端交互

php socket多进程简单服务器

第十七节 单进程单线程非堵塞实现并发验证

并发编程补充知识之标准线程池