第十五章 并发编程

Posted gnaix

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第十五章 并发编程相关的知识,希望对你有一定的参考价值。

1.操作系统的发展史                                            

参考:http://www.cnblogs.com/Eva-J/articles/8253521.html

知识点

输入输出 -- 大部分时间都不会占用CPU,但会降低你程序的效率

操作系统的三种基本类型:多道批处理系统、分时系统、实时系统。

 现在操作系统

基于多道批处理系统和分时系统

多个程序、作业在遇到IO操作的时候,操作系统会帮助你进行切换

让CPU的利用率得到最大的提高

2.进程                                                        

初识进程

进程: 运行中的程序

操作系统 只负责管理调度进程

进程是操作系统中资源分配的最小单位

每一个运行中的程序都需要有自己的内存、资源

都分配给进程 记录执行的状态 管理自己的内存资源

在Python中,每一个运行的程序 都是一个进程

如何使用python来开启一个进程--我们使用multiprocessing模块,process-进程,multi多元的

并发编程例:

import os
import time
from multiprocessing import Process
def func(num):
    print(num,os.getpid())
    time.sleep(0.5)
    print(num,os.getpid())
    time.sleep(0.5)
    print(num, os.getpid())
    time.sleep(0.5)
    print(num, os.getpid())

# 创建一个进程
if __name__ == __main__:  # windows,开启子进程的时候,会执行import
    print(os.getpid())
    p = Process(target=func, args=(10,))
    p.start() # 开启进程
    print(os.getpid())
    time.sleep(1)
    print(os.getpid(),1)
    time.sleep(1)
    print(os.getpid(),2)

# 运行结果
#                  时间轴
# 17500              0
# 17500              0
# 10 12656           0+
# 10 12656           0.5
# 17500 1            1
# 10 12656           1+
# 10 12656           1.5+
# 17500 2            2

几个概念:

同步:一个时刻只能做一件事

异步:同时做多件事

异步可以有效地提高程序的效率

进程:主进程中开启的新的进程

主进程:运行的这个程序

父进程:创造出子进程的进程

注意:进程与进程之间都是异步的,而且开启一个进程是有时间开销的

进程的进阶

什么是进程:运行中的程序,计算机中最小的资源分配单位

程序开始执行就会产生一个主进程

python中可以主进程中用代码启动一个进程 -- 子进程

同时主进程也被称为父进程父子进程之间的代码执行是异步的,各自执行自己的

父子进程之间的数据不可以共享

例:

import time
from multiprocessing import Process

n = 100
def func():
    global n
    n = 0
    print(n)
    print(----------)

if __name__ == __main__:
    Process(target=func).start()
    time.sleep(1)
    print(n)
# 0
# ----------
# 100
# 说明:父进程中n的值并没有随着子进程的运行而改变

 

进程什么时候结束

主进程什么时候结束:主进程会等待子进程结束之后再结束,父进程会等待回收子进程所占用的资源

例:直接开启多个子进程

import time
from multiprocessing import Process

n = 100
def func(n):
    time.sleep(1)
    print(-*n)

if __name__ == __main__:
    Process(target=func, args=(1,)).start()
    Process(target=func, args=(2,)).start()
    Process(target=func, args=(3,)).start()
    Process(target=func, args=(4,)).start()

# 说明:主要看时间片的轮转,是一个随机事件
# -
# ----
# ---
# --

例:使用for循环开启多个子进程

import time
from multiprocessing import Process

n = 100
def func(n):
    time.sleep(1)
    print(-*n)

if __name__ == __main__:
    for i in range(10):
        Process(target=func, args=(i,)).start()
# -
# --
# --------
# ----
# ---
# -----
# ---------
# ------
# -------

阻塞

例:使用阻塞(join)开启子进程

import time
from multiprocessing import Process

n = 100
def func(n):
    time.sleep(1)
    print(-*n)

if __name__ == __main__:
    p = Process(target=func, args=(1,))
    p.start()
    print(子进程开始了)
    p.join() # 阻塞 直到子进程都执行完毕
    print(十条信息已经都发送完了)
# 子进程开始了
# -
# 十条信息已经都发送完了

例:使用阻塞(join)循环开启多个子进程

import time
from multiprocessing import Process

n = 100
def func(n):
    time.sleep(1)
    print(-*n)

if __name__ == __main__:
    lst = []
    for i in range(10):
        p = Process(target=func, args=(i,))
        p.start()
        lst.append(p)
    for p in lst:p.join()

    print(十条信息已经都发送完了)

# -----
# -
# ---
# -------
# --
# ---------
# ----
# --------
# ------
# 
# 十条信息已经都发送完了

守护进程

守护进程也是一个子进程

定义:当主进程的代码执行完毕之后自动结束的子进程叫做守护进程

例:

import time
from multiprocessing import Process
def deamon_func():
    while True:
        print(我还活着)
        time.sleep(0.5)


if __name__ == __main__:
    p = Process(target=deamon_func)
    # 开启守护进程
    p.daemon = True
    p.start()
    for i in range(3):
        print(i**)
        time.sleep(0.4)

# 我还活着
# *
# 我还活着
# **
# 我还活着
# 说明:正常情况下,父进程会等待子进程结束才结束
# 而设置守护进程后,父进程结束了,子进程的死循环即可停止

练习:分析程序请问会打印多少个我还活着

import time
from multiprocessing import Process
def deamon_func():
    while True:
        print(我还活着)
        time.sleep(0.5)

def wahaha():
    for i in range(10):
        time.sleep(1)
        print(i*#)

if __name__ == __main__:
    Process(target=wahaha).start()
    p = Process(target=deamon_func)
    p.daemon = True
    p.start()
    for i in range(3):
        print(i**)
        time.sleep(1)


# 我还活着
# 我还活着
# *
#
# 我还活着
# 我还活着
# **
# #
# 我还活着
# 我还活着
# ##
# ###
# ####
# #####
# ######
# #######
# ########
# #########
# 说明:因为‘我还活着‘是守护进程打印的,所以当父进程结束后,停止打印,3/0.5=6次
# 但程序不会停止,需要等待子进程wahaha结束

例:主进程添加阻塞

import time
from multiprocessing import Process
def deamon_func():
    while True:
        print(我还活着)
        time.sleep(0.5)

def wahaha():
    for i in range(10):
        time.sleep(1)
        print(i*#)

if __name__ == __main__:
    p2 = Process(target=wahaha)
    p2.start()
    p = Process(target=deamon_func)
    p.daemon = True
    p.start()
    for i in range(3):
        print(i**)
        time.sleep(1)
    p2.join()

# 我还活着
# 我还活着
# *
# 
# 我还活着
# 我还活着
# **
# #
# 我还活着
# 我还活着
# ##
# 我还活着
# 我还活着
# ###
# 我还活着
# 我还活着
# ####
# 我还活着
# 我还活着
# #####
# 我还活着
# 我还活着
# ######
# 我还活着
# 我还活着
# #######
# 我还活着
# 我还活着
# ########
# 我还活着
# 我还活着
# #########
# 我还活着

总结

# 开启一个子进程 start
# 子进程和主进程是异步的
# 如果在主进程中要等待子进程结束之后再执行某行代码:join
# 如果有多个子进程 不能在start一个进程之后就立刻join,把所有的进程放到列表中
# 等待所有的进程都start之后再逐一join
# 守护进程 -- 当主进程的代码执行完毕之后自动结束的子进程叫做守护进程

添加锁是为了保证程序的运行顺序,牺牲了效率但是保护数据的安全

例:未添加阻塞、锁

import os
import time
import random
from multiprocessing import Process,Lock

def work(n):
    print(%s: %s is running %(n,os.getpid()))
    time.sleep(random.random())
    print(%s:%s is done %(n,os.getpid()))

if __name__ == __main__:
    for i in range(3):
        p=Process(target=work,args=(i,  ))
        p.start()
# 0: 23192 is running
# 2: 6064 is running
# 1: 22560 is running
# 1:22560 is done
# 0:23192 is done
# 2:6064 is done
# 说明:在没有使用阻塞,锁的时候,因为时间片是随机事件,所以随机子进程开始

例:使用阻塞保证子进程的独立运行

import os
import time
import random
from multiprocessing import Process,Lock

def work(n):
    print(%s: %s is running %(n,os.getpid()))
    time.sleep(random.random())
    print(%s:%s is done %(n,os.getpid()))

if __name__ == __main__:
    for i in range(3):
        p=Process(target=work,args=(i,  ))
        p.start()
        p.join()

# 0: 13184 is running
# 0:13184 is done
# 1: 22868 is running
# 1:22868 is done
# 2: 22200 is running
# 2:22200 is done

例:使用锁

import os
import time
import random
from multiprocessing import Process,Lock

def work(n, lock):
    lock.acquire()
    print(%s: %s is running %(n,os.getpid()))
    time.sleep(random.random())
    print(%s:%s is done %(n,os.getpid()))
    lock.release()

if __name__ == __main__:
    lock = Lock()
    for i in range(3):
        p=Process(target=work,args=(i,lock))
        p.start()
        
# 0: 18864 is running
# 0:18864 is done
# 1: 23264 is running
# 1:23264 is done
# 2: 15820 is running
# 2:15820 is done

信号量

信号量本质是锁+计数器

from multiprocessing import Process,Semaphore
import time,random

def go_ktv(sem,user):
    sem.acquire()
    print(%s 占到一间ktv小屋 %user)
    time.sleep(random.randint(3,5)) #模拟每个人在ktv中待的时间不同
    sem.release()
    print(%s 走出ktv小屋 % user)

if __name__ == __main__:
    sem=Semaphore(4)
    p_l=[]
    for i in range(13):
        p=Process(target=go_ktv,args=(sem,user%s %i,))
        p.start()
        p_l.append(p)

    for i in p_l:
        i.join()
    print(============》)

# 信号量的本质就是 锁+计数器
# user12 占到一间ktv小屋
# user8 占到一间ktv小屋
# user0 占到一间ktv小屋
# user7 占到一间ktv小屋
# user12 走出ktv小屋
# user4 占到一间ktv小屋
# user6 占到一间ktv小屋
# user0 走出ktv小屋
# user8 走出ktv小屋
# user3 占到一间ktv小屋
# user7 走出ktv小屋
# user11 占到一间ktv小屋
# ******
# ============》
# 说明:程序设置Semaphore(4),表示同时只能有四个子进程存在,如果有结束的,再添加新的,知道所有子进程结束

事件

事件内部内置了一个标志
wait 方法 如果这个标志是True,那么wait == pass
wait 方法 如果这个标志是False,那么wait就会陷入阻塞,一直阻塞到标志从False变成True

一个事件在创建之初 内部的标志默认是False
False -> True set()
True -> False clear()

红绿灯模型

# 红绿灯模型
from multiprocessing import Process, Event
import time, random

def car(e, n):
    while True:
        if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
            print(33[31m红灯亮33[0m,car%s等着 % n)
            e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
            print(33[32m车%s 看见绿灯亮了33[0m % n)
            time.sleep(random.randint(3, 6))
            if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
                continue
            print(车开远了,car, n)
            break

def traffic_lights(e, inverval):
    while True:
        time.sleep(inverval)   # 先睡3秒
        if e.is_set():         # 标志是True
            print(######, e.is_set())
            e.clear()  # ---->将is_set()的值设置为False
        else:                 # 标志是False
            e.set()    # ---->将is_set()的值设置为True
            print(***********,e.is_set())


if __name__ == __main__:
    e = Event()   #e就是事件
    t = Process(target=traffic_lights, args=(e, 3))  # 创建一个进程控制红绿灯
    for i in range(10):
        p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
        p.start()
    t.start()

    print(============》)



# 10个进程 模拟车 :车的行走要依靠当时的交通灯
# 交通灯是绿灯 车就走
# 交通灯是红灯 车就停 停到灯变绿
# wait 来等灯
# set clear 来控制灯

队列

管道+锁 = 队列

管道也是一个可以实现进程之间通信的模型

但是管道没有锁,数据不安全

from multiprocessing import Queue,Process

def func(q,num):
    try:
        t = q.get_nowait()
        print("%s抢到票了"%num)
    except:pass

if __name__ == __main__:
    q = Queue()
    q.put(1)
    for i in range(10):
        Process(target=func,args=(q,i)).start()

3.线程                                                        

什么是进程 :是计算机资源分配的最小单位

什么是线程 :是CPU调度的最小单位

线程和进程的关系:

  每个进程中都至少有一个线程

多线程服务的时候才能感受到线程的存在

本质的区别:

  同一个进程的每个线程之间数据共享

  线程之间也是异步的

  线程是轻量级的:创建一个线程的时间开销要远远小于进程

  线程是CPU调度的最小单位

例:线程的开启

import os
import time
from threading import Thread
def func(i):
    print(os.getpid())
    time.sleep(1)
    print(thread,i)

Thread(target=func,args=(1,)).start()
print(123**)
print(main, os.getpid())
# 23280
# ***************************************************************************************************************************
# main 23280
# thread 

总结:

# 每一个进程里至少有一个主线程负责执行代码
# 在主线程中可以再开启一个新的线程
# 在同一个进程中就有两个线程同时在工作了
# 线程才是CPU调度的最小单位
# 多个线程之间的数据是共享的
# GIL锁 全局解释器锁
# 解释器的锅 Cpython解释器的问题
# 在同一个进程中 同一时刻 只能有一个线程被CPU执行
# 导致高计算型 代码 不适合用python的多线程来解决
# 用多进程或者分布式来解决高计算型代码

守护线程

例:

from threading import Thread
import time
def foo():
    while True:
        print(123)
        time.sleep(1)

def bar():
    print(456)
    time.sleep(3)
    print("end456")

t1 = Thread(target=foo)
t2 = Thread(target=bar)

t1.daemon = True
t1.start()
t2.start()
print("main-------")


# 主线程结束了之后守护线程也同时结束
# 守护线程会等待主线程完全结束之后才结束

例:

from threading import Thread,Lock
import time
def work():
    global n
    lock.acquire()
    temp=n
    time.sleep(0.1)
    n = temp-1
    lock.release()
if __name__ == __main__:
    lock=Lock()
    n=100
    l=[]
    for i in range(100):
        p=Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()
    print(n)

# 当你的程序中出现了取值计算再赋值的操作 数据不安全 —— 加锁

线程池

程序可以开启的进程数和线程数

CPU的个数+1 进程数

CPU的个数*5 线程数

池的介绍
#1 介绍
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

#2 基本方法
#submit(fn, *args, **kwargs)
异步提交任务

#map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

#shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

#result(timeout=None)
取得结果

#add_done_callback(fn)
回调函数

例:

import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def func(num):
    print(num)
    time.sleep(1)
    print(num)
if __name__ == __main__:
    t = ThreadPoolExecutor(20)
    for i in range(100):
        t.submit(func,i)
    t.shutdown()  # join整个池子,必须添加
    print(done)
# 说明:线程会以20个为一组开启

例:使用map取代了for+submit

import os,time,random
from concurrent.futures import ThreadPoolExecutor
def task(n):
    print(%s is runing %os.getpid(),n)
    time.sleep(random.randint(1,3))
    return n**2
if __name__ == __main__:
    executor=ThreadPoolExecutor(max_workers=3)
    executor.map(task,range(1,12)) #map取代了for+submit

callback回掉函数 - 当拿到返回值的时候,立刻进行某操作

我有10个http的网页请求,把这10个网页上的信息分析了

例:

import time
import random
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread
urls=[
        https://www.baidu.com,
        https://www.python.org,
        https://www.openstack.org,
        https://help.github.com/,
        http://www.sina.com.cn/
        http://www.cnblogs.com/
        http://www.sogou.com/
        http://www.sohu.com/
    ]

def analies(content):
    print(分析网页,current_thread())
    print(content.result())

def get_url(url):
    print(爬取网页,current_thread())
    time.sleep(random.uniform(1,3))
    # analies(url*10)
    return url*10

t = ThreadPoolExecutor(3)
print(主线程,current_thread())
for url in urls:
    t.submit(get_url,url).add_done_callback(analies)

concurrent.futures callback是由子线程做的

线程的其他知识与进程类似

4.协程                                                        

协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

进程 计算机中资源分配的最小单位 cpu+1

线程 CPU 调度最小单位 cpu*5

协程 把一个线程拆分成几个 500

进程 线程 都是操作系统在调度

协程 是程序级别调度

减轻了操作系统的负担、增强了用户对程序的可控性

例1:

from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat(name):
    print(%s eat 1 %name)
    time.sleep(2)
    print(%s eat 2 %name)

def play(name):
    print(%s play 1 %name)
    time.sleep(1)
    print(%s play 2 %name)


g1=gevent.spawn(eat,egon)
g2=gevent.spawn(play,name=egon)
g1.join()
g2.join()
# gevent.joinall([g1,g2])
print()
# egon eat 1
# egon play 1
# egon play 2
# egon eat 2
#

例2:

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print(GET: %s %url)
    response=requests.get(url)
    if response.status_code == 200:
        print(%d bytes received from %s %(len(response.text),url))


start_time=time.time()
gevent.joinall([
    gevent.spawn(get_page,https://www.python.org/),
    gevent.spawn(get_page,https://www.yahoo.com/),
    gevent.spawn(get_page,https://github.com/),
])
stop_time=time.time()
print(run time is %s %(stop_time-start_time))
# GET: https://www.python.org/
# GET: https://www.yahoo.com/
# GET: https://github.com/
# 48801 bytes received from https://www.python.org/
# 527368 bytes received from https://www.yahoo.com/
# 59420 bytes received from https://github.com/
# run time is 6.552013397216797












以上是关于第十五章 并发编程的主要内容,如果未能解决你的问题,请参考以下文章

并发编程之线程

Flask 学习笔记-第十五章-应用编程接口

《JAVA编程思想》学习笔记——第十五章 泛型

第十五章 原子变量和非阻塞同步机制

第十五章 Python多进程与多线程

C Primer Plus(第六版)第十五章 编程练习答案