十并发编程

Posted lisenlin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十并发编程相关的知识,希望对你有一定的参考价值。

一、进程

  1.什么是进程

#一个正在进行的过程,或者说是一个程序的运行过程
#其实进程是对正在运行的程序的一种抽象/概括的说法

#进程的概念起源操作系统,进程是操作系统最核心的概念之一
#操作系统其它所有的概念都是围绕进程展开的

  2.操作系统的作用

#1.隐藏丑陋复杂的硬件接口,提供良好的抽象接口
#2.管理、调度进程,并且将多个进程对硬件的竞争变得有序

  3.操作系统的多道技术

#1.产生背景:针对单核,实现并发
    ps:现在的主机一般都是多核,那么每个核都会利用多道技术,有4个cpu,
        运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度
        到4个cpu中的任意一个,具体由操作系统调度算法决定

#2.优点
    ##1.空间上的复用:如内存中同时有多道程序,互相隔离
    ##2.时间上的复用:复用一个cpu的时间片
    强调:遇到io切换,占用cpu时间过长也切,核心在于切之前将进程的
         状态保存下来,这样才能保证下次切换回来时,能基于上次切
          走的位置继续运行

  4.并行与并发

#1.并发:是伪并发,即看起来是同时运行。单个cpu+多道技术就可以并发
#2.并行:真正意义上的同时运行,只有具备多个cpu才能实现并行

  5.开启进程的俩种方式

##1.直接调用Process
import time
from multiprocessing import Process


def task(name):
    print(%s is running % name)
    time.sleep(3)
    print(%s is done % name)


if __name__ == "__main__":
    # Process(target=task,kwargs={‘name‘:‘任务1‘})    #kwargs为传递关键字参数
    # Process在windows下必须在main下运行,否则会无限创建子进程,
    # 因为在windows下子进程会执行父进程所有语句,获得所有名称空间。传参args里必须加逗号,才能表示args是元组,否则报错
    p=Process(target=task, args=(任务1,))   #args为传递位置参数
    p.start()  # 只是给操作系统发送开启子进程的信号
    print()



##2.自定义类,继承Process
from multiprocessing import Process
import time
class MyProcess(Process):   #自定义进程类,继承Process
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 必写,p.start()调用的是p.run()
        print(%s is running % self.name)
        time.sleep(3)
        print(%s is done % self.name)


if __name__ == "__main__":
    p = MyProcess(lisl)
    p.start()  # p.start()=p.run(),如果直接使用p.run(),就不会创建子进程并发执行
    print()

  

  6.进程的pid与ppid(父类pid)

##通过查看pid来判断是否是开启了另一个进程
import time
import os
from multiprocessing import Process
def task():
    print(子进程%s is running%os.getpid())
    print(父进程是%s%os.getppid())
    time.sleep(3)


if __name__=="__main__":
    p=Process(target=task,)
    p.start()
    print(主进程%s%os.getpid(),)
    print(主进程的父进程%s%os.getppid(),)  #在Pycharm运行就是Pycharm;在cmd运行父进程就是cmd.exe

  7.进程对象的join方法

##单个子进程
import time
from multiprocessing import Process
def task(x):
    print(%s is running%x)
    time.sleep(3)
    print(%s is done%x)

if __name__=="__main__":
    p=Process(target=task,args=(lisl,))
    p.start()
    p.join()    #主进程等待子进程运行完在往下执行
    print()
技术分享图片
##多个子进程
import time, os
from multiprocessing import Process


def task(n):
    print(%s is running % os.getpid())
    time.sleep(n)
    print(%s is done % os.getpid())


if __name__ == "__main__":
    p1 = Process(target=task, args=(1,))
    p2 = Process(target=task, args=(2,))
    p3 = Process(target=task, args=(3,))

    start_time = time.time()
    p1.start()
    p2.start()
    p3.start()

    p1.join()  # 等待1s
    p2.join()  # 等待1s
    p3.join()  # 等待1s

    stop_time = time.time()
    print(, stop_time - start_time)



##多个子进程(升级版)
from multiprocessing import Process
import os
import time

def task(n):
    print(%s is running%os.getpid())
    time.sleep(n)
    print (%s is done%os.getpid())

if __name__ == __main__:
    p_l=[]
    start_time=time.time()
    for i in range(1,4):
        p=Process(target=task,args=(i,))
        p_l.append(p)
        p.start()

    for p in p_l:
        p.join()

    end_time=time.time()

    print(主进程执行时间%s%(end_time-start_time))
多个子进程

  8.进程对象的其他相关属性和方法

import time
from multiprocessing import Process
def task(x):
    print(%s is running%os.getpid)
    time.sleep(x)

if __name__ == __main__:
    p=Process(target=task,args=(2,),name=子进程1)
    p.start()
    print(p.name)   #打印子进程1,默认是打印Process-1
    print(p.pid)    #进程号
    print(p.is_alive()) #子进程是否存活
    p.terminate()   #让操作系统去干掉子进程,需要时间

  9.进程之间内存空间隔离

from multiprocessing import Process
x = 100

def task():
    global x
    x = 0

if __name__ == __main__:
    p = Process(target=task, name=子进程1)
    p.start()

    p.join()  # 等待子进程执行完
    print(, x)  # x=100,子进程不影响主进程变量x

  10.僵尸进程与孤儿进程

#僵尸进程(有害)
一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。
总之,子进程结束而父进程还未结束

#孤儿进程(无害)
一个父进程退出,而它的一个或多个子进程还在运行,那么子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对他们完成状态收集工作
总之,父进程比子进程早结束

   10.关于while True,input,与子进程的问题

from multiprocessing import Process
import time
def task(name):
    print(%s is running%name)
    time.sleep(5)
    print(%s is done%name)


if __name__ == __main__:

    while True:
        choice=input(>>:).strip()    #卡在input时子进程执行完也无法将结果输出,需等待主进程过了input阶段,有充足的时间才可输出
        if not choice:    
            continue
        p=Process(target=task,args=(唐三,))
        p.start()
        print()


#主进程会等待子进程执行完才结束。如果主进程执行完,想要子进程也随之结束,可将子进程设置成守护进程。

   11.守护进程

#当子进程执行的任务在父进程代码运行完毕后就没有存在的必要了,该子进程就应该被设置为守护进程
from multiprocessing import Process
import time
def task(name):
    print(%s is running%name)
    time.sleep(5)
    print (%s is done%name)

if __name__ == __main__:
    p=Process(target=task,args=(lisl,))
    p.daemon = True #设置为守护进程,默认False
    p.start()
    print()
from multiprocessing import Process
import time
def task1(name):
    print(%s is running%name)
    time.sleep(5)
    print (%s is done%name)

def task2(name):
    print(%s is running%name)
    time.sleep(5)
    print (%s is done%name)
if __name__ == __main__:
    p1=Process(target=task1,args=(lisl,))
    p2=Process(target=task2,args=(唐三,))
    p1.daemon = True #设置为守护进程,默认False
    p1.start()
    p2.start()
    print()  #因为p2没有设置为守护进程,因此主进程执行完,还会等待p2子进程,而p1一定随着主进程结束而结束,无关p2子进程

  12 互斥锁----牺牲了效率,保证了数据安全

#由以下现象引出互斥锁
#多人抢夺多一张票,却显示多人抢票成功,现实只能有一人获得票
from multiprocessing import Process
import json
import time
import random
import os
def search():
    ‘‘‘查票‘‘‘
    time.sleep(random.randint(1,3))
    dic=json.load(open(db.txt,r,encoding=utf-8))
    print(%s 查看到剩余票数%s%(os.getpid(),dic[count]))

def get():
    ‘‘‘得票‘‘‘
    dic=json.load(open(db.txt,r,encoding=utf-8))
    if dic[count]>0:
        dic[count]-=1
        time.sleep(random.randint(1,3))
        json.dump(dic,open(db.txt,w,encoding=utf-8))
        print(%s 购票成功%os.getpid())
    else:
        print(%s 购票失败%os.getpid())

def task():
    search()
    get()


if __name__ == __main__:
    for i in range(10):
        p=Process(target=task)
        p.start()  #使用p.join()可以解决买错票问题,但是子程序串行执行,效率低。最佳解决办法是只把修改共享数据这一部分变成串行,而非整体,即加入互斥锁
#使用互斥锁解决一票多人获得情况
from multiprocessing import Process,Lock
import json
import time
import random
import os
def search():
    ‘‘‘查票‘‘‘
    time.sleep(random.randint(1,3))
    dic=json.load(open(db.txt,r,encoding=utf-8))
    print(%s 查看到剩余票数%s%(os.getpid(),dic[count]))

def get():
    ‘‘‘得票‘‘‘
    dic=json.load(open(db.txt,r,encoding=utf-8))
    if dic[count]>0:
        dic[count]-=1
        time.sleep(random.randint(1,3))
        json.dump(dic,open(db.txt,w,encoding=utf-8))
        print(%s 购票成功%os.getpid())
    else:
        print(%s 购票失败%os.getpid())

def task(mutex):
    search()
    mutex.acquire()     #获得一把锁,其他子进程需要等待锁释放,才可以访问get()函数
    get()
    mutex.release()     #释放一把锁


if __name__ == __main__:
    mutex=Lock()    #锁对象化,所有子进程共享一把锁,可锁住一份数据
    for i in range(10):
        p=Process(target=task,args=(mutex,))
        p.start()

 

技术分享图片
from multiprocessing import Process,Lock
import time
import os

def printer(mutex):
    mutex.acquire()     #加锁
    print(%s 打印1%os.getpid())
    time.sleep(1)
    print(%s 打印2%os.getpid())
    mutex.release()     #释放锁
    
if __name__ == __main__:
    mutex=Lock()
    p1=Process(target=printer,args=(mutex,))
    p2=Process(target=printer,args=(mutex,))
    p3=Process(target=printer,args=(mutex,))

    p1.start()
    p2.start()
    p3.start()
互斥锁简易使用

  13 实现进程之间的通信

#进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocess模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
#IPC全称:Inter-Process Communication,进程间通信

  13.1 队列

from multiprocessing import Queue

q= Queue(3)    #允许队列里有3个元素

q.put(hello)
q.put(1000)
q.put({"count":3})
# q.put(‘fourth‘) #第4个会被阻塞住,只能有第一个元素hello被取走,才能进入队列,将参数block值更改为False,则不阻塞而报错
#q.put_nowait(‘fourth‘)     #队列满就报错,等于q.put(‘fourth‘,block=False)

print(q.get())     #取走第一个元素hello
print(q.get())
print(q.get())
print(q.get())      #上面已取完所有元素,阻塞住,当有元素被存放队列就读取
# print(q.get(block=False))   #队列为空就报异常,等于q.get_nowait()
# print(q.get(block=True,timeout=3))   #等待3秒还是为空报异常

  13.2 生产者消费者模型

from multiprocessing import Process,Queue
import time
import random
def prodecer(name,food,q):
    for i in range(1,4):
        res=%s%s个%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print(厨师[%s]生产了<%s>%(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break    #消费者收到None,结束
        time.sleep(random.randint(1,3))
        print(吃货[%s]吃了<%s>%(name,res))

if __name__ == __main__:
    q=Queue()   #队列
    p1=Process(target=prodecer,args=(lisl,包子,q))    #生产者负责造
    c1=Process(target=consumer,args=(唐三,q))           #消费者负责吃
    p1.start()
    c1.start()

    p1.join()   #生产者生产完后,
    q.put(None) #生产者生产完后,主动放入一个结束标志,当消费者受到None,即结束
#多个生产者多个消费者,如果保证生产者执行完毕后,消费者消费完要结束?
#通过队列的JoinableQueue模块里的q.join()和q.task_done()与守护进程配合使用
from multiprocessing import Process,JoinableQueue
import time
import random
def prodecer(name,food,q):
    for i in range(1,4):
        res=%s%s个%(food,i)
        time.sleep(random.randint(1,3))
        q.put(res)
        print(厨师[%s]生产了<%s>%(name,res))

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break    #消费者收到None,结束
        time.sleep(random.randint(1,3))
        print(吃货[%s]吃了<%s>%(name,res))
        q.task_done()   #每取走一个数据(q.get()),就向q.join()发送一个信号


if __name__ == __main__:
    q=JoinableQueue()   #队列
    p1=Process(target=prodecer,args=(lisl,包子,q))    #生产者负责造
    p2=Process(target=prodecer,args=(lxq,馒头,q))    #生产者负责造
    c1=Process(target=consumer,args=(唐三,q))           #消费者负责吃
    c2=Process(target=consumer,args=(小舞,q))           #消费者负责吃
    c3=Process(target=consumer,args=(沐白,q))           #消费者负责吃

    # 将消费者设置成守护进程,随主进程消失而消失,配合队列q.join()执行完毕,可保证消费者子进程已取完队列所有元素,可随主进程一起消失
    c1.daemon=True
    c2.daemon=True
    c3.daemon=True

    p1.start()
    p2.start()
    c1.start()
    c2.start()
    c3.start()

    p1.join()   #生产者生产完后,
    p2.join()   #生产者生产完后,
    q.join()    #等待队列无元素
    print()

#注:JoinableQueue具备Queue的基本功能

 

 

 

二、线程

  1 什么是线程

线程是一条流水线的工作过程,一个进程内至少有一个线程
进程是一个资源单位,而进程内的线程才是执行单位

  2 为什么要用线程(线程vs进程)

#1.同一进程下的多个线程共享该进程内的数据
#2.线程的创建开销要远远小于进程的

  3.开启线程的俩种方式

##1.直接调用Thread
import time
from threading import Thread
def task(name):
    print(%s is running%name)
    time.sleep(3)
    print(%s is done%name)

if __name__=="__main__":
    #Thread(target=task,args={‘name‘:‘任务1‘})
    # Thread在windows下必须在main下运行,否则会报错,因为在windows下子进程会执行父进程所有语句,
    # 获得所有名称空间。传参args里必须加逗号,才能表示args是元组,否则报错
    t=Thread(target=task,args=(任务1,))   
    t.start()   #只是给操作系统发送开启子线程的信号
    print()


###2.自定义类,继承Thread
from threading import Thread
import time
class MyThread(Thread):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):  # 必写
        print(%s is running % self.name)
        time.sleep(3)
        print(%s is done % self.name)


if __name__ == "__main__":
    t = MyThread(lisl)
    t.start()  # p.start()=p.run()
    print()

  4.线程的pid与ppid

同一进程下的多个线程属于同一个pid,线程之间不存在父线程p与儿子线程的说法,线程与线程之间是平等的,因此也不会存在所谓的僵尸线程

  5.线程下的join方法

##单个子线程
import time
from threading import Thread
def task(x):
    print(%s is running%x)
    time.sleep(3)
    print(%s is done%x)

if __name__=="__main__":
    t=Thread(target=task,args=(lisl,))
    t.start()
    t.join()    #主线程等待子线程运行完在往下执行
    print()
技术分享图片
##多个子进程
import time, os
from threading import Thread

def task(n):
    print(%s is running % os.getpid())
    time.sleep(n)
    print(%s is done % os.getpid())


if __name__ == "__main__":
    t1 = Thread(target=task, args=(1,))
    t2 = Thread(target=task, args=(2,))
    t3 = Thread(target=task, args=(3,))

    start_time = time.time()
    t1.start()
    t2.start()
    t3.start()

    t1.join()  # 等待1s
    t2.join()  # 等待1s
    t3.join()  # 等待1s

    stop_time = time.time()
    print(, stop_time - start_time)


##多个子进程(升级版)
from threading import Thread
import os
import time

def task(n):
    print(%s is running%os.getpid())
    time.sleep(n)
    print (%s is done%os.getpid())

if __name__ == __main__:
    t_l=[]
    start_time=time.time()
    for i in range(1,4):
        t=Thread(target=task,args=(i,))
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    end_time=time.time()

    print(主进程执行时间%s%(end_time-start_time))
多个子进程下的join方法

  6.线程实例化对象的方法

import time
import os
from threading import Thread
def task(x):
    print(%s is running%os.getpid())
    time.sleep(x)

if __name__ == __main__:
    t=Thread(target=task,args=(2,),name=子线程1)
    t.start()
    print(t.name)   #打印子线程1,默认是打印Thread-1,或者用t.getName()
    print(t.getName())
    print(t.is_alive()) #子线程是否存活
    t.setName(超级线程)   #设置子线程名
    print(t.getName())
   print (t.getpid())  #获取线程的PID
#在线程里查看当前线程名
import time
from threading import Thread,current_thread
def task(x):
    print(当前线程名:%s is running%current_thread().getName()) #查看当前线程名
    time.sleep(x)

if __name__ == __main__:
    t=Thread(target=task,args=(2,),name=子线程1)
    t.start()
    print(current_thread().getName())   #当前线程名为主线程MainThread

   6.守护线程

from threading import Thread
import time
def talk(name):
    print(===>)
    time.sleep(2)
    print(%s say hello%name)

if __name__ == __main__:
    t=Thread(target=talk,args=(lisl,))
    t.daemon=True   #设置为守护线程,随着所有非守护线程的结束而结束,关下一下例子《易迷糊例子》
    t.start()
    print(主线程)    #主线程结束
技术分享图片
from threading import Thread
import time
def talk1(name):
    print(talk1===>)
    time.sleep(1)
    print(%s say hello%name)


def talk2(name):
    print(talk2===>)
    time.sleep(3)
    print(%s say hello%name)

if __name__ == __main__:
    t1=Thread(target=talk1,args=(lisl,))
    t2=Thread(target=talk2,args=(唐三,))
    t1.daemon=True   #设置为守护线程

    t1.start()
    t2.start()
    print(主线程)    #主线程结束
易迷糊例子
  守护线程与守护进程的区别
守护进程是随主进程结束而结束
守护线程是随所有非守护线程结束而结束

  7.线程的互斥锁

from threading import Thread,Lock
import time

n=100
def change():
    global n
    with mutex:     #与mutex.acquire()和mutex.release()效果一样
        temp=n
        time.sleep(0.1)
        n=temp-1

if __name__ == __main__:
    start_time=time.time()
    t_l=[]
    mutex=Lock()    #进程互斥锁,因为进程共享数据,所以无需以参数传入子线程
    for i in range(100):
        t=Thread(target=change)
        t_l.append(t)
        t.start()

    for t in t_l:
        t.join()

    end_time=time.time()
    print(,n)
    print(run time is %s%(end_time-start_time))

 

  8.GIL--Global Interpreter Lock

GIL本质就是一把互斥锁,跟线程和进程的互斥锁一样,都是将并发运行变成串行,以此控制统一时间内共享数据只能被一个任务所修改,进而保证数据安全。
不同的是,GIL是解释器级的锁,而进程或线程互斥锁是程序级别的锁。

所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。

解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题,对于同一个数据100,可能线程1执行x=100的同时,
而垃圾回收执行的是回收100的操作,解决这种问题只能加锁处理,这就是所谓解释器级的锁,即GIL,保证python解释器统同一时间只能执行一个任务的代码

  8.1 计算密集型,多进程效率高

技术分享图片
from multiprocessing import Process
from threading import Thread
import os,time
def work():
    res=0
    for i in range(100000000):
        res*=i


if __name__ == __main__:
    l=[]
    start=time.time()
    for i in range(4):
        # p=Process(target=work) #耗时13s多
        p=Thread(target=work) #耗时23s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print(run time is %s %(stop-start))
计算密集型:多进程效率高

  8.2 I/O密集型,多线程效率高

技术分享图片
from multiprocessing import Process
from threading import Thread
import time
def work():
    time.sleep(2)

if __name__ == __main__:
    l=[]
    start=time.time()
    for i in range(400):
        # p=Process(target=work) #耗时23s多,大部分时间耗费在创建进程上
        p=Thread(target=work) #耗时2s多
        l.append(p)
        p.start()
    for p in l:
        p.join()
    stop=time.time()
    print(run time is %s %(stop-start))
I/O密集型:多线程效率高

  8.3 应用:

多线程用于IO密集型,如socket,爬虫,web
多进程用于计算密集型,如金融分析

   9.死锁

#俩个锁分别调用,容易引发死锁,如下
#死锁现象与递归锁
from threading import Thread,Lock
import time
mutexA=Lock()
mutexB=Lock()
class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(%s 拿到A锁%self.name)

        mutexB.acquire()
        print(%s 拿到B锁%self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(%s 拿到B锁 % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print(%s 拿到A锁 % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == __main__:
    for i in range(10):
        t=MyThread()
        t.start()
#解决上面死锁办法:
#使用RLock,即加一个锁计数加1,递归锁在加1,每释放锁就减1,当锁的技术为0,才可供下一个其他线程调用
from threading import Thread,RLock
import time
mutexA=mutexB=RLock()

class MyThread(Thread):
    def run(self):
        self.f1()
        self.f2()

    def f1(self):
        mutexA.acquire()
        print(%s 拿到A锁%self.name)

        mutexB.acquire()
        print(%s 拿到B锁%self.name)
        mutexB.release()

        mutexA.release()

    def f2(self):
        mutexB.acquire()
        print(%s 拿到B锁 % self.name)
        time.sleep(0.1)
        mutexA.acquire()
        print(%s 拿到A锁 % self.name)
        mutexA.release()

        mutexB.release()

if __name__ == __main__:
    for i in range(10):
        t=MyThread()
        t.start()

   10.信号量

#信息量:理解为是钥匙,钥匙数量固定,使用钥匙的人结束交给没钥匙的人
from threading import Thread,Semaphore,current_thread
import time,random
sm=Semaphore(5)

def task():
    with sm:  #等价于sm.acquire()与sm.release()用法
        print(%s is comming%current_thread().getName())
        time.sleep(random.randint(1,3))
if __name__ == __main__:
    for i in range(20):
        t=Thread(target=task)
        t.start()

  11 Event事件

#用于一个进程下多个线程,一个线程需要上一个线程工作到某个特定阶段才可开始执行

event.isSet():返回event的状态值;

event.wait():如果 event.isSet()==False将阻塞线程;

event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;

event.clear():恢复event的状态值为False
from threading import Thread,Event,current_thread
import time

event=Event()    #事件对象化

def check():
    print(%s checking mysql%current_thread().getName())
    time.sleep(3)
    event.set()     #设置为True,event.wait()有阻塞状态进入非阻塞状态

def conn():
    print(waitting to  connect MySQL)
    event.wait()    #默认阻塞状态,当event.set()后进入非阻塞状态
    print(%s connect MySQL%current_thread().getName())

if __name__ == __main__:
    t1=Thread(target=check)
    t2=Thread(target=conn)
    t3=Thread(target=conn)
    t4=Thread(target=conn)

    t1.start()
    t2.start()
    t3.start()
    t4.start()

 

技术分享图片
#Event事件:用于一个进程下多个线程,一个线程需要上一个线程工作到某个特定阶段才可开始执行
from threading import Thread,Event,current_thread
import time
event=Event()
def check():
    print(%s checking MySQL%current_thread().getName())
    time.sleep(5)
    event.set()

def conn():
    count=1
    while not event.is_set():
        if count >3:
            raise TimeoutError(超时)
        print(%s try to  connect MySQL %s times%(current_thread().getName(),count))
        event.wait(2)       #2秒为超时时间,即等不到event.set(),直接进入非阻塞状态
        count +=1

    print(%s connect MySQL%current_thread().getName())

if __name__ == __main__:
    t1=Thread(target=check)
    t2=Thread(target=conn)
    t3=Thread(target=conn)
    t4=Thread(target=conn)

    t1.start()
    t2.start()
    t3.start()
    t4.start()
模拟尝试三次连接超时

  12 定时器

#定时器,此方式无法做到定点定时执行,比如想12点执行,不可以
from threading import Timer

def hello(name):
    print(%s say hello to you%name)

t=Timer(3,hello,args=(lisl,)) #3秒后开始运行
t.start()

  13线程queue

import queue
q=queue.Queue(3)    #队列先进先出
q.put(1)
q.put(2)
q.put(3)
# q.put(4)  #队列满,阻塞
# q.put_nowait(4) #队列满,直接报错
# q.put(4,block=False)    #队列满,直接报错
# q.put(4,block=True,timeout=3)    #队列满,3秒后超时报错

print(q.get())
print(q.get())
print(q.get())
# print(q.get_nowait())     
# print(q.get(block=False))
# print(q.get(block=True,timeout=3))

######################################################
q=queue.LifoQueue(3)    #堆栈,后进先出
q.put(1)
q.put(2)
q.put(3)

print (q.get())     #3
print (q.get())     #2
print (q.get())     #1
######################################################
q=queue.PriorityQueue(3)    #优先级队列
q.put((10,a)) #10为优先级,数字越小,优先级越高,‘a‘为数据,可为任意数据类型
q.put((2,b))
q.put((-5,c))

print(q.get())  #结果为(-5,‘b‘)

 

三、进程池与线程池

  1.什么是池的概念?

池指的是一个容器,该容器用来存放进程或线程,存放的数目是一定的

  2.为什么要用池?

是为了将并发的进程或线程数目控制在计算机可承受的范围内

  3.进程池与线程池的选用规则?

进程池:当任务是计算密集型的情况下应该用进程来利用多核优势
线程池:当任务是IO密集型的情况下应该用线程减少开销

  4.同步调用VS异步调用

所谓同步调用与异步调用指的是池提交任务的两种方式

#同步调用:
提交完任务后,就在原地等待任务执行完毕,拿到运行结果/返回值后再执行下一行代码,同步调用下任务的执行是串行执行

#异步调用:
提交完任务后,不会在原地等待任务执行完毕,直接执行下一行代码,任务执行结果可单独返回,或者配合回调函数进行下一步处理。异步调用下任务的执行是并发执行

  5.进程池异步调用示例

技术分享图片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random

def task(x):
    print(%s is running %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == __main__:
    #异步调用
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数,只允许4个进程同时运行,一个运行才可用同PID接着运行下一个任务
    futrues=[]
    for i in range(10):
        futrue=p.submit(task,i) #提交4个进程,task为任务函数,i为位置参数
        futrues.append(futrue)  #将提交任务的对象放入列表

    # 与老版本的pool.close()和 pool.join()连用效果一样
    p.shutdown(wait=True)   # 俩作用:1.shutdown为禁止新的任务进入进程池,2.wait=Ture等待进程池任务都执行完毕 。


    for futrue in futrues:
        print(futrue.result())  #打印任务返回结果
    print()
进程池异步调用示例

  6.进程池同步调用示例

技术分享图片
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random

def task(x):
    print(%s is running %os.getpid())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == __main__:

    #同步调用
    p=ProcessPoolExecutor() #不指定参数默认池的大写等于cpu的核数
    for i in range(10):
        res=p.submit(task,i).result()   #提交任务后,直接返回结果,串行效果,使用意义不大
        print(res)

    print()
进程池同步调用

  7.线程池异步调用示例

技术分享图片
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import os
import time
import requests


def get(url):
    print(%s GET %s %(current_thread().getName(),url))
    time.sleep(2)
    response=requests.get(url)
    if response.status_code == 200:
        res=response.text
        return res

def parse(res):
    print(%s 解析[url]结果是 %s % (os.getpid(), len(res)))

if __name__ == __main__:
    t=ThreadPoolExecutor(10)

    urls=[
        https://www.baidu.com,
        https://www.openstack.org,
        https://www.taobao.com,
        https://www.jd.com,
    ]
    obj_l=[]
    for url in urls:
        obj=t.submit(get,url)
        obj_l.append(obj)
    t.shutdown(wait=True)
    for obj in obj_l:
        res=obj.result()
        print(%s 解析[url]结果是 %s %(os.getpid(),len(res)))

    print()
线程池异步调用示例

  8.线程池同步调用示例

技术分享图片
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import os
import time
import random

def task(x):
    print(%s is running %current_thread().getName())
    time.sleep(random.randint(1,3))
    return x**2

if __name__ == __main__:

    #同步调用
    t=ThreadPoolExecutor() #不指定参数默认池的个数等于cpu的核数*5
    for i in range(10):
        res=t.submit(task,i).result()   #提交任务后,直接返回结果,串行效果,使用意义不大
        print(res)

    print()
线程池同步调用示例

  9.进程池配合回调函数

技术分享图片
# 回调函数
from concurrent.futures import ProcessPoolExecutor
import os
import time
import requests


def get(url):
    print(%s GET %s %(os.getpid(),url))
    time.sleep(2)
    response=requests.get(url)
    if response.status_code == 200:
        res=response.text
        return res

def parse(obj):
    res=obj.result()
    print(%s 解析[url]结果是 %s % (os.getpid(), len(res)))

if __name__ == __main__:
    p=ProcessPoolExecutor(10)

    urls=[
        https://www.baidu.com,
        https://www.python.org,
        https://www.openstack.org,
        https://www.taobao.com,
        https://www.jd.com,
    ]

    for url in urls:
        # 回调函数会在任务运行完毕后自动触发,并且接收该任务对象作为parser函数的位置参数
        p.submit(get,url).add_done_callback(parse)    #进程池提交任务触发回调函数,运行回调函数的是主进程
    print(,os.getpid())
进程池配合回调函数

  10.线程池配合回调函数

技术分享图片
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
import time
import random

def task(x):
    print(%s is running %current_thread().getName())
    time.sleep(random.randint(1,3))
    return x**2

def parse(obj):
    res=obj.result()
    print(%s 解析的结果为%s %(current_thread().getName(),res))

if __name__ == __main__:
    t=ThreadPoolExecutor(3) #线程池默认不写参数是cpu核数*5

    for i in range(10):
        t.submit(task,i).add_done_callback(parse) #线程池提交任务触发回调函数,运行回调函数的是各个线程
线程池配合回调函数示例

 

 

 

 

  

  




以上是关于十并发编程的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程原理与实战十九:AQS 剖析

golang代码片段(摘抄)

六万字Java高并发编程入门第十九篇:并发编程入门总览包教包会值得收藏

六万字Java高并发编程入门第十九篇:并发编程入门总览包教包会值得收藏

Java 并发编程 常见面试总结

Java 并发编程 常见面试总结