Python 多线程进程协程上手体验

Posted llllllllll

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 多线程进程协程上手体验相关的知识,希望对你有一定的参考价值。

浅谈 Python 多线程、进程、协程上手体验


前言:浅谈 Python 很多人都认为 Python 的多线程是垃圾(GIL 说这锅甩不掉啊~);本章节主要给你体验下 Python 的几个库

  • Threading
  • Multiprocessing
  • Gevent

总结:多进程主要用于浮点

备注:Gevent 需要手动安装 [pip install gevent]

一.线程

Threading

Threading 模块建立在 _thread 模块之上。_thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

Demo:

import threading
from time import sleep

def A():
    \'\'\'打印 “A” 五次\'\'\'
    for x in range(5):
        print("AAA")
        sleep(1.0)

def B():
    for x in range(5):
        print("BBB")
        sleep(1.0)

def main():
    t1 = threading.Thread(target=A)
    t2 = threading.Thread(target=B)
    t1.start()  
    t2.start()

if __name__ == \'__main__\':
    main()

打印结果:

AAA
BBB
BBB
AAA
BBB
AAA
BBB
AAA
BBB
AAA
[Finished in 5.3s]

如果我们按照常规的方式执行 A()、B()方法,将耗时更多,结果如下:

AAA
AAA
AAA
AAA
AAA
BBB
BBB
BBB
BBB
BBB
[Finished in 10.3s]

对比下时间就知道多线程的重要性,简单来说就是花费更少时间做事得到最高的回报。

Threading 常用方法:

t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称 
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() : 父线程打印内容后便结束了,不管子线程是否执行完毕了
t.isDaemon() : 判断是否为守护线程
t.ident : 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() : 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() : 线程被cpu调度后自动执行线程对象的run方法

不一样的 _Thread

说明:Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。

Demo:

import _thread
import time

def print_time( threadName, delay):
    count = 0
    while count < 5:
        time.sleep(delay)
        count += 1
        print ("%s: %s" % ( threadName, time.ctime(time.time()) ))

try:
    _thread.start_new_thread( print_time, ("Thread-1", 2, ) )
    _thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
    print ("Error: 无法启动线程")
while True:
    pass

打印结果:

Thread-1: Wed Mar 20 15:36:32 2019
Thread-1: Wed Mar 20 15:36:34 2019
Thread-2: Wed Mar 20 15:36:34 2019
Thread-1: Wed Mar 20 15:36:36 2019
Thread-2: Wed Mar 20 15:36:38 2019
Thread-1: Wed Mar 20 15:36:38 2019
[Cancelled]

_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。

  • _Thread 模块已被废弃,Threading 身为它的接班人

二.进程

Multiprocessing

Multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块 threading 的编程接口类似。

Demo:

from multiprocessing import Process  
import time

def _proces(name):
    print("Process " + name)
    time.sleep(1.0)

if __name__ == "__main__": 
    p1 = Process(target=_proces, args=(\'A\',))
    p1.start()
    p1.join()
    p2 = Process(target=_proces, args=(\'B\',))
    p2.start()
    p2.join()

输出打印结果如下:

Process A
Process B
[Finished in 3.2s]

Multiprocessing 常用方法:

p.start():启动进程,并调用该子进程中的p.run() 
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,\'hexin\',)
kwargs表示调用对象的字典,kwargs={\'name\':\'hexin\',\'age\':18}
name为子进程的名称

三.协程

Gevent

Gevent 又称微线程,在单线程上执行多个任务,用函数切换,开销极小。不通过操作系统调度,没有进程、线程的切换开销。genvent,monkey.patchall

Demo:

from gevent import monkey
monkey.patch_all()
import gevent
import urllib.request
 
def run(url):
    print(\'run --> %s\' % url)
    try:
        response = urllib.request.urlopen(url)
        data = response.read()
        print(\'%d bytes received from %s.\' % (len(data), url))
    except Exception as e:
        print(e)
 
if __name__ == \'__main__\':
    urls = [\'https://www.baidu.com\',\'https://www.jd.com\',\'https://www.cnblogs.com/\']
    lis = [gevent.spawn(run, url) for url in urls]
    gevent.joinall(lis)

输出打印结果如下:

run --> https://www.baidu.com
run --> https://www.jd.com
run --> https://www.cnblogs.com/
227 bytes received from https://www.baidu.com.
111917 bytes received from https://www.jd.com.
47872 bytes received from https://www.cnblogs.com/.
[Finished in 0.7s]

四.实例

多线程Demo:

from threading import Thread
from queue import Queue
from lxml import etree
import requests

class douban(Thread):
    def __init__(self, url, q):
        # 重写写父类的__init__方法
        super(douban, self).__init__()
        self.url = url
        self.q = q
        self.headers={}

    def run(self):
        self.parse_page()

    def send_request(self,url):
        \'\'\'
        用来发送请求的方法
        :return: 返回网页源码
        \'\'\'
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                html = requests.get(url=url,headers=self.headers).content
            except Exception as e:
                print(u\'[DEBUG] %s%s\'% (e,url))
                i += 1
            else:
                return html

    def parse_page(self):
        \'\'\'
        解析网站源码,并采用 xpath 提取 电影名称和平分放到队列中
        :return:
        \'\'\'
        response = self.send_request(self.url)
        html = etree.HTML(response)
        # 获取到一页的电影数据
        node_list = html.xpath("//div[@class=\'info\']")
        for move in node_list:
            # 电影名称
            title = move.xpath(\'.//a/span/text()\')[0]
            # 电影主题
            theme = str(move.xpath(\'.//div[@class="bd"]//span[@class="inq"]/text()\'))
            # 评分
            score = move.xpath(\'.//div[@class="bd"]//span[@class="rating_num"]/text()\')[0]
            # 将每一部电影的名称跟评分加入到队列
            self.q.put(title + "\\t |" + score +"\\t" + theme)

def main():
    # 创建一个队列用来保存进程获取到的数据
    q = Queue()
    base_url = \'https://movie.douban.com/top250?start=\'
    # 构造所有 url
    url_list = [base_url+str(num) for num in range(0,50+1,25)]

    # 保存线程
    Thread_list = []
    # 创建并启动线程
    for url in url_list:
        p = douban(url,q)
        p.start()
        Thread_list.append(p)

    # 让主线程等待子线程执行完成
    for i in Thread_list:
        i.join()

    while not q.empty():
        print(q.get())

if __name__=="__main__":
    main()

输出打印:

肖申克的救赎    |9.6  [\'希望让人自由。\']
霸王别姬     |9.6  [\'风华绝代。\']
这个杀手不太冷  |9.4  [\'怪蜀黍和小萝莉不得不说的故事。\']
阿甘正传     |9.4  [\'一部美国近现代史。\']
美丽人生     |9.5  [\'最美的谎言。\']
泰坦尼克号    |9.3  [\'失去的才是永恒的。 \']
千与千寻     |9.3  [\'最好的宫崎骏,最好的久石让。 \']
辛德勒的名单   |9.5  [\'拯救一个人,就是拯救整个世界。\']
    ..........(省略)
    ..........(省略)
[Finished in 0.9s]

多进程Demo:

from multiprocessing import Process, Queue
import time
from lxml import etree
import requests

class douban(Process):
    def __init__(self, url, q):
        # 重写写父类的__init__方法
        super(douban, self).__init__()
        self.url = url
        self.q = q
        self.headers = {}

    def run(self):
        self.parse_page()

    def send_request(self,url):
        \'\'\'
        用来发送请求的方法
        :return: 返回网页源码
        \'\'\'
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                return requests.get(url=url,headers=self.headers).content
            except Exception as e:
                print(u\'[DEBUG] %s%s\'% (e,url))
                i += 1

    def parse_page(self):
        \'\'\'
        解析网站源码,并采用xpath提取 电影名称和平分放到队列中
        :return:
        \'\'\'
        response = self.send_request(self.url)
        html = etree.HTML(response)
        # 获取到一页的电影数据
        node_list = html.xpath("//div[@class=\'info\']")
        for move in node_list:
            # 电影名称
            title = move.xpath(\'.//a/span/text()\')[0]
            # 电影主题
            theme = str(move.xpath(\'.//div[@class="bd"]//span[@class="inq"]/text()\'))
            # 评分
            score = move.xpath(\'.//div[@class="bd"]//span[@class="rating_num"]/text()\')[0]
           
            # 将每一部电影的名称跟评分加入到队列
            self.q.put(title + "\\t |" + score +"\\t" + theme)


def main():
    # 创建一个队列用来保存进程获取到的数据
    q = Queue()
    base_url = \'https://movie.douban.com/top250?start=\'
    # 构造所有url
    url_list = [base_url+str(num) for num in range(0,50+1,25)]

    # 保存进程
    Process_list = []
    # 创建并启动进程
    for url in url_list:
        p = douban(url,q)
        p.start()
        Process_list.append(p)
    
    # 让主进程等待子进程执行完成
    for i in Process_list:
        i.join()

    while not q.empty():
        print(q.get())

if __name__=="__main__":
    main()

输出打印:

肖申克的救赎    |9.6  [\'希望让人自由。\']
霸王别姬     |9.6  [\'风华绝代。\']
这个杀手不太冷  |9.4  [\'怪蜀黍和小萝莉不得不说的故事。\']
阿甘正传     |9.4  [\'一部美国近现代史。\']
美丽人生     |9.5  [\'最美的谎言。\']
泰坦尼克号    |9.3  [\'失去的才是永恒的。 \']
千与千寻     |9.3  [\'最好的宫崎骏,最好的久石让。 \']
辛德勒的名单   |9.5  [\'拯救一个人,就是拯救整个世界。\']
盗梦空间     |9.3  [\'诺兰给了我们一场无法盗取的梦。\']
忠犬八公的故事  |9.3  [\'永远都不能忘记你所爱的人。\']
机器人总动员   |9.3  [\'小瓦力,大人生。\']
    ..........(省略)
    ..........(省略)
[Finished in 2.4s]

协程Demo:

from queue import Queue
import time
from lxml import etree
import requests
import gevent

from gevent import monkey
monkey.patch_all()

class douban(object):
    def __init__(self):
        # 创建一个队列用来保存进程获取到的数据
        self.q = Queue()
        self.headers = {}

    def run(self,url):
        self.parse_page(url)

    def send_request(self,url):
        \'\'\'
        用来发送请求的方法
        :return: 返回网页源码
        \'\'\'
        # 请求出错时,重复请求3次,
        i = 0
        while i <= 3:
            try:
                html = requests.get(url=url,headers=self.headers).content
            except Exception as e:
                print(u\'[DEBUG] %s%s\'% (e,url))
                i += 1
            else:
                return html

    def parse_page(self,url):
        \'\'\'
        解析网站源码,并采用xpath提取 电影名称和平分放到队列中
        :return:
        \'\'\'
        response = self.send_request(url)
        html = etree.HTML(response)
        # 获取到一页的电影数据
        node_list = html.xpath("//div[@class=\'info\']")
        for move in node_list:
            # 电影名称
            title = move.xpath(\'.//a/span/text()\')[0]
            # 电影主题
            theme = str(move.xpath(\'.//div[@class="bd"]//span[@class="inq"]/text()\'))
            # 评分
            score = move.xpath(\'.//div[@class="bd"]//span[@class="rating_num"]/text()\')[0]

            # 将每一部电影的名称跟评分加入到队列
            self.q.put(title + "\\t |" + score +"\\t" + theme)


    def main(self):
        base_url = \'https://movie.douban.com/top250?start=\'
        # 构造所有url
        url_list = [base_url+str(num) for num in range(0,225+1,25)]
        # 创建协程并执行
        job_list = [gevent.spawn(self.run,url) for url in url_list]
        # 让线程等待所有任务完成,再继续执行。
        gevent.joinall(job_list)

        while not self.q.empty():
            print(self.q.get())

if __name__=="__main__":
    douban = douban()
    douban.main()

输出打印:

肖申克的救赎    |9.6  [\'希望让人自由。\']
霸王别姬     |9.6  [\'风华绝代。\']
这个杀手不太冷  |9.4  [\'怪蜀黍和小萝莉不得不说的故事。\']
阿甘正传     |9.4  [\'一部美国近现代史。\']
美丽人生     |9.5  [\'最美的谎言。\']
泰坦尼克号    |9.3  [\'失去的才是永恒的。 \']
千与千寻     |9.3  [\'最好的宫崎骏,最好的久石让。 \']
辛德勒的名单   |9.5  [\'拯救一个人,就是拯救整个世界。\']
盗梦空间     |9.3  [\'诺兰给了我们一场无法盗取的梦。\']
忠犬八公的故事  |9.3  [\'永远都不能忘记你所爱的人。\']
机器人总动员   |9.3  [\'小瓦力,大人生。\']
三傻大闹宝莱坞  |9.2  [\'英俊版憨豆,高情商版谢耳朵。\']
    ..........(省略)
    ..........(省略)
[Finished in 1.0s]

总结

  • 多进程:计算密集型,需要充分使用服务器多核CPU资源,计算大量的并发请求时候,推荐 multiprocessing (多进程)
    缺陷:多个进程之间通信成本高,切换开销大,反正就是占用硬件资源高就对了。

  • 多线程\\协程:I/O密集型(网络I/O、磁盘I/O、数据库I/O、爬虫)比较合适多线程。推荐 threading.Thread、gevent、multiprocessing.dummy (多线程)
    缺陷:因 GIL锁挟持之下不能使用 CPU 多核心并行运算,也就是说你的代码永远只有一个核心在跑,不能做到高并行,但是可以做到高并发。
    解决方式:要么换编译器、要么换开发语言实现多线程

想要追求更有效率,多进程加异步速度会很快

下次更新各种细节与 Python 高级用法

 

以上是关于Python 多线程进程协程上手体验的主要内容,如果未能解决你的问题,请参考以下文章

Python爬虫案例演示:Python多线程多进程协程

多线程 多进程 协程 Queue(爬虫代码)

Python来实现并发的Web Server,其中采用了多进程多线程协程单进程单线程非阻塞的方式

python-协程多线程多进程性能比较

python之线程进程和协程

python-多线程+协程