进程与线程

Posted linglinglingling

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了进程与线程相关的知识,希望对你有一定的参考价值。

一、进程和线程概念和关系

概念:

进程:

进程即正在运行的一个过程,进程是对正在执行的程序的一个抽象概念。

 

进程是概念起源与操作系统,是操作系统最核心的概念,也是操作系统

操作系统提供的最古老,也是最重要的抽象概念之一,操作系统的其它

所用内容都是围绕进程的概念展开的。

 

进程:进程指正在运行的程序,确切的来说,当一个程序进入内存运行,

即变成一个进程,进程是处于运行中的程序,并且具有一定的独立功能。

 

进程是计算机中的程序关于某数据集上的一次运行活动(是代码执行的过程),

是系统进行分配和调度的基本单位,是操作系统结构的基础,或者说进程是具

有一定独立功能的程序关于某个数据集上的一次活动,进程是系统进行资源

分配和调度的一个独立单位。(进程是一个资源单位)

 

进程就是一个程序在一个数据集上的一次动态执行过程,进程一般有程序、数据集、

进程控制块三部分组成。

 

线程:

线程则是一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的

基本单位。(线程就是一个执行单位)

 

线程也叫轻量级进程,它是一个基本的CPU单位,也是程序执行最小单位,

由线程 ID、程序计算器、寄存器集合和堆栈共同组成。

 

线程的引入减小了程序并发执行的开销,提高了操作系统的并发性能,线程

没有自己的系统资源。

 

一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程,

资源分配给进程,同一个进程的所有线程共享该进程的所有资源CPU分给线程,

即真正在CPU运行的是线程。

 

理论知识的扩展:

操作系统的作用:

1、隐藏丑陋复杂的硬件接口,提供良好的抽象接口

2、管理、调度进程、并且将多个进程对硬件的竞争变得有序

 

多道技术:
产生背景:针对单核,实现并发

【强调】现在的主机一般是多核,那么每个核会利用多道技术

【重点】有4个CPU,运行CPU1的某个程序遇到IO阻塞,会等到IO结束再

重新调度,被调度到4个CPU中的任意一个,具体由操作系统调度算法决定。

【空间上的复用】如内存中同时有多道程序

【时间的复用】复用一个CPU的时间片

【强调】遇到 IO 切,占用 CPU 时间过长也切,核心在于切这前将进程的状态

保存下来,这样才能保证下次切换回来时,能基于上次切走的位置继续运行。

 

二、多线程中的Thread.join()方法和Thread.setDaemon(True)方法

threading.Thread.join() 线程阻塞,只有当线程运行结束后才会继续执行后续语句

join()的作用是阻塞进程直到线程执行完毕。

import threading
import time

def A(a):
    print("%s开始运行"%a)
    time.sleep(3)
    print("%s执行结束"%a)

def B(b):
    print("%s开始运行"%b)
    time.sleep(5)
    print("%s结束运行"%b)

c = []

t1 = threading.Thread(target=A,args="A")
t2 = threading.Thread(target=B,args="B")

c.append(t1)
c.append(t2)

if __name__ == "__main__":
    for t in c:
        t.start()
    t2.join()
    print("主线程执行完毕")

 此处jion的原理就是依次检验线程池中的线程是否结束,没有结束就阻塞直到线程结束,

如果结束则跳转执行下一个线程的join函数。而python的join函数还有一个特殊的功能,

就是可以设置超时,threading.Thread.join([timeout])

如下:

import threading
import time

def A(a):
    print("%s开始运行"%a)
    time.sleep(3)
    print("%s执行结束"%a)

def B(b):
    print("%s开始运行"%b)
    time.sleep(5)
    print("%s结束运行"%b)

c = []

t1 = threading.Thread(target=A,args="A")
t2 = threading.Thread(target=B,args="B")

c.append(t1)
c.append(t2)

if __name__ == "__main__":
    for t in c:
        t.start()
    t2.join(2)
    print("主线程执行完毕")

 也就是通过传给join一个参数来设置时,也就是超过时间join就不在阻塞进程。

而在实际应用测试的时候发现并不是所有线程在超时时间内都结束的,而是顺

序执行检验是否在time_out时间内超时,例如,超时时间设置成2s,前面一个线程

没有完成的情况下,后面线程执行join会从上一个线程结束时间起在设置2s的超时。

 

setDaemon(True)

  将线程声明为守护线程,必须在start()方法之前设置,如果不设置为守护线程程序会被

无限挂起,这个方法基本和join是相反的,当我们在程序运行中,执行一个主线程又创建

一个子线程,主线程和子线程就兵分两路,分别运行,那么主线程完成想退出时,会检验

子线程是否完成,如果子线程未完成,则主线程会等待子线程完成后退出。但是有时候我们

需要的是只要主线程完成了,不过子线程是否完成,都要和主线程一起退出,这是就可以用setDaemon方法

import threading
import time

def A(a):
    print("%s开始运行"%a)
    time.sleep(3)
    print("%s执行结束"%a)

def B(b):
    print("%s开始运行"%b)
    time.sleep(5)
    print("%s结束运行"%b)

c = []

t1 = threading.Thread(target=A,args="A")
t2 = threading.Thread(target=B,args="B")

c.append(t1)
c.append(t2)

if __name__ == "__main__":
    t2.setDaemon(True)
    for t in c:
        t.start()
    print("主线程执行完毕")

 其它方法:

run() 用来表示线程活动的方法

statr() 启动线程活动

isAlive() 返回线程是否活动

getnName() 返回线程名

setName() 设置线程名

threading模块提供的一些方法:

threading.currentThread() 返回当前线程变量

threading.enumerate() 返回一个包含运行的线程list,正在运行指线程启动后,结束前,不包括启

动前和终止后的线程

threading.activeCount() 返回正在运行的线程数量,与len(threading.enumerate()) 有相同结果

 

# 用面向对象的方式来创建线程
import threading
import time


class Mysb(threading.Thread):
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def run(self):
        print("%s 开始运行" % self.name)
        time.sleep(3)
        print("%s 结束运作" % self.name)


if __name__ == "__main__":
    t1 = Mysb("A")
    t2 = Mysb("B")
    t1.start()
    t2.start()
    print("主线程执行完毕")

 三、并发与并行

并发:是指一个系统具有处理多个任务(动作)的能力,(一个CPU进行切换就可以完成的)

并行:指一个系统具有同时(唯一时刻)处理多个任务(动作)的能力,(多核,4核,4个CPU可同时处理1-4个任务的能力)

并行是并发的子集。(并行是并发的子集。比如在单核CPU系统上,只可能存在并发而不可能存在并行)

技术图片

 

四、同步与异步

同步执行:当进程执行到一个 IO (阻塞的时候),出现等待,这个就是同步。

异步执行:当进程执行到一个 IO(阻塞的时候),不会等待,直到数据到来之后再回过头来处理,这就是异步。

 五、GIL(全局解释锁)

1、Python语言和GIL没有半毛钱关系。仅仅是由于历史原因在Cpython虚拟机(解释器),难以移除CIL。

2、GIL:全局解释器。每个线程在执行的过程都要需要先获取GIL,保证同一时刻只有一个线程执行代码。

3、线程释放GIL锁的情况:在 IO 操作等可能会引起阻塞的 system call之前,可以暂时释放GIL python 3.x使用计时器(执行时间达到阈值后,当前线程释放GIL)

或python 2.x tickets计数达到100

4、python 使用多进程是可以利用多核的CPU资源的

5、多线程爬取比单线程性能有提升,因为遇到IO阻塞会自动释放GIL锁

6、无论你启动多少个线程,你有多少个CPU,python在执行一个进程的时候会淡定的在同一时

刻只允许一个线程运行,所以,python是无法利用多核CPU实现多线程的。这样,python对于

计算密集型的任务开多线程的效率甚至不如串行(没有大量切换),但是,对于IO密集型任务

效率还是有显著提升的。

技术图片

小结:

1、在计算密集型这类需要持续使用CPU的任务的时候,单线程比多进程快

2、在 IO 密集型 操作等可能引起阻塞的这类任务的时候,多线程会比多进程快,因为GIL的存在

只有 IO Bound场景下的多线程会得到比较好的性能提升,如果对于并行计算性能较高的程序可

一考虑把核心部分变成c模块或者索性用其它语言实现,GIL在较长一段时间内将会继续存在但是会不断对其进行改进。

# 计算密集型
import threading
import time

def A():
    a = 0
    for i in range(100000000):
        a += 1
    print(a)
    return True

def main():
    l  = []
    z = time.time()
    for i in range(2):
        t = threading.Thread(target=A)
        t.start()
        l.append(t)


    for i in l:
        i.join()
    c = time.time()
    print("时间%s" % (c - z))
if __name__ == "__main__":
    main()

# python 3.6
# 并发:13.749624490737915
# 串行:7.290215253829956
#
# python 2.7
# 并发:12.26799988747
# 串行:8.17599987984
#
# 计算密集型,多线程并发相比串行,没有显著优势

 六、同步锁

多线程和多进程最大的不同在于;多线程中,同一个变量,各自有一份拷贝存在于每个进程中,

互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个

线程修改,因此,线程之间共享数据最大的危险在于多线程同时修改一个变量,把内容给改乱了

为了处理多线程中对于公共资源的引用问题,引出了锁的概念。

import threading
import time

def A():
    global mun # 在每个线程中获取这个全局变量
    temp = mun
    time.sleep(0.1)
    mun = temp - 1 # 对此全局变量进行 - 1 操作

mun = 100 # 设定一个全局变量

thread_list = []

for i in range(100):
    t = threading.Thread(target=A)
    thread_list.append(t)
    t.start()

for t in thread_list: # 等待所有线程执行完毕
    t.join()

print("mun == %s"%mun)

# 多线程共享变量,无法保证变量安全

 如上实例,在一个进程内,设置全局变量mun==100,然后创建100个线程,执行mun-=1的操作

但是,由于在函数A中存在time.sleep(0.1),该语句等价于IO操作。于是在这短短的0.1秒的时间

内,所有线程已经创建并启动,拿到mun==100的变量,等待0.1秒过后,最终得到mun其实是99

 

锁通常被用来实现对共享资源的同步访问,为每一个共享资源创建一个Lock对象,当你需要访问

该资源时,调用acquire()方法来获取锁对象(如果其它线程以获得了该锁,则当前线程需等待

被释放),待资源访问完后,在调用release()方法释放锁:

import threading
import time

def A():
    global mun 
    lock.acquire()
    temp = mun
    time.sleep(0.1)
    mun = temp - 1
    lock.release()

mun = 100 

thread_list = []

lock = threading.Lock()
for i in range(100):
    t = threading.Thread(target=A)
    thread_list.append(t)
    t.start()

for t in thread_list: 
    t.join()

print("mun == %s"%mun)

# 应用Lock方法,保证变量安全

 lock.acquire() 于 lock.release() 包起来代码段,保证同一时刻只允许一个线程引用。

import threading

a = threading.Lock()

a.acquire()

# 对公共数据的操作

a.release()

 七、死锁和递归锁

死锁:

所谓死锁:是指两个或以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的

现象,若无外力作用,它们都无法推进下去,此时称为系统处于死锁状态或系统产生了死锁,

这些在互相等待的进程称为死进程。

import threading
import time


class Mysb(threading.Thread):
    def suoA(self):
        A.acquire()
        print(self.name, "锁A", time.time())
        time.sleep(2)
        B.acquire()  # 如果锁被占用,则阻塞在这里

        print(self.name, "锁B", time.time())
        time.sleep(1)

        B.release()
        A.release()


    def suoB(self):
        B.acquire()
        print(self.name, "锁A", time.time())
        time.sleep(2)
        A.acquire()

        print(self.name, "锁B", time.time())
        time.sleep(1)

        A.release()
        B.release()


    def run(self):
        self.suoA()
        self.suoB()


l = []

if __name__ == "__main__":
    A = threading.Lock()
    B = threading.Lock()
    for i in range(5):
        t = Mysb()
        l.append(t)
        t.start()
    for t in l:
        t.join()

    print("ending...")

在python中为了支持同一线程中多次请求同一资源,python提供了可重新入锁的RLock和一个

counter变量,counter变量记录了acquire的次数,从而使得资源可以被多次require直到一个线程

所有acquire都被release,其它的线程才能获得资源,上面的例子如果使用RLock代替Lock则不会发生死锁。

递归锁解决死锁:

import threading
import time

class Mysb(threading.Thread):
    def suoA(self):
        losk.acquire()
        print(self.name, "锁A", time.time())
        time.sleep(2)

        losk.acquire()
        print(self.name, "锁B", time.time())
        time.sleep(1)

        losk.release()
        losk.release()

    def suoB(self):
        losk.acquire()
        print(self.name, "锁B", time.time())
        time.sleep(2)

        losk.acquire()
        print(self.name, "锁A", time.time())
        time.sleep(1)

        losk.release()
        losk.release()

    def run(self):
        self.suoA()
        self.suoB()

l = []
if __name__ == ‘__main__‘:
    losk = threading.RLock()
    for i in range(5):
        t = Mysb()
        l.append(t)
        t.start()
    for t in l:
        t.join()

    print("线程之间是竞争关系,递归锁解决死锁")

 八、event对象

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其它线程需要通过

判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常刺手,为了解决

这些问题,我们需要使用threading库中的Event对象。对象包含一个可有线程设置的信号标志,

它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置成False,那么

这个线程将会一直阻塞至该标志位True。一个线程如果将一个Event对象的信号设置为True,

它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经设置为真的Event对象,

那么它将忽略这个事情,继续执行。

import threading
import time

class Boss(threading.Thread):
    def run(self):
        print("Boss:今晚大家都要加班到22:00.")
        print(event.isSet()) # 执行结果为False
        event.set()
        time.sleep(5)
        print("Boss:<22:00>可以下班了。")
        print(event.isSet())
        event.set() # 此方法就是在设定 类似于(标志位)的东西

class Worker(threading.Thread):
    def run(self):
        event.wait() # (在Boss类中有一个set设定后就不会等待了,
        # 一旦这个东西被设定了就不会再等待了,
        # 一旦event被设定,等同于pass;反之若没有被设定就相当于一个阻塞的状态)
        print("Worker:哎......命苦啊!")
        time.sleep(1)
        event.clear() # 用来清空 此 标志位
        event.wait() # 此时标志位被清空,没有被设定,所以又在等待
        print("Worker: OhYeah!")

l = []

if __name__ == "__main__":
    event = threading.Event() # 先创建一个event同步对象
    for i in range(5):
        l.append(Worker()) # 将五个Worker类的实例对象添加到列表 l 中去
    l.append(Boss())  # 创建一个老板对象

    for t in l:
        t.start()
    for t in l:
        t.join()
########5个工人对象加一个老板对象再加一个主线程对象,这七个线程一起进行运行#########
    print("ending.....")

 evnet.isSet() 返回event的状态值

event.wait() 如果event.isSet() == False 将阻塞线程

event.set() 设置event的状态值为True,所有阻塞池激活进入就绪状态,等待操作系统调度

evnet.clear() 恢复event的状态值为False

技术图片

 

九、信号量

信号量用来控制线程并发数的,BoundedSemaphore或Semaphore管理一个内置的计数器,每当

调用acquire()时-1,调用release()时 +1,计数器不能小于0,当计数器为0时,acquire()将阻塞线程

至同步状态,直到其它线程调用release()。(类似于停车场的概念)

BoundedSemaphore或Semaphore的唯一区别在于将在调用release()时检查计数器是否超过了计数器

的初始值,如果超过了将抛出一个异常。

import threading
import time

class A(threading.Thread):
    def run(self):
        print(self.name)
        time.sleep(1)
        c.release()

l = []

if __name__ == "__main__":
    c = threading.Semaphore(5)
    for i in range(100):
        l.append(A())
    for i in l:
        i.start()

 

十、队列(一种数据结构)

import queue

q = queue.Queue(5) #每条get()语句后面都需要一条task_done(表示某项任务完成)
q.put(20)
q.put(22)

print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join() # 等到队列为空在执行别的操作
print("ending......")

# # 队列存取数据的几种模式
# # 先进先出模式
import queue

q = queue.Queue(5) # FIFO模式(先进先出)

q.put(20)
q.put("hello")
q.put("name":"li")
q.put_nowait(22)

print(q.qsize()) # 返回队列的大小
print(q.empty()) # 检查队列是否为空
print(q.full()) # 检查队列是否被填满

while 1:
    data = q.get()
    print(data)
    print("---")

# # 队列先进后出模式
import queue

q = queue.LifoQueue()

q.put(20)
q.put(22)
q.put("l")

while 1:
    data = q.get()
    print(data)
    print("---")

# 按优先级顺序读取
import queue

q = queue.PriorityQueue()

q.put([4,22])
q.put([1,"sb"])
q.put([3,"hello"])

while 1:
    data = q.get()
    print(data[1])
    print("---")

 十一、生产者和消费者模型

为什么要使用生产者和消费者模型

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发中,

如果生产者处理速度很快,消费者处理速度很慢,那么生产者就必须等待消费者处理完,

才能继续生产数据,同样的道理。如果消费者的处理能力大于生产者,那么消费者就必须

等待生产者,为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决,生产者和消费者的强藕合问题。生产者和消费者

彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者完成数据后不再等待消费者处理,

直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个

缓冲区,平衡了生产者和消费者的处理能力。

这就像,在餐厅,厨师做好菜,不需要直接和客户交流,而客户去饭菜也不需要找厨师,直接去

前台领取即可,这也是一个解藕的过程。

# 生产者消费者模型1
import threading
import time
import queue
import random

q = queue.Queue()
def Producer(name):
    count = 0
    while count < 10:
        print("开始生产包子")
        time.sleep(random.randrange(3))
        q.put(count)
        print(‘ %s 生产出 %s 包子‘ % (name, count))
        count +=1
        print("ok...")

def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(5))
        if not q.empty():
            data = q.get()
            print(‘%s 吃掉 %s 包子‘ % (name, data))
            count += 1
        else:
            print("没包子")

a = threading.Thread(target=Producer,args="A")
b = threading.Thread(target=Consumer,args="B")
a.start()
b.start()

 

# 生产者消费者模型2
# 生产者消费者模型1
import threading
import time
import queue
import random

q = queue.Queue()
def Producer(name):
    count = 0
    while count < 10:
        print("开始生产包子")
        time.sleep(random.randrange(3))
        q.put(count)
        print(‘ %s 生产出 %s 包子‘ % (name, count))
        q.task_done()
        count +=1
        print("ok...")

def Consumer(name):
    count = 0
    while count < 10:
        time.sleep(random.randrange(5))
        q.join()
        data = q.get()
        print(‘%s 吃掉 %s 包子‘ % (name, data))
        count += 1

a = threading.Thread(target=Producer,args="A")
b = threading.Thread(target=Consumer,args="B")
c = threading.Thread(target=Consumer,args="C")
d = threading.Thread(target=Consumer,args="D")
a.start()
b.start()
c.start()
d.start()

 十二、多进程

  由于GIL的存在,python中的多线程其实并不是真正的多线程如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多线程。

multiprocessing包是python中的多线程管理包,与threading.Thread类似,它可以利用multiprocessing对象来创建一个进程。该进程可以运行在

python程序内部编写的函数,该Process对象与Thread对象的用法相同,也有start(),run(),join()的方法,此外multiprocessing包中也有Lock/Event/

Semaphore/Condition类(这些对象可以多线程那样,通过参数传递给各个进程)用以同步进程,其用法与threading包中的同名类一致。所以,

multiprocesssing的很大一部分与threading使用同一套API,只不过换到了多进程的情境。

1.创建方式:

#################第一种创建方式###################
# Process 类调用方式

from multiprocessing import Process
import time


def A(name):
    print("hello", name, time.ctime())
    time.sleep(1)


l = []
if __name__ == "__main__":
    for i in range(3):
        p = Process(target=A, args=("l",))
        l.append(p)
        p.start()

    for t in l:
        t.join()

    print("ending...................",time.ctime())

 

#######################继承Process类调用#######################
from multiprocessing import Process
import time

class My(Process):
    def __init__(self,a):
        super().__init__()
        self.a = a
        
    def run(self):
        print("hello",self.name,self.a,time.ctime())

l = []

if __name__ == "__main__":
    for i in range(3):
        p = My("A")
        l.append(p)
        p.start()

    for t in l:
        t.join()

    print("ending................",time.ctime())

 Process类

构造方法:

Process([group [,target [,name [, args [,kwargs]]]]])

group:线程组,目前还未实现,库引用中提示必须是None

target:要执行的方法

name:进程名

args/kwargs:要传入方法的参数

实例方法:

is_alive():返回进程是否在运行

join([timeout]):阻塞当前上下环境的进程,直到调用此方法的进程终止或到达指定的timeout(可选参数)

start():进程准备就绪,等待CPU调度

run():start()调用run方法,如果实例进程时未指定传入target,这start执行它默认的run()方法。、terminate():不管任务是否完成,立即停止工作进程

属性:

daemon:和线程的setDaemon功能一样

name:进程名字

pid:进程名

 

2、进程通讯的方式:

#三种通讯方式
###########通过进程队列来实现进程之间的通信###########
import multiprocessing
import time

def A(q):
    time.sleep(1)
    q.put(20)
    q.put("lwj")
    print("son process is",id(q))

l = []

if __name__ == "__main__":
    q = multiprocessing.Queue() # 创建一个进程队列
    print("main process is",id(q))
    for i in range(10):
        t = multiprocessing.Process(target=A,args=(q,)) # 将进程队列传递到子进程中去
        l.append(t)
        t.start()

    for t in l:
        t.join()

    print(q.get())
    print(q.get())



###########通过双向管道的形式进行通信###########
from multiprocessing import Process,Pipe

def A(conn):
    conn.send("lwj")
    data = conn.recv()
    print(data)
    print("q_ID2",id(conn))

l = []
if __name__ == "__main__":
    conn,addr = Pipe() #双向管道(即可以发信息,也可以接受信息)
    print("q_ID1",id(addr))
    t = Process(target=A,args=(conn,)) # 创建子进程,将对象子进程的管道也传输给子进程,
# 实现子进程与主进程之间的通信
    l.append(l)
    t.start()
    data = addr.recv()
    print(data) # "lwj"
    addr.send("lll")


###########通过 manager 实现通信及数据共享###########
from multiprocessing import Process, Manager

def A(d, l, n):
    d[n] = 1
    d["2"] = 20
    l.append(n)


if __name__ == "__main__":
    with Manager() as manager: # 创建一个meanager对象
        d = manager.dict() #  通过自己的方式创建一个meanager类型的字典
        l = manager.list(range(5)) # [0,1,2,3,4]

        li = []
        for i in range(10):
            t = Process(target=A, args=(d,l,i))
            t.start()
            li.append(t)

        for t in li:
            t.join()

        print(d)
        print(l)

 3、进程池

############# 进程池##############
from multiprocessing import Pool
import time


def A(i):
    time.sleep(1)
    print(i)


if __name__ == "__main__":
    p = Pool(5)  # 池中限定的最大进程数
    for i in range(100):
        p.apply_async(func=A, args=(i,))  # 从池子中取去一个异步进程并执行

    p.close()  # 等待子进程执行完毕后关闭线程池
    p.join()


############# 进程池2##############
from multiprocessing import Process,Pool
import time,os

def Aoo(i):
    time.sleep(1)
    print(i)
    print("son",os.getpid())
    return "lwj %s"%i

def Bar(arg):
    print(arg)
    print("hello")
    print("Bar:",os.getppid())

if __name__ == "__main__":
    p = Pool(5) #进程池中最多有5个进程
    print("main pid",os.getpid())
    for i in range(100):
        # p.apply(func=Aoo,args=(i,)) # 同步接口
        #p.apply_async(func=Aoo,args=(i,))

        # 回调函数:就是某个动作或者函数执行成功后再去执行的函数
        p.apply_async(func=Aoo,args=(i,),callback=Bar)
        

    p.close()
    p.join()  # join与close调用顺序是固定的      

 4、进程同步

from multiprocessing import Process,Lock
import time

def A(lock,i):
    lock.acquire()
    time.sleep(1)
    print("hello %s"%i)
    lock.release()

l = []

if __name__ == "__main__":
    lock = Lock()
    for i in range(10):
        t = Process(target=A,args=(lock,i))
        l.append(t)
        t.start()

 十三、协程

回顾一下线程和进程,线程是无法利用多核,so:无法实现真正意义上的并行效果。但是处理IO密集型任务(网络传输,web应用),有显著效果。

协程:顾名思义就是协作的意思,(非占用式的),它的内部也存在切换不过自己可以控制何时切换。

一、实现方式:

1、通过迭代器实现协程

import time

def A(name): # A 此时是一个生成器
    print("等待中")
    while True:
        x = yield
        print("%s 吃了 %s 包子"%(name,x))
        time.sleep(1)

def B():
    r = conn1.__next__() # 此时执行的conn1的next方法,就会执行此生成器
    r1 = conn2.__next__() # 此时执行的conn2的next方法,就会执行此生成器
    n = 0
    while True:
        time.sleep(1)
        print("制作出包子 %s 包子 %s"%(n,n+1))
        conn1.send(n) # 通过 send() 就可以切换到 A 进行执行
        conn2.send(n+1)
        n +=2

if __name__ == "__main__":
    conn1 = A("A") # 生成了一个生成器对象conn1,只是一个对象,什么作用也没有(想使用就要使用其next方法)
    conn2 = A("B")# 生成了一个生成器对象conn1,只是一个对象,什么作用也没有(想使用就要使用其next方法)
    B()

 2、greenlet

greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next() 或 send() 操作进行恢复为止。

可以使用一个调度器循环在一组生成器函数之间协作多个任务。greenlet是python中实现我们所谓的“Coroutine(协程)”的一个基本库

from greenlet import greenlet

def A():
    print("123")
    gr2.switch()
    print("789")
    gr2.switch()

def B():
    print("456")
    gr1.switch()
    print("10 11 12")

if __name__ == "__main__":
    gr1 = greenlet(A)  # gr1 绑定 函数 A
    gr2 = greenlet(B) # gr2 绑定 函数B

    print(gr1) # <greenlet.greenlet object at 0x00AC6A30>
    gr1.switch() # switch 是启动切换的意思

 3、基于greenlet的框架---gevent

gevent模块实现协程:

python通过yield提供了对协程的基本支持,但是不完全。而第三方gevent为python提供了比较完善的协程支持。

gevent是第三方库,通过greenlet实现协程,其基本思想是:

  当一个greenlet遇到IO操作时,比如访问网络,就会自动切换到其它的greenlet,等待IO操作完成,再适当的时候切换回来继续执行,

由于IO操作非常耗时,经常是程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。由于切换

是在IO操作时自动完成,所以gevent需要修改python自带的一些标准库,这一过程在启动通过mkey patch完成:

import gevent
import requests
import time

start = time.time()

def A(url):
    print("GET: %s"%url)
    resp = requests.get(url)
    data = resp.text
    print("%d bytes received from %s"%(len(data,),url))

##############使用串行的方式执行##############
# A(‘https://www.python.org/‘)
# A(‘https://www.yahoo.com/‘)
# A(‘https://www.baidu.com/‘)
# A(‘https://www.sina.com.cn/‘)
# A("http://www.xiaohuar.com/hua/")

##############使用协程的方式执行##############
gevent.joinall([
         gevent.spawn(A, ‘https://www.python.org/‘),
         gevent.spawn(A, ‘https://www.yahoo.com/‘),
         gevent.spawn(A, ‘https://www.baidu.com/‘),
         gevent.spawn(A, ‘https://www.sina.com.cn/‘),
         gevent.spawn(A, ‘http://www.xiaohuar.com/hua/‘),
])
print("cost time:",time.time()-start)

 

以上是关于进程与线程的主要内容,如果未能解决你的问题,请参考以下文章

Android中线程与线程,进程与进程之间如何通信?

JAVA线程与线程进程与进程间通信

OS——进程与线程

OS——进程与线程

OS——进程与线程

OS——进程与线程