Spider理论系列--协程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spider理论系列--协程相关的知识,希望对你有一定的参考价值。
一、协程
概念
- 协程
又称微线程(纤程),是一种用户态的轻量级线程 - 子程序
在所有的语言中都是层级调用的,比如A中调用B,B在执行过程中调用C,C执行完返回,B执行完返回,最后是A执行完毕。这是通过栈实现的,一个函数就是一个执行的子程序,子程序的调用总是有一个入口、一次返回,调用的顺序是明确的 - 理解协程
普通理解:线程是系统级别的,它们是由操作系统调度。协程是程序级别,由程序员根据需求自己调度。我们把一个线程中的一个个函数称为子程序,那么一个子程序在执行的过程中可以中断去执行别的子程序,这就是协程。也就是说同一个线程下的一段代码1执行执行着就中断,然后去执行另一段代码2,当再次回来执行代码1时,接着从之前的中断的位置继续向下执行 - 优点
a、最大的优势就是协程极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
b、不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。 - 缺点
a、无法利用多核CPU,协程的本质是单个线程,它不能同时将多个CPU的多个核心使用上,失去了标准线程使用多CPU的能力。
b、进行阻塞操作(操作IO)会阻塞整个程序
二、同步与异步
1、同步与异步的概念
- 前言
python由于GIL(全局锁)的存在,不能发挥多核的优势,其性能一直饱受诟病。然而在IO密集型的网络编程里,异步处理比同步处理能提升成百上千倍的效率
IO密集型就是磁盘的读取数据和输出数据非常大的时候就是属于IO密集型 由于IO操作的运行时间远远大于cpu、内存运行时间,所以任务的大部分时间都是在等待IO操作完成,IO的特点是cpu消耗小,所以,IO任务越多,cpu效率越高,当然不是越多越好,有一个极限值。 - 同步
指完成事务的逻辑,先执行第一个事务,如果阻塞了,会一直等待,直到这个事务完成,再执行第二个事务,顺序执行 - 异步
是和同步相对的,异步是指在处理调用这个事务的之后,不会等待这个事务的处理结果,直接处理第二个事务去了,通过状态、通知、回调来通知调用者处理结果
2、同步与异步代码
- 同步
import time
def run(index):
print("lucky is a good man", index)
time.sleep(2)
print("lucky is a nice man", index)
for i in range(1, 5):
run(i)
- 异步
import time
import asyncio
async def run(i):
print("Lee is a good man", i)
# 模拟一个耗时IO
await asyncio.sleep(2)
print("Lee is a nice man", i)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = []
t1 = time.time()
for url in range(1, 5):
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
三、asyncio模块
1、概述
- asyncio模块
是python3.4版本引入的标准库,直接内置了对异步IO的操作 - 编程模式
是一个消息循环,我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO - 说明
到目前为止实现协程的不仅仅只有asyncio,tornado和gevent都实现了类似功能 - 关键字的说明
关键字 | 说明 |
event_loop | 消息循环,程序开启一个无限循环,把一些函数注册到事件循环上,当满足事件发生的时候,调用相应的协程函数 |
coroutine | 协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用 |
task | 任务,一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含了任务的各种状态 |
async/await | python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口 |
2、asyncio基本使用
- 定义一个协程
import asyncio
import time
# 通过async关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中
async def run(x):
print("waiting:%d"%x)
await asyncio.sleep(x)
print("结束run")
#得到一个协程对象
coroutine = run(2)
asyncio.run(coroutine)
- 等同于
import asyncio
import time
# 通过async关键字定义了一个协程,协程是不能直接运行的,需要将协程放到消息循环中
async def run(x):
print("waiting:%d"%x)
await asyncio.sleep(x)
print("结束run")
#得到一个协程对象
coroutine = run(2)
创建一个消息循环
loop = asyncio.get_event_loop()
#将协程对象加入到消息循环 loop.run_until_complete(coroutine)
import asyncio
import time
async def run(x):
print("waiting:%d"%x)
await asyncio.sleep(x)
print("结束run")
coroutine = run(2)
#创建任务
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
# 将任务加入到消息循环
loop.run_until_complete(task)
- 阻塞和await
async可以定义协程,使用await可以针对耗时操作进行挂起,就与生成器的yield一样,函数交出控制权。协程遇到await,消息循环会挂起该协程,执行别的协程,直到其他协程也会挂起或者执行完毕,在进行下一次执行 - 获取返回值
import time
import asyncio
async def run(url):
print("开始向%s要数据……"%(url))
# 向百度要数据,网络IO
await asyncio.sleep(5)
data = "%s的数据"%(url)
print("给你数据")
return data
# 定义一个回调函数
def call_back(future):
print("call_back:", future.result())
coroutine = run("百度")
# 创建一个任务对象
task = asyncio.ensure_future(coroutine)
# 给任务添加回调,在任务结束后调用回调函数
task.add_done_callback(call_back)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
3、多任务
- 同步
同时请求"百度", "阿里", "腾讯", "新浪"四个网站,假设响应时长均为2秒
import time
def run(url):
print("开始向%s要数据……"%(url))
# 向百度要数据,网络IO
time.sleep(2)
data = "%s的数据"%(url)
return data
if __name__ == "__main__":
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
print(run(url))
t2 = time.time()
print("总耗时:%.2f"%(t2-t1))
- 异步
同时请求"百度", "阿里", "腾讯", "新浪"四个网站,假设响应时长均为2秒
使用ensure_future创建多任务
在这里的call_back函数,其实博主自己在开始看到这个的时候是很突兀的,看到这里有点懵,回调函数就是我们把一个函数作为参数传入到python本身的回调函数的调用中,也就是说,我们想使用任务的回调函数,其实也就是自定义事件
import time
import asyncio
async def run(url):
print("开始向%s要数据……"%(url))
await asyncio.sleep(2)
data = "%s的数据"%(url)
return data
def call_back(future):
print("call_back:", future.result())
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = []
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
# 同时添加4个异步任务
# asyncio.wait(tasks) 将任务的列表又变成 <coroutine object wait at 0x7f80f43408c0>
loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
- 封装成异步函数
import time
import asyncio
async def run(url):
print("开始向%s要数据……" % (url))
await asyncio.sleep(2)
data = "%s的数据" % (url)
return data
def call_back(future):
print("call_back:", future.result())
async def main():
tasks = []
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.ensure_future(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
# 同时添加4个异步任务
await asyncio.wait(tasks)
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
使用loop.create_task创建多任务
import time
import asyncio
async def run(url):
print("开始向%s要数据……" % (url))
await asyncio.sleep(2)
data = "%s的数据" % (url)
return data
def call_back(future):
print("call_back:", future.result())
if name == "main":
loop = asyncio.get_event_loop()
tasks = []
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
# task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
# 同时添加4个异步任务
loop.run_until_complete(asyncio.wait(tasks))
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
- 封装成异步函数
import time
import asyncio
async def run(url):
print("开始向%s要数据……" % (url))
await asyncio.sleep(2)
data = "%s的数据" % (url)
return data
def call_back(future):
print("call_back:", future.result())
async def main():
tasks = []
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = loop.create_task(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
# 同时添加4个异步任务
await asyncio.wait(tasks)
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
if __name__ == "__main__":
# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
使用asyncio.create_task创建多任务
import time
import asyncio
async def run(url):
print("开始向%s要数据……" % (url))
await asyncio.sleep(2)
data = "%s的数据" % (url)
return data
def call_back(future):
print("call_back:", future.result())
async def main():
tasks = []
t1 = time.time()
for url in ["百度", "阿里", "腾讯", "新浪"]:
coroutine = run(url)
task = asyncio.create_task(coroutine)
task.add_done_callback(call_back)
tasks.append(task)
# 同时添加4个异步任务
await asyncio.wait(tasks)
t2 = time.time()
print("总耗时:%.2f" % (t2 - t1))
if __name__ == "__main__":
# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
4、Task 概念及用法
- Task,是 python 中与事件循环进行交互的一种主要方式。
创建 Task,意思就是把协程封装成 Task 实例,并追踪协程的 运行 / 完成状态,用于未来获取协程的结果。 - Task 核心作用: 在事件循环中添加多个并发任务;
具体来说,是通过 asyncio.create_task() 创建 Task,让协程对象加入事件循环中,等待被调度执行。
注意:Python 3.7 以后的版本支持 asyncio.create_task() ,在此之前的写法为 loop.create_task() ,开发过程中需要注意代码写 法对不同版本 python 的兼容性。 - 需要指出的是,协程封装为 Task 后不会立马启动,当某个代码 await 这个 Task 的时候才会被执行。
当多个 Task 被加入一个 task_list 的时候,添加 Task 的过程中 Task 不会执行,必须要用 await asyncio.wait()
或 await asyncio.gather()
将 Task 对象加入事件循环中异步执行。 - 一般在开发中,常用的写法是这样的:
-- 先创建 task_list 空列表; -- 然后用 asyncio.create_task() 创建 Task;
-- 再把 Task 对象加入 task_list ;
-- 最后使用 await asyncio.wait 或 await asyncio.gather 将 Task 对象加入事件循环中异步执行。
注意: 创建 Task 对象时,除了可以使用 asyncio.create_task() 之外,还可以用最低层级的 loop.create_task() 或 asyncio.ensure_future() ,他们都可以用来创建 Task 对象,其中关于 ensure_future 相关内容本文接下来会一起讲。 - Task 简单用法
import asyncio
async def func():
print(1)
await asyncio.sleep(2)
print(2)
return "test"
async def main():
print("main start")
# python 3.7及以上版本的写法
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())
# python3.7以前的写法
# task1 = asyncio.ensure_future(func())
# task2 = asyncio.ensure_future(func())
print("main end")
ret1 = await task1
ret2 = await task2
print(ret1, ret2)
# python3.7以后的写法
asyncio.run(main())
# python3.7以前的写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
"""
在创建task的时候,就将创建好的task添加到了时间循环当中,所以说必须得有时间循环,才可以创建task,否则会报错
"""
- task用法实例
import asyncio
import arrow
def current_time():
获取当前时间
:return:
cur_time = arrow.now().to(Asia/Shanghai).format(YYYY-MM-DD HH:mm:ss)
return cur_time
async def func(sleep_time):
func_name_suffix = sleep_time # 使用 sleep_time (函数 I/O 等待时长)作为函数名后缀,以区分任务对象
print(f"[current_time()] 执行异步函数 func.__name__-func_name_suffix")
await asyncio.sleep(sleep_time)
print(f"[current_time()]函数func.name-func_name_suffix 执行完毕")
return f"【[current_time()] 得到函数 func.name-func_name_suffix 执行结果】"
async def run():
task_list = []
for i in range(5):
task = asyncio.create_task(func(i))
task_list.append(task)
done, pending = await asyncio.wait(task_list)
for done_task in done:
print((f"[current_time()]得到执行结果 done_task.result()"))
def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
if name == main:
main()
- 代码执行结果如下:
D:\\Tools\\Tool\\python\\python.exe D:/Tools/Workspace/PyWorkspace/dateProject/demo07-协程/task.py
[2023-02-06 12:05:49] 执行异步函数 func-0
[2023-02-06 12:05:49]函数func-0 执行完毕
[2023-02-06 12:05:49]得到执行结果 【[2023-02-06 12:05:49] 得到函数 func-0 执行结果】
[2023-02-06 12:05:49] 执行异步函数 func-1
[2023-02-06 12:05:50]函数func-1 执行完毕
[2023-02-06 12:05:50]得到执行结果 【[2023-02-06 12:05:50] 得到函数 func-1 执行结果】
[2023-02-06 12:05:50]得到执行结果 【[2023-02-06 12:05:49] 得到函数 func-0 执行结果】
[2023-02-06 12:05:50] 执行异步函数 func-2
[2023-02-06 12:05:52]函数func-2 执行完毕
[2023-02-06 12:05:52]得到执行结果 【[2023-02-06 12:05:52] 得到函数 func-2 执行结果】
[2023-02-06 12:05:52]得到执行结果 【[2023-02-06 12:05:50] 得到函数 func-1 执行结果】
[2023-02-06 12:05:52]得到执行结果 【[2023-02-06 12:05:49] 得到函数 func-0 执行结果】
[2023-02-06 12:05:52] 执行异步函数 func-3
[2023-02-06 12:05:55]函数func-3 执行完毕
[2023-02-06 12:05:55]得到执行结果 【[2023-02-06 12:05:52] 得到函数 func-2 执行结果】
[2023-02-06 12:05:55]得到执行结果 【[2023-02-06 12:05:50] 得到函数 func-1 执行结果】
[2023-02-06 12:05:55]得到执行结果 【[2023-02-06 12:05:55] 得到函数 func-3 执行结果】
[2023-02-06 12:05:55]得到执行结果 【[2023-02-06 12:05:49] 得到函数 func-0 执行结果】
[2023-02-06 12:05:55] 执行异步函数 func-4
[2023-02-06 12:05:59]函数func-4 执行完毕
[2023-02-06 12:05:59]得到执行结果 【[2023-02-06 12:05:50] 得到函数 func-1 执行结果】
[2023-02-06 12:05:59]得到执行结果 【[2023-02-06 12:05:59] 得到函数 func-4 执行结果】
[2023-02-06 12:05:59]得到执行结果 【[2023-02-06 12:05:49] 得到函数 func-0 执行结果】
[2023-02-06 12:05:59]得到执行结果 【[2023-02-06 12:0鬼吹灯主要是以协程为主来爬取小说得章节内容,协程爬取不懂得小伙伴可以先关注我一手,后续会整理理论的知识放在专栏里
整体思路
- 得到鬼吹灯页面的源码
- 解析源码得到每一个章节的url
- 得到书名,这个书名通过切片得到
- 通过url得到一个页面的内容
- 使用并发执行多个任务下载
代码实现
导包
import asyncio
import os
from aiohttp import ClientSession
import requests
import aiofiles
from bs4 import BeautifulSoup得到页面源码的方法
参数是传入的url
返回出页面的源码
def get_page_source(url):
"""
获取页面源码的方法
:param url: 传入的url
:return: 返回的是页面的源码
"""
headers=
user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36 Edg/109.0.1518.78
res = requests.get(url,headers=headers)
data = res.content.decode()
return data解析页面得到url
在这里,我使用了集合来存储每一个章节的url,使用xpath来得到章节url,我个人是比较喜欢使用xpath,在这里给出另一种写法,使用的是的beautifulSoup
在页面F12查看,我们找到的是div下的ul下的li下的a标签的属性href
写法一:使用xpath
def parse_page_source(html):
"""
对页面进行解析,得到我们每一个章节的url
:param html: 传入的页面源码
:return: 返回的是一个带有所有章节url的集合
"""
book_list=[]
tree = etree.HTML(html)
mulu_list = tree.xpath(//div[@class="mulu-list quanji"])
for mulu in mulu_list:
# 抓取整个页面下章节的url
a = mulu.xpath(./ul/li/a/@href)
# 注意这里一定要在for循环里添加集合
book_list.append(a)
return book_list写法二:bs4
def parse_page_source(html):
"""
对页面进行解析,得到我们每一个章节的url
:param html: 传入的页面源码
:return: 返回的是一个带有所有章节url的集合
"""
book_list = []
soup = BeautifulSoup(html, html.parser)
a_list = soup.find_all(div, attrs=class: mulu-list quanji)
for a in a_list:
a_list = a.find_all(a)
for href in a_list:
chapter_url = href[href]
book_list.append(chapter_url)
return book_list得到和章节名
通过传入的章节url,进行切片以下面一个链接为例
https://www.51shucheng.net/daomu/guichuideng/jingjuegucheng/2464.html
我们按照/切分就有了[https:, , www.51shucheng.net, daomu, guichuideng, jingjuegucheng, 2464.html] 然后我们取倒数第二个元素就有了b_name
def get_book_name(chapter_url):
"""
得到名称,为了后续下载好分辨
:param chapter_url:
:return:
"""
book_chapter_name = chapter_url.split(/)[-2]
return book_chapter_name下载一个章节的内容
使用xpath来写
async def aio_download_one_content(chapter_url, single):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 使用async with single就是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for i in range(10):
try:
async with single:
async with ClientSession() as session:
async with session.get(chapter_url) as res:
# 得到章节内容的页面的源码
page_source = await res.content.read()
tree = etree.HTML(page_source)
# 章节名称
base_title = tree.xpath(//h1/text())[0]
if(:in base_title):
number = base_title.split(:)[0]
con = base_title.split(:)[1]
title = number+con
else:
title=base_title
# 章节内容
content = tree.xpath(//div[@class="neirong"]/p/text())
chapter_content = \\n.join(content)
if not os.path.exists(fbook_name/c_name):
os.makedirs(fbook_name/c_name)
async with aiofiles.open(fbook_name/c_name/title.txt, mode="w",
encoding=utf-8) as f:
await f.write(chapter_content)
print(chapter_url, "下载完毕!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失败!, 重新下载. ")
return chapter_url这段代码单独拿出来是因为有的章节名称是这样的<<第19章 : 考古队>>,这样的数据是不对的,放在文件里无法命名,这就导致了后续能写入文件的只有章节名没有:的内容,所以我对第一次筛选出的数据进行切片,如果遇到:就把前面和后面的数据切出来在组合,如果没遇到就让一个新的变量来接收base_title
# 章节名称
base_title = tree.xpath(//h1/text())[0]
if(:in base_title):
number = base_title.split(:)[0]
con = base_title.split(:)[1]
title = number+con
else:
title=base_title使用bs4来写
async def aio_download_one(chapter_url, signal):
"""
下载一个章节内容
:param chapter_url: 传入得到的章节url
:param single: 使用async with single就是10个并发
:return:
"""
c_name = get_book_name(chapter_url)
for c in range(10):
try:
async with signal:
async with aiohttp.ClientSession() as session:
async with session.get(chapter_url) as resp:
# 得到章节内容的页面的源码
page_source = await resp.text()
soup = BeautifulSoup(page_source, html.parser)
# 章节名称
base_title = soup.find(h1).text
if (: in base_title):
number = base_title.split(:)[0]
con = base_title.split(:)[1]
title = number + con
else:
title = base_title
# 章节内容
p_content = soup.find(div, attrs=class: neirong).find_all(p)
content = [p.text + \\n for p in p_content]
chapter_content = \\n.join(content)
if not os.path.exists(fbook_name/c_name):
os.makedirs(fbook_name/c_name)
async with aiofiles.open(fbook_name/c_name/title.txt, mode="w",
encoding=utf-8) as f:
await f.write(chapter_content)
print(chapter_url, "下载完毕!")
return ""
except Exception as e:
print(e)
print(chapter_url, "下载失败!, 重新下载. ")
return chapter_url协程下载
async def aio_download(url_list):
# 创建一个任务列表
tasks = []
# 设置最多10个任务并行运作
semaphore = asyncio.Semaphore(10)
for h in url_list:
tasks.append(asyncio.create_task(aio_download_one_content(h, semaphore)))
await asyncio.wait(tasks)主函数运行
主函数运行就没什么可说的了,这里注意一点就是最后不要loop.close(),这样的话会导致你还没有爬取完数据,loop.close()就会关闭,情况如下,还剩一点就爬完了,结果报错了
if __name__ == __main__:
url = https://www.51shucheng.net/daomu/guichuideng
book_name = 鬼吹灯
if not os.path.exists(book_name):
os.makedirs(book_name)
source = get_page_source(url)
href_list = parse_page_source(source)
loop = asyncio.get_event_loop()
loop.run_until_complete(aio_download(href_list))完成效果图
我就不一一截图了
总结
为什么我在这里比对了xpath和bs4两种代码,小伙伴可以仔细看一下,在xpath中,我想拿到数据,找到它,大量的使用了//这种,这样的话就会从源码内全局检索,这就导致了我想爬取文章内容会很慢,有些时候还会超时导致报错.所以我们使用xpath的时候,想让他的速度提高,最好有一个指定的点 到了再//
可以理解为下面这种情况
def getList():
url = https://www.xiurenba.cc/XiuRen/
response = requests.get(url, headers=headers)
data = response.c
ontent.decode()
# print(data)
tree = etree.HTML(data)
li_list = tree.xpath(//ul[@class="update_area_lists cl"]/li)
return li_listdef get_single_url():
li_list = getList()
for li in li_list:
single_url = https://www.xiurenba.cc + li.xpath(./a/@href)[0]还有就是遇到了特殊符号要把它干掉,或者替换掉,这样就可以正常爬取数据
如果有小伙伴想要直接拿取源码的话,可以顺着代码实现一步步粘贴过去
以上是关于Spider理论系列--协程的主要内容,如果未能解决你的问题,请参考以下文章
Spider理论系列-requests模块的Cookie使用
机器学习|数学基础Mathematics for Machine Learning系列之矩阵理论(16):向量和矩阵的极限