Day 18 18.1 并发爬虫之协程实现

Posted dream-ze

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day 18 18.1 并发爬虫之协程实现相关的知识,希望对你有一定的参考价值。

并发爬虫之协程实现

  • 协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

  • 协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

  • 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

【1】yield与协程


def foo():
    print("OK1")
    yield 100  # 切换: 保存/恢复的功能
    print("OK2")
    yield 1000


def bar():
    print("OK3")
    yield 200
    print("OK4")
    yield 2000


gen = foo()
ret = next(gen)    # gen.__next__()
print(ret)

gen2 = bar()
ret2 = next(gen2)  # gen.__next__()
print(ret2)

ret = next(gen)    # gen.__next__()
print(ret)

ret2 = next(gen2)  # gen.__next__()
print(ret2)

【2】asyncio模块

  • asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。

  • 为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

  • asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

  • asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

import asyncio


async def task(i):
    print(f"task i start")
    await asyncio.sleep(1)
    print(f"task i end")


# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • async: 定义一个方法(函数),这个方法在后面的调用中不会被立即执行而是返回一个协程对象;

  • coroutine: 协程对象,也可以将协程对象添加到时间循环中,它会被事件循环调用;

  • event_loop: 事件循环,相当于一个无限循环,可以把一些函数添加到这个事件中,函数不会立即执行, 而是满足某些条件的时候,函数就会被循环执行;

  • await: 用来挂起阻塞方法的执行;

  • task: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()创建。

import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print(\'任务等待: 秒\'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print(\'任务在秒后返回结束运行\'.format(i, n))
    return i + n


start_time = time.time()  # 开始时间

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

print(\'运行时间: \', time.time() - start_time)
for task in tasks:
    print(\'任务执行结果: \', task.result())
  • future: 代表以后执行或者没有执行的任务,实际上和task没有本质区别;这里就不做代码展示;

2.1 异步IO示例

  • run_until_complete():

    • 阻塞调用,直到协程运行结束才返回。参数是future,传入协程对象时内部会自动变为future
  • asyncio.sleep():

    • 模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程。

提示:若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。

import asyncio

async def test():
    print(\'异步任务test!\')
c = test()  # 调用异步函数,得到协程对象coroutine--> c
loop = asyncio.get_event_loop()  # 创建事件循环
loop.run_until_complete(c)  # 把协程对象丢给循环,并执行异步函数内部代码

import asyncio


async def test():
    print(\'异步任务test\')

if __name__ == \'__main__\':
    cor_test = test()   # 协程对象
    loop = asyncio.get_event_loop()
    # 创建任务
    # task = asyncio.ensure_future(cor_test)
    task = loop.create_task(cor_test)
    # 把任务注册到事件循环上
    loop.run_until_complete(task)
import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print(\'任务等待: 秒\'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print(\'任务在秒后返回结束运行\'.format(i, n))
    return i + n


start_time = time.time()  # 开始时间

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]
loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

print(\'运行时间: \', time.time() - start_time)
for task in tasks:
    print(\'任务执行结果: \', task.result())

2.2 py3.8版本+

运行协程的三种基本方式
async.run() 运行协程
async.create_task()创建task
async.gather()获取返回值

# 用create_task()创建task

import asyncio, time

async def work(i, n):  # 使用async关键字定义异步函数
    print(\'任务等待: 秒\'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print(\'任务在秒后返回结束运行\'.format(i, n))
    return i + n


tasks = []
async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks) # 阻塞


start_time = time.time()  # 开始时间
asyncio.run(main())
print(\'运行时间: \', time.time() - start_time)
for task in tasks:
    print(\'任务执行结果: \', task.result())

PS:

必须先通过asyncio.create_task将task创建到event loop中,

再通过await等待,

如果直接用await等待则会导致异步变同步

asyncio.create_task() 函数在 Python 3.7 中被加入。

# 用gather()收集返回值


import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print(\'任务等待: 秒\'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print(\'任务在秒后返回结束运行\'.format(i, n))
    return i + n


tasks = []


async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks)
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])  # 将task作为参数传入gather,等异步任务都结束后返回结果列表
    print(response)


start_time = time.time()  # 开始时间

asyncio.run(main())

print(\'运行时间: \', time.time() - start_time)

【3】aiohttp

  • 我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。

    • 本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。
    • 安装方式,pip install aiohttp。
  • aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。

    • asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,
    • aiohttp就是基于asyncio实现的http框架。
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/headers") as response:
            print(await response.text())

asyncio.run(main())

【4】应用案例(异步爬虫)

  • 基于asyncio异步爬取斗图网的表情包
import requests
import os
import re
from lxml import etree
from fake_useragent import UserAgent
import time
import asyncio
import aiohttp

fake_ua = UserAgent()

headers = 
    "User-Agent": fake_ua.random



async def get_every_page_url():
    page_urls = []
    for i in range(1, 4):
        if i == 1:
            page_url = "https://www.pkdoutu.com/photo/list/"
            page_urls.append(page_url)
        else:
            page_url = f"https://www.pkdoutu.com/photo/list/?page=i"
            # https://www.pkdoutu.com/photo/list/?page=2
            page_urls.append(page_url)
    return page_urls


async def get_first_page(page_url):
    print(f\'------当前页的page_url是:page_url------\')
    filename = \'dtb\'
    if not os.path.exists(filename):
        os.mkdir(filename)
    async with aiohttp.ClientSession() as session:
        async with session.get(page_url, ssl=False, headers=headers) as response:
            # response.encoding = \'utf-8\'
            page_text = await response.content.read()
            tree = etree.HTML(page_text)
            # img_urls = tree.xpath(\'//li[@class="list-group-item"]/div/div/a/img[@data-backup]/@data-backup\')
            a_lists = tree.xpath(\'//*[@id="pic-detail"]/div/div[2]/div[2]/ul/li/div/div/a\')
            img_urls = []
            for a in a_lists:
                img_src = a.xpath(\'./img/@data-backup\')[0]
                img_urls.append(img_src)

    return img_urls


async def download_img(url):
    img_title = os.path.basename(url)
    filepath = os.path.join(\'dtb\', img_title)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=False) as response:
            response.encoding = \'utf-8\'
            with open(filepath, \'wb\') as f:
                f.write(await response.content.read())
                print(f\'当前图片img_title下载完成\')


async def main():
    # async版本
    page_urls = await get_every_page_url()
    for page_url in page_urls:
        img_urls = await get_first_page(page_url)
        tasks = []
        for url in img_urls:
            t = asyncio.create_task(download_img(url))
            tasks.append(t)
        await asyncio.wait(tasks)


async def main1():
    start = time.time()
    page_urls = await get_every_page_url()
    for page_url in page_urls:
        img_urls = await get_first_page(page_url)
        print(img_urls)


if __name__ == \'__main__\':
    start = time.time()
    # asyncio.run(main1())

    asyncio.run(main())
    print(f\'当前总耗时为time.time() - starts\')
    # 当前总耗时为45.654601097106934s

以上是关于Day 18 18.1 并发爬虫之协程实现的主要内容,如果未能解决你的问题,请参考以下文章

python----单线程实现并发之协程

Python并发之协程

Python多任务实现 之协程并发下载多图片

并发之协程

python并发编程之协程

并发编程之协程