饮冰三年 - Artificial Intelligence - Python - 38: Crawlers and Concurrency
Posted by 逍遥小天狼
1. Concurrency with multiple threads
from concurrent.futures import ThreadPoolExecutor
import requests

# Putting an elephant in the fridge takes three steps:
# 1: get a fridge        -> import and create a thread pool (ThreadPoolExecutor)
# 2: put the elephant in -> pool.submit(task, url)
# 3: close the door      -> pool.shutdown(wait=True)

def task(url):
    # issue a GET request
    response = requests.get(url)
    print(url, response)

# create a thread pool with 7 workers
pool = ThreadPoolExecutor(7)

url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]

for url in url_list:
    pool.submit(task, url)

pool.shutdown(wait=True)
The variant below has the worker return the response and registers a completion callback with add_done_callback, so each response is processed as soon as its future finishes:

# concurrency -- threads, with a completion callback
from concurrent.futures import ThreadPoolExecutor
import requests

# Tell me once the elephant is in:
# 1: get a fridge        -> import and create a thread pool (ThreadPoolExecutor)
# 2: put the elephant in -> pool.submit(task, url)
# 3: close the door      -> pool.shutdown(wait=True)

def task(url):
    response = requests.get(url)
    return response

def done(future, *args, **kwargs):
    # callback function: runs when the future completes
    response = future.result()
    print(response.status_code, response.content)

pool = ThreadPoolExecutor(7)
url_list = [
    'http://www.cnblogs.com/wupeiqi',
    'http://huaban.com/favorite/beauty/',
    'http://www.bing.com',
    'http://www.zhihu.com',
    'http://www.sina.com',
    'http://www.baidu.com',
    'http://www.autohome.com.cn',
]
for url in url_list:
    v = pool.submit(task, url)
    v.add_done_callback(done)
pool.shutdown(wait=True)
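Both variants above go through submit(). When no per-task callback is needed, concurrent.futures also provides Executor.map, which yields results in submission order; a minimal sketch with a shortened URL list:

from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    return requests.get(url)

url_list = ['http://www.baidu.com', 'http://cn.bing.com']

# the with-block shuts the pool down automatically;
# map() yields responses in the order the URLs were submitted
with ThreadPoolExecutor(7) as pool:
    for url, response in zip(url_list, pool.map(task, url_list)):
        print(url, response.status_code)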
2. Concurrency with multiple processes: the code mirrors the multithreaded version; only the imported pool class differs
from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    response = requests.get(url)
    print(url, response)
    # parse the page here, e.g. with a regular expression

if __name__ == "__main__":
    pool = ProcessPoolExecutor(7)
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    for url in url_list:
        pool.submit(task, url)
    pool.shutdown(wait=True)
from concurrent.futures import ProcessPoolExecutor
import requests

def task(url):
    response = requests.get(url)
    return response

def done(future, *args, **kwargs):
    # callback: runs in the parent process once the future completes
    response = future.result()
    print(response.status_code, response.content)

if __name__ == "__main__":
    pool = ProcessPoolExecutor(7)
    url_list = [
        'http://www.cnblogs.com/wupeiqi',
        'http://huaban.com/favorite/beauty/',
        'http://www.bing.com',
        'http://www.zhihu.com',
        'http://www.sina.com',
        'http://www.baidu.com',
        'http://www.autohome.com.cn',
    ]
    for url in url_list:
        v = pool.submit(task, url)
        v.add_done_callback(done)
    pool.shutdown(wait=True)
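Two caveats specific to the process version: arguments and return values cross a process boundary by pickling, so the task function and whatever it returns must be picklable, and the if __name__ == "__main__" guard is required on platforms that spawn worker processes. The done callback itself runs in the parent process.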
3. Coroutines + asynchronous IO
The following packages need to be installed: pip install aiohttp requests greenlet gevent grequests
import asyncio

# async/await replaces the old @asyncio.coroutine / yield from style,
# which was removed in Python 3.11
async def task():
    print('before...task...')
    await asyncio.sleep(5)  # while this coroutine sleeps, the event loop runs the other one
    print('end...task...')

tasks = [task(), task()]

loop = asyncio.get_event_loop()                  # open the fridge door
loop.run_until_complete(asyncio.gather(*tasks))  # put the elephants in
loop.close()                                     # close the door
import asyncio

async def task(host, url='/'):
    print('before...task...', host, url)
    # asyncio itself only speaks TCP; the coroutines inside the loop run
    # concurrently, and the HTTP request is built by hand
    reader, writer = await asyncio.open_connection(host, 80)
    request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host)
    writer.write(bytes(request_header_content, encoding='utf-8'))
    await writer.drain()
    text = await reader.read()
    print('end', host, url, text)
    writer.close()

tasks = [
    task('www.cnblogs.com', '/Yk2012/'),
    task('www.baidu.com'),
]

loop = asyncio.get_event_loop()                  # open the fridge door
loop.run_until_complete(asyncio.gather(*tasks))  # put the elephants in
loop.close()                                     # close the door
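Because the handwritten request is HTTP/1.0 with no keep-alive, the server closes the connection once it has responded, which is why reader.read() can simply read until EOF; an HTTP/1.1 client would instead have to honour Content-Length or chunked transfer encoding.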
import aiohttp
import asyncio

# aiohttp's current API is built around ClientSession;
# the old `yield from aiohttp.request(...)` style no longer exists
async def fetch_async(url):
    print(url)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # the response is released automatically when the block exits
            print(url, response.status)

tasks = [fetch_async('http://www.baidu.com/'),
         fetch_async('http://www.chouti.com/')]

event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
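In a real crawler one ClientSession is normally created once and shared by all requests, since the session pools connections; opening a session per URL as above is the simplest form but forfeits connection reuse.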
import asyncio
import requests

async def task(func, *args):
    print(func, args)
    loop = asyncio.get_running_loop()
    # hand the blocking requests call to the default thread-pool executor,
    # e.g. requests.get('http://www.cnblogs.com/wupeiqi/')
    future = loop.run_in_executor(None, func, *args)
    response = await future
    print(response.url, response.content)

tasks = [
    task(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    task(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
import grequests

request_list = [
    grequests.get('https://baidu.com/'),
]

# ##### send all requests and collect the list of responses #####
response_list = grequests.map(request_list, size=5)
print(response_list)
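grequests is a thin wrapper over gevent plus requests. For completeness, here is a minimal sketch using gevent (one of the packages listed above) directly; it reuses the task pattern from the earlier examples:

from gevent import monkey
monkey.patch_all()  # must run before requests/socket are imported,
                    # so blocking socket calls yield to other greenlets
import gevent
import requests

def task(url):
    response = requests.get(url)
    print(url, response)

url_list = ['http://www.baidu.com', 'http://cn.bing.com']

# spawn one greenlet per URL and wait for all of them
gevent.joinall([gevent.spawn(task, url) for url in url_list])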
4. Rolling your own asynchronous IO
import socket

sk = socket.socket()

# 1. connect
sk.connect(('www.baidu.com', 80))  # blocks on IO
# 2. send the request
sk.send(b'GET / HTTP/1.0\r\nHost:www.baidu.com\r\n\r\n')
# 3. wait for the server's response
data = sk.recv(8096)  # blocks on IO
# 4. close the connection
sk.close()
Both blocking points above can be removed by putting the sockets in non-blocking mode and multiplexing them with select:

import socket
import select

class HttpResponse:
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b'\r\n\r\n', 1)
        self.body = body
        header_list = headers.split(b'\r\n')
        for h in header_list:
            h_str = str(h, encoding='utf-8')
            v = h_str.split(':', 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]

class HttpRequest:
    def __init__(self, socket, host, callback):
        self.socket = socket
        self.host = host
        self.callback = callback  # callback function

    def fileno(self):
        '''with a fileno() method, this object can itself be passed to select'''
        return self.socket.fileno()

class AsyncRequest:
    def __init__(self):
        self.conn = []        # watched for readability: is the response in?
        self.connection = []  # watched for writability: has the connection succeeded?

    def add_request(self, host, callback):
        try:
            sk = socket.socket()     # create the socket
            sk.setblocking(0)        # switch off blocking
            sk.connect((host, 80))   # start connecting
        except BlockingIOError as e:
            pass  # expected on a non-blocking connect
        # wrap the socket in a request object
        request = HttpRequest(sk, host, callback)
        # register it in both watch lists
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                print(w.host, 'connected...')
                # appearing in wlist means the socket has connected: send the request
                tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                w.socket.send(bytes(tpl, encoding='utf-8'))
                self.connection.remove(w)  # connected: stop watching for writability
            for r in rlist:
                recv_data = bytes()
                # keep receiving in a loop so no data is lost
                while True:
                    try:
                        chunk = r.socket.recv(8096)
                        if not chunk:  # empty bytes: the peer closed the connection
                            break
                        recv_data += chunk  # accumulate the response
                    except Exception as e:
                        break  # nothing buffered right now (BlockingIOError)
                # response fully received: hand it to HttpResponse
                response = HttpResponse(recv_data)
                r.callback(response)  # invoke the callback with the parsed response
                r.socket.close()      # done with this request: close the socket
                self.conn.remove(r)   # done: stop watching it
            # all responses received: conn is empty, stop the loop
            if len(self.conn) == 0:
                break

def f1(response):
    print('save to a file', response.header_dict)

def f2(response):
    print('save to the database', response.header_dict)

url_list = [
    {'host': 'www.baidu.com', 'callback': f1},
    {'host': 'cn.bing.com', 'callback': f2},
    {'host': 'www.cnblogs.com', 'callback': f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item['host'], item['callback'])
req.run()
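The detail that makes this work is HttpRequest.fileno(): select() accepts any object that exposes its descriptor through a fileno() method, so the watch lists can hold full request objects rather than bare sockets, and each readiness event comes back with its host and callback already attached.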