python之并发编程

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python之并发编程相关的知识,希望对你有一定的参考价值。

一、操作系统

1、概念

操作系统就是一个协调、管理和控制计算机硬件资源和软件资源的控制程序,操作系统位于计算机硬件与应用软件之间,本质也是一个软件。

操作系统由操作系统的内核(运行于内核态,管理硬件资源)以及系统调用(运行于用户态,为应用程序员写的应用程序提供系统调用接口)两部分组成

现代计算机或者网络都是多用户的,多个用户不仅共享硬件,而且共享文件,数据库等信息,共享意味着冲突和无序。


2、操作系统功能

    1、记录哪个程序使用什么资源

    2、对资源请求进行分配

    3、为不同的程序和用户调解互相冲突的资源请求。

我们可将上述操作系统的功能总结为:处理来自多个程序发起的多个(多个即多路)共享(共享即复用)资源的请求,简称多路复用


3、多路复用的实现方式

    1、时间上的复用

    当一个资源在时间上复用时,不同的程序或用户轮流使用它,第一个程序获取该资源使用结束后,在轮到第二个。。。第三个。。。

    2、空间上的复用

    每个客户都获取了一个大的资源中的一小部分资源,从而减少了排队等待资源的时间。

这两种方式合起来便是多道技术

二、python并发编程之多进程

1、进程

进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。

程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。

2、同步\异步and阻塞\非阻塞

  同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。所以绝大多数函数都是同步调用。但是一般我们说的同步、异步,特指那些需要其他部件协作或者需要一定时间完成的任务。

  异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。

  阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。

  非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。


小结:

    1、同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。

    2、阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

3、进程的创建

新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:

  1、在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)

  2、在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程

4、进程的状态

在两种情况下会导致一个进程在逻辑上不能运行

  1、进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作

  2、与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

进程之间的转换过程如下图:

技术分享图片

5、创建并开启子进程的两种方式创建并开启子进程的两种方式

#开进程的方法一:
import time
import random
from multiprocessing import Process
def learn(name):
    print('%s learning' %name)
    time.sleep(random.randrange(1,5))     #模拟进程执行消耗时间
    print('%s learn end' %name)
p1=Process(target=learn,args=('wang',)) #必须加,号,是元组的形式
p2=Process(target=learn,args=('li',))
p1.start()
p2.start()
print('主线程')
#开进程的方法二:
import time
import random
from multiprocessing import Process
class Learn(Process):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        print('%s learning' %self.name)
        time.sleep(random.randrange(1,5))
        print('%s learn end' %self.name)
p1=Learn('wang')
p2=Learn('li')
p1.start()
p2.start()
print('主线程')

6、进程直接的内存空间是隔离的

from multiprocessing import Process
n=100                 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了
def work():
    global n
    n=0
    print('子进程内: ',n)      #主进程内:  100
if __name__ == '__main__':
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)      #子进程内:  0

7、练习

基于tcp连接的socket通信的并发形式

# 服务端
from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
    while True:
        try:
            msg=conn.recv(1024)
            if not msg:break
            conn.send(msg.upper())
        except Exception:
            break
if __name__ == '__main__':
    while True:
        conn,client_addr=server.accept()
        p=Process(target=talk,args=(conn,client_addr))
        p.start()
# 多个客户端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    msg=client.recv(1024)
    print(msg.decode('utf-8'))
多个客户端都可以连接服务端,就实现了服务端的并发,不过客户端的个数也不是无限多的,这就需要进程池来解决

8、Process对象的join方法

from multiprocessing import Process
import time
import os
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(n)
    print('%s is done' %os.getpid())
if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task,args=(1,))
    p2=Process(target=task,args=(3,))
    p1.start()
    p1.join()             #等待p1子进程运行完毕
    p2.start()
    p2.join()             #等待p2子进程运行完毕
    stop_time=time.time()
    print('主',(stop_time-start_time))
加上join后会让主进程等待子进程执行完毕才结束主进程,子进程p1执行完毕再执行p2,所以最后打印主 4.226164102554321


from multiprocessing import Process
import time
import os
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(n)
    print('%s is done' %os.getpid())
if __name__ == '__main__':
    start_time=time.time()
    p1=Process(target=task,args=(1,))
    p2=Process(target=task,args=(3,))
    p1.start()
    p2.start()
    p1.join()              # 等待p1子进程运行完毕
    p2.join()              #等待p2子进程运行完毕
    stop_time=time.time()
    print('主',(stop_time-start_time))
这种方式的p1和p2是并发执行的,这期间主进程卡在这里,等这两个子进程执行完毕后再执行主进程

9、守护进程

主进程创建守护进程

  1、守护进程会在主进程代码执行结束后就终止

  2、守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children


主进程代码运行完毕,守护进程就会结束
from multiprocessing import Process
from threading import Thread
import time
def foo():
    print(123)
    time.sleep(1)
    print("end123")
def bar():
    print(456)
    time.sleep(3)
    print("end456")
if __name__ == '__main__':
    p1=Process(target=foo)
    p2=Process(target=bar)
    p1.daemon=True          #表示p1作为守护进程
    p1.start()
    p2.start()
    print("main-------")
电脑性能差时候,先打印main----,这时候守护进程p1会被终止,然后是子进程的执行,会打印456,打印end456
打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,但不会打印end123,因为主进程打印main----时,p1也执行了,但是随即被终止

10、进程同步(互斥锁)

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,而共享带来的是竞争,竞争带来的结果就是错乱,就需要加锁处理

#模拟抢票
不加锁的话会发生数据错乱,因为每个子进程都得到票数为1,都进行买票,然后回写数据为0,最后显示每个都买到票了,所以要对子进程回写操作进行加锁
文件db的内容为json格式:{"count":1}
from multiprocessing import Process,Lock
import time,json,random
def search():
    dic=json.load(open('db.txt'))
    print('\033[43m剩余票数%s\033[0m' %dic['count'])
def get():
    dic=json.load(open('db.txt'))
    time.sleep(0.1)     #模拟读数据的网络延迟
    if dic['count'] >0:
        dic['count']-=1
        time.sleep(0.2) #模拟写数据的网络延迟
        json.dump(dic,open('db.txt','w'))
        print('\033[43m购票成功\033[0m')
def task(lock):
    search()
    lock.acquire()       #加锁以后才能进行写操作
    get()
    lock.release()       #最后释放锁,进程锁给下一个子进程执行时使用
if __name__ == '__main__':
    lock=Lock()
    for i in range(100):   #模拟并发100个客户端抢票
        p=Process(target=task,args=(lock,))
        p.start()
加锁把并发变成了串行,运行效率变慢了,但保证了数据安全

11、队列

进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的创建队列的类(底层就是以管道和锁定的方式实现):

 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。



    q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    q.get_nowait(): 同q.get(False)
    q.put_nowait(): 同q.put(False)
    q.empty(): 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    q.qsize(): 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
#示例
from multiprocessing import Queue
q=Queue(3)                #队列最多有3个值
q.put('first')
q.put(2)
q.put({'count':3})
# q.put('fourth',block=False) #相当于 q.put_nowait('fourth')  ,超过队列最大值,抛出异常
# q.put('fourth',block=True,timeout=3)     #会有3秒的超时时间,然后抛出异常结束程序
print(q.get())
print(q.get())
print(q.get())
# print(q.get(block=False)) #相当于q.get_nowait()  会抛出异常结束程序
# print(q.get(block=True,timeout=3))    #会有3秒的超时时间,然后抛出异常结束程序

12、生产者消费者模型

这种模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此通过阻塞队列来进行通讯,生产者生产完数据之后直接扔给阻塞队列,消费者直接从阻塞队列里取,平衡了生产者和消费者的处理能力。


# 程序中有两类角色

    一类负责生产数据(生产者)

    一类负责处理数据(消费者)

# 引入生产者消费者模型为了解决的问题是:

    平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度

# 实现:

    生产者 < -->队列 <—— > 消费者

# 生产者消费者模型实现类程序的解耦和


from multiprocessing import Process, JoinableQueue
import time
import random
def producer(name, food, q):
    for i in range(3):
        res = '%s%s' % (food, i)
        time.sleep(random.randint(1, 3))
        q.put(res)
        print('厨师[%s]生产了<%s>' % (name, res))
def consumer(name, q):
    while True:
        res = q.get()
        if res is None: break
        time.sleep(random.randint(1, 3))
        print('吃货[%s]吃了<%s>' % (name, res))
        q.task_done()
if __name__ == '__main__':
    q = JoinableQueue()  # 队列
    # 生产者们
    p1 = Process(target=producer, args=('pipe', 'apple', q))
    p2 = Process(target=producer, args=('coco', 'banana', q))
    # 消费者们
    c1 = Process(target=consumer, args=('wang', q))
    c2 = Process(target=consumer, args=('li', q))
    c3 = Process(target=consumer, args=('tom', q))
    c1.daemon = True
    c2.daemon = True
    c3.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()
    p1.join()       #保证子进程p1数据生产完成
    p2.join()       #保证子进程p2数据生产完成
    q.join()        #保证q队列里面的值被取完,这样就保证了消费者的进程也执行完了
    print('主程序')  #打印主程序的时候所有的子进程都执行完毕了,这时候c1、c2、c3作为守护进程就会终止

三、python并发编程之多线程

1、概念

进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

多线程(即多个控制线程)指的是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间

2、多线程

在一个进程中开启多个线程。

简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。

详细的讲分为4点:

     1、多线程共享一个进程的地址空间

      2、线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用

      3、若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

      4、在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)

3、开启线程的两种方式

#方式一
from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)
if __name__ == '__main__':
    t=Thread(target=sayhi,args=('wang',))
    t.start()
    print('主线程')
#方式二
from threading import Thread
import time
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        time.sleep(2)
        print('%s say hello' % self.name)
if __name__ == '__main__':
    t = Sayhi('wang')
    t.start()
    print('主线程')

4、练习

#1、多线程并发的socket

服务端
from threading import Thread,current_thread
from socket import *
def comunicate(conn):
    print('子线程:%s' %current_thread().getName())     #
    while True:
        try:
            data=conn.recv(1024)
            if not data:break
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()
def server(ip,port):
    print('主线程:%s' %current_thread().getName())    #getName可以得到线程名
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        print(addr)
        # comunicate(conn)
        t=Thread(target=comunicate,args=(conn,))
        t.start()
    server.close()
if __name__ == '__main__':
    server('127.0.0.1', 8081)
客户端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8081))
while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))
client.close()

#2、三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

from threading import Thread
msg_l=[]
format_l=[]
def talk():
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        msg_l.append(msg)
def format_msg():
    while True:
        if msg_l:
            res=msg_l.pop()            #从命令列表里取出列表最后的元素,即最早输入消息
            format_l.append(res.upper()) #将用户输入的内容格式化成大写保存在format_l列表
def save():
    while True:
        if format_l:
            with open('db.txt','a',encoding='utf-8') as f:   #格式化后的结果保存在db.txt文件里
                res=format_l.pop()
                f.write('%s\n' %res)
if __name__ == '__main__':
    t1=Thread(target=talk)
    t2=Thread(target=format_msg)
    t3=Thread(target=save)
    t1.start()
    t2.start()
    t3.start()

5、主线程等待子线程结束

from threading import Thread
import time
def sayhi(name):
    time.sleep(2)
    print('%s say hello' %name)
if __name__ == '__main__':
    t=Thread(target=sayhi,args=('wang',))
    t.start()
    t.join()            #主线程等待子线程执行
    print('主线程')
    print(t.is_alive())     #False

6、守护线程

from threading import Thread
import time
def sayhi(name):
    print('====>start')
    time.sleep(2)
    print('%s say hello' %name)
if __name__ == '__main__':
    t=Thread(target=sayhi,args=('wang',))
    t.daemon=True    #或者t.setDaemon(True)
    t.start()
    print('主线程')
最后结果====>start ,然后打印主线程后结束,主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)

7、线程的互斥锁

from threading import Thread,Lock
import time
n=100
def task():
    global n
    with mutex:
        temp=n
        time.sleep(0.1)        #模拟线程消耗的时间
        n=temp-1               #n每次减去1
if __name__ == '__main__':
    start_time=time.time()
    mutex=Lock()
    t_l=[]
    for i in range(100):
        t=Thread(target=task)
        t_l.append(t)
        t.start()
    for t in t_l:
        t.join()
    stop_time=time.time()
    print('主',n)
    print('run time is %s' %(stop_time-start_time))   #所有线程消耗时间的总和

8、同步锁(程序抢到锁后才有执行权限)

#计算密集型:开多进程

from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i
if __name__ == '__main__':
    l=[]
    start=time.time()
    for i in range(4):        #计算机是几核的这里就开几个进程,用print(os.cpu_count()) 查看
        p=Process(target=work) #run time is 7.0024003982543945
        # p=Thread(target=work)    #run time is 26.09249234199524
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))
可以看到开多进程的在计算密集型的程序中用时少

#I/O密集型:多线程效率高

from multiprocessing import Process
from threading import Thread
import threading
import os,time
def work():
    time.sleep(2)
if __name__ == '__main__':
    l=[]
    start=time.time()
    for i in range(400):
        p=Process(target=work)     # 12.465712785720825
        # p=Thread(target=work)   #2.037116765975952
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print('run time is %s' %(stop-start))

#未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全

from threading import current_thread,Thread,Lock
import os,time
def task():
    time.sleep(3)               #未加锁的代码并发运行
    print('%s start to run' %current_thread().getName())
    global n
    lock.acquire()               #加锁的代码串行运行
    temp=n
    time.sleep(0.5)
    n=temp-1
    lock.release()
if __name__ == '__main__':
    n=100
    lock=Lock()
    threads=[]
    start_time=time.time()
    for i in range(100):
        t=Thread(target=task)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    stop_time=time.time()
    print('主:%s n:%s' %(stop_time-start_time,n))

四、paramiko模块

1、安装

pip3 install paramiko #在python3中


pip3 install pycrypto #在python2中

pip3 install paramiko

2、应用

##########基于用户名密码连接###############
import paramiko
ssh = paramiko.SSHClient()                                # 创建SSH对象
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())            # 允许连接不在know_hosts文件中的主机
ssh.connect(hostname='106.74.230.135', port=22101, username='root', password='123456')   # 连接服务器
stdin, stdout, stderr = ssh.exec_command('df')                   # 执行命令
result = stdout.read()                                   # 获取命令结果
print(result.decode('utf-8'))
ssh.close() 

                                           # 关闭连接
SSHClient封装Transport
import paramiko
transport = paramiko.Transport(('106.74.230.135', 22101))
transport.connect(username='root', password='123456')
ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
res=stdout.read()
print(res.decode('utf-8'))
transport.close()

#################基于秘钥连接##############
# 前提:服务端必须有文件名:authorized_keys
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname='106.74.230.135', port=22101, username='root', pkey=private_key)
stdin, stdout, stderr = ssh.exec_command('df')
result = stdout.read()
print(result.decode('utf-8'))
ssh.close()


SSHClient封装Transport
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
transport = paramiko.Transport(('106.74.230.135', 22101))
transport.connect(username='root', pkey=private_key)
ssh = paramiko.SSHClient()
ssh._transport = transport
stdin, stdout, stderr = ssh.exec_command('df')
result=stdout.read()
print(result.decode('utf-8'))
transport.close()

###########基于用户名密码上传下载#########
import paramiko
transport = paramiko.Transport(('106.74.220.225', 22101))
transport.connect(username='root', password='123456')
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(r'D:\video\location.py', '/root/test.txt')      # 将location.py 上传至服务器 /root/test.py
sftp.get('/root/test.txt', r'D:\video\location.py')     # 将/root/test.txt下载到本地 D:\video\location.py
transport.close()

####基于秘钥上传下载#####
import paramiko
private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
transport = paramiko.Transport(('106.74.220.225', 22101))
transport.connect(username='root', pkey=private_key )
sftp = paramiko.SFTPClient.from_transport(transport)
sftp.put(r'D:\video\location.py', '/root/test.txt')   # 将location.py 上传至服务器 /root/test.py
sftp.get('/root/test.txt', r'D:\video\location.py')  # 将/root/test.txt下载到本地 D:\video\location.py
transport.close()




以上是关于python之并发编程的主要内容,如果未能解决你的问题,请参考以下文章

python之并发编程

Python3 与 C# 并发编程之~ 上篇

Python并发编程之多进程

python并发编程之多进程

Python 并发编程之线程

python学习40——并发编程之多线程