爬虫基础
Posted pythoncui
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了爬虫基础相关的知识,希望对你有一定的参考价值。
模块:
requests:伪造浏览器发起Http请求
bs4:将html格式的字符串解析成对象, 对象.find / find_all
示例1:爬取汽车之家新闻 (无需请求头、cookie 等任何额外参数,直接请求即可)
import requests
from bs4 import BeautifulSoup

# Example 1: scrape the Autohome news list with a plain GET request.
response = requests.get('https://www.autohome.com.cn/news/')
# The encoding is forced to GBK before ``response.text`` is decoded.
response.encoding = 'gbk'

soup = BeautifulSoup(response.text, 'html.parser')
div = soup.find(name='div', attrs={'id': 'auto-channel-lazyload-article'})
# ``find`` returns None when the container is missing; iterate over nothing
# in that case instead of failing with AttributeError.
li_list = div.find_all(name='li') if div else []

for li in li_list:
    title = li.find(name='h3')
    if not title:
        # <li> entries without a headline are skipped.
        continue

    p = li.find(name='p')
    a = li.find(name='a')
    print(title.text)
    # The href/src values are prefixed with the scheme before use; skip the
    # line when the tag or attribute is absent rather than concatenating None.
    href = a.attrs.get('href') if a else None
    if href:
        print('https:' + href)
    if p:
        print(p.text)

    img = li.find(name='img')
    img_src = img.get('src') if img else None
    if not img_src:
        continue
    src = 'https:' + img_src
    print('图片地址:', src)

    # Issue a second request to download the image; the file is named after
    # the last path segment of the image URL.
    file_name = src.rsplit('/', maxsplit=1)[1]
    ret = requests.get(src)
    with open(file_name, 'wb') as f:
        f.write(ret.content)
# Java/C# (pseudo-code):
#   - Interface: forces subclasses to contain certain methods.
#
#         Interface IMessage:
#             def func1(self):
#                 pass
#             def func2(self):
#                 pass
#
#         class Msg(IMessage):          # can only be inherited/implemented
#             def func1(self):
#                 print('func1')
#             def func2(self):
#                 print('func2')
#
#   - Abstract method / abstract class: also forces subclasses to contain
#     certain methods, but may carry ordinary methods as well.
#
#         class abstract IMessage:
#             def abstract func1(self):
#                 pass
#             def abstract func2(self):
#                 pass
#             def func3(self):
#                 print('func3')
#
#         class Msg(IMessage):
#             def func1(self):          # inherited
#                 print('func1')
#             def func2(self):
#                 print(func3())        # non-abstract members may be called
#
# Python:
#   - Interface: none.
#   - Abstract method / abstract class: available (ABC).
#   - Class inheritance + exception: the convention shown below.


class IMessage(object):
    """Base class whose ``func1`` must be overridden by subclasses."""

    def func1(self):
        # NotImplementedError is the exception class; ``NotImplemented`` is a
        # non-callable constant, so "raise NotImplemented(...)" would itself
        # fail with TypeError.
        raise NotImplementedError('子类没有实现func1方法')


class Msg(IMessage):
    def func1(self):
        print('123')


obj = Msg()
obj.func1()


# What is this for?  It tells whoever subclasses the base class later which
# methods they are required to implement, for example:
class BaseAuthentication:
    def authenticate(self, request):
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        pass


# How to use it in your own code:
class BaseMessage(object):
    """Base class for message senders; subclasses must override ``send``."""

    def send(self):
        raise NotImplementedError('必须实现send方法')


class Msg(BaseMessage):
    def send(self):
        # ``send`` must be overridden.
        print('发生短信')


class WeChat(BaseMessage):
    def send(self):
        print('发生微信')
示例2:爬抽屉新热榜
- 带请求头 - 带cookie - 登录: - 获取cookie - 登录:携带cookie做授权 - 带cookie访问
代码:
import requests

# The same User-Agent header is sent with every request below.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36'
}

# 1. Visit the home page; its cookies are reused for every later request.
r1 = requests.get(
    url='https://dig.chouti.com/',
    headers=HEADERS,
)
first_cookies = r1.cookies.get_dict()
print(first_cookies)

# 2. Submit the phone number and password, carrying the step-1 cookies.
#    Fill in 'phone' and 'password' before running.
r2 = requests.post(
    url='https://dig.chouti.com/login',
    headers=HEADERS,
    data={
        'phone': ' ',
        'password': ' ',
        'loginType': 2,
    },
    cookies=first_cookies,
)
# Per the author's note, the cookies returned by r2 only serve to mislead the
# client; they are printed here but never sent back.
print(r2.cookies.get_dict())

# 3. Up-vote a link, again with the step-1 cookies.
r3 = requests.post(
    url='https://dig.chouti.com/link/vote?linksId=213513',
    headers=HEADERS,
    cookies=first_cookies,
)
示例三:爬取github (存在cookie和隐藏的token)
- 带请求头 - 带cookie - 请求体中: commit: Sign in authenticity_token: fwExDGfDocMu618vzVz/BShvnGl38bJyFUHDRYc5QwIHFcbceG3aUZP2Dpi1jfbiNXftayc66WmekTtPzBqaEQ== ga_id: 80462516.1585963792 login: ax password: ax
代码:
import os

import requests
from bs4 import BeautifulSoup

# ---------------- Example 3: log in to GitHub automatically ----------------

# Credentials are read from the environment instead of being hard-coded in
# the script; a missing variable fails immediately with KeyError.
GITHUB_LOGIN = os.environ['GITHUB_LOGIN']
GITHUB_PASSWORD = os.environ['GITHUB_PASSWORD']

# 1. GET the login page:
#    - read the CSRF token from the hidden <input name="authenticity_token">
#    - keep the cookies of this response
i1 = requests.get('https://github.com/login')
soup1 = BeautifulSoup(i1.text, 'html.parser')
tag = soup1.find(name='input', attrs={'name': 'authenticity_token'})
if tag is None:
    # Without the token the POST below cannot succeed; stop with a clear error
    # instead of an AttributeError on None.
    raise RuntimeError('authenticity_token input not found on the login page')
authenticity_token = tag.get('value')
c1 = i1.cookies.get_dict()

# 2. POST user name and password:
#    - form data: CSRF token, login, password
#    - carry the cookies obtained in step 1
form_data = {
    'authenticity_token': authenticity_token,
    'utf8': '',
    'commit': 'Sign in',
    'login': GITHUB_LOGIN,
    'password': GITHUB_PASSWORD,
}
i2 = requests.post('https://github.com/session', data=form_data, cookies=c1)
c2 = i2.cookies.get_dict()
# Cookies set by the login response override those from the login page.
c1.update(c2)

# 3. GET a page that requires authentication, carrying the merged cookies.
i3 = requests.get(
    url='https://github.com/settings/repositories',
    cookies=c1,
    headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36'
    },
)
soup3 = BeautifulSoup(i3.text, 'html.parser')
list_group = soup3.find_all(name='div', attrs={'class': 'Box-row simple public js-collab-repo'})
for child in list_group:
    a = child.find(name='a')
    if a is None:
        continue
    print(a.text)
    print(a.attrs.get('href'))
示例四:登陆拉勾网
- 密码如果加密 - 找js,通过python实现加密方式 - 找密文,直接用密文登录 - Referer头;上一次请求地址,可以用来做防盗链
代码:
import os
import re

import requests

# The account name and the password ciphertext are read from the environment
# instead of being hard-coded.  As the notes for this example say, the value
# submitted as 'password' is the already-encrypted text, not the plain password.
LAGOU_USERNAME = os.environ['LAGOU_USERNAME']
LAGOU_PASSWORD_CIPHERTEXT = os.environ['LAGOU_PASSWORD_CIPHERTEXT']

# 1. Load the login page; it embeds the two anti-forgery values in inline JS.
r1 = requests.get(
    'https://passport.lagou.com/login/login.html',
    headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36',
    },
)

token_matches = re.findall(r"X_Anti_Forge_Token = '(.*?)'", r1.text, re.S)
code_matches = re.findall(r"X_Anti_Forge_Code = '(.*?)'", r1.text, re.S)
if not token_matches or not code_matches:
    # Fail with a readable message instead of an IndexError on [0].
    raise RuntimeError('X_Anti_Forge_Token / X_Anti_Forge_Code not found in the login page')
X_Anti_Forge_Token = token_matches[0]
X_Anti_Forge_Code = code_matches[0]
print(X_Anti_Forge_Token, X_Anti_Forge_Code)

# 2. Submit the login form with the anti-forgery headers and step-1 cookies.
r2 = requests.post(
    url='https://passport.lagou.com/login/login.json',
    headers={
        # Referer: the address of the previous request.
        'Referer': 'https://passport.lagou.com/login/login.html',
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36',
        # NOTE(review): "Anit" is kept exactly as in the original script;
        # presumably it mirrors the header name the site expects. Confirm
        # before "correcting" the spelling.
        'X-Anit-Forge-Code': X_Anti_Forge_Code,
        'X-Anit-Forge-Token': X_Anti_Forge_Token,
        'X-Requested-With': 'XMLHttpRequest',
    },
    data={
        "isValidate": True,
        'username': LAGOU_USERNAME,
        'password': LAGOU_PASSWORD_CIPHERTEXT,
        'request_form_verifyCode': '',
        'submit': '',
    },
    cookies=r1.cookies.get_dict(),
)
print(r2.text)
from contextlib import closing

import requests
from requests.auth import HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth

# - Methods
#       requests.get
#       requests.post
#       requests.put
#       requests.delete
#       ...
#       requests.request(method='POST')
#
# - Parameters
#   2.1 url
#   2.2 headers
#   2.3 cookies
#   2.4 params
#   2.5 data: sent as the request body (form-encoded)
#           requests.post(
#               ...,
#               data={'user': 'xx', 'pwd': 's123'}
#           )
#
#           POST /index HTTP/1.1
#           host: c1.com
#
#           user=xx&pwd=s123
#
#   2.6 json: sent as the request body (JSON)
#           requests.post(
#               ...,
#               json={'user': 'xx', 'pwd': 's123'}
#           )
#
#           POST /index HTTP/1.1
#           host: c1.com
#           Content-Type: application/json
#
#           {"user": "xx", "pwd": "s123"}

# ---------------------------------------------------------------------------
# 2.7 proxies

# Proxy without authentication
proxies_dict = {
    "http": "61.172.249.96:80",
    "https": "http://61.185.219.126:3128",
}
ret = requests.get("http://www.proxy360.cn/Proxy", proxies=proxies_dict)

# Proxy that requires authentication
proxyDict = {
    'http': '77.75.105.165',
    'https': '77.75.105.165',
}
auth = HTTPProxyAuth('username', 'mypassword')
# ``data`` must be a mapping; the original wrote a set literal by mistake.
r = requests.get("http://www.google.com", data={'xx': 'ff'}, proxies=proxyDict, auth=auth)
print(r.text)

# ---------------------------------------------------------------------------
# 2.8 files: upload a file

# Open the file in a ``with`` block so the handle is closed after the upload.
with open('text.txt', 'rb') as upload:
    file_dict = {
        'f1': upload,
    }
    requests.request(
        method='POST',
        url='http://127.0.0.1:8000/test/',
        files=file_dict,
    )

# ---------------------------------------------------------------------------
# 2.9 auth: HTTP authentication
#     Internally the user name and password are encoded (base64 is an
#     encoding, not encryption) and sent to the server in a request header:
#       - 'user:password'
#       - base64('user:password')
#       - "Basic base64('user:password')"
#       - request header  Authorization: "Basic base64('user:password')"
ret = requests.get('https://api.github.com/user', auth=HTTPBasicAuth('ale', 'sdfasdfasdf'))
print(ret.text)

# ---------------------------------------------------------------------------
# 2.10 timeout

ret = requests.get('http://google.com/', timeout=1)
print(ret)

# A tuple sets the (connect, read) timeouts separately.
ret = requests.get('http://google.com/', timeout=(5, 1))
print(ret)

# ---------------------------------------------------------------------------
# 2.11 allow_redirects

ret = requests.get('http://127.0.0.1:8000/test/', allow_redirects=False)
print(ret.text)

# ---------------------------------------------------------------------------
# 2.12 stream: download large responses piece by piece

with closing(requests.get('http://httpbin.org/get', stream=True)) as r:
    # Handle the response here.
    for i in r.iter_content():
        print(i)

# ---------------------------------------------------------------------------
# Note:
#   - session
#         session = requests.Session()
#         # With a session there is no need to pass cookies explicitly;
#         # the session carries them automatically.
#         session.get(...)
#         session.post(...)
长轮询:投票
app.py: from flask import Flask, render_template, request, jsonify, session import queue import uuid app = Flask(__name__) app.secret_key = "sadxasxc" USERS = { ‘1‘: {‘name‘:‘alex‘, ‘count‘:1}, ‘2‘: {‘name‘: ‘blex‘, ‘count‘: 0}, ‘3‘: {‘name‘: ‘clex‘, ‘count‘: 0} } # 为每个用户建立一个q对象 # 以用户的uuid为key 值为q对象 Q_DICT = {} @app.route("/") def index(): user_uuid = str(uuid.uuid4()) session["user_uuid"] = user_uuid Q_DICT[user_uuid] = queue.Queue() return render_template("user_list.html", users=USERS) @app.route("/vote", methods=["POST"]) def vote(): # 投票 循环q对象的dict 给每个q对象返回值 uid = request.form.get("uid") USERS[uid]["count"] += 1 for q in Q_DICT.values(): q.put(USERS) return "投票成功" @app.route("/get/vote", methods=["GET",]) def get_vote(): # 获取投票结果 去自己的q对象里取值 没有夯住 知道有或者超时返回 user_uuid = session.get("user_uuid") q = Q_DICT[user_uuid] ret = {‘status‘: True, ‘data‘: None} try: users = q.get(timeout=30) ret[‘data‘] = users except queue.Empty: ret[‘status‘] = False return jsonify(ret) if __name__ == ‘__main__‘: app.run() user_list.html: <!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>Title</title> <meta name="viewport" content="width=device-width, initial-scale=1"> <style> li{ cursor: pointer; } </style> </head> <body> <ul id="userList"> {% for key,val in users.items() %} <li uid="{{key}}">{{val.name}}({{val.count}})</li> {% endfor %} </ul> <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script> <script> $(function () { $(‘#userList‘).on(‘click‘, ‘li‘, function () { var uid = $(this).attr(‘uid‘); $.ajax({ url:‘/vote‘, type:‘POST‘, data:{uid:uid}, success:function (arg) { console.log(arg); } }) }); get_vote(); }); // 获取投票信息 function get_vote() { $.ajax({ url:‘/get/vote‘, type: ‘GET‘, dataType:‘JSON‘, success:function (arg) { if(arg.status){ $(‘#userList‘).empty(); $.each(arg.data,function (k,v) { // console.log(k,v); var li = document.createElement(‘li‘); li.setAttribute(‘uid‘,k); li.innerText= v.name+"(" + v.count + ")"; 
$(‘#userList‘).append(li); }) } get_vote(); } }) } </script> </body> </html>
基于Flask开发Web微信
# Backend (Flask application, app.py)
import json
import re
import time

import requests
from bs4 import BeautifulSoup
from flask import Flask, Response, jsonify, render_template, request, session

app = Flask(__name__)
app.secret_key = '134'


def xml_parse(arg):
    """Return ``{tag name: tag text}`` for every descendant of ``<error>``.

    :param arg: markup text containing an ``<error>`` element.
    :return: dict of tag names to text; empty when no ``<error>`` element
        is present (instead of failing with AttributeError).
    """
    soup = BeautifulSoup(arg, 'html.parser')
    error_tag = soup.find(name='error')
    if error_tag is None:
        return {}
    return {tag.name: tag.text for tag in error_tag.find_all()}


# Fetch the QR code
@app.route('/login')
def login():
    """Request a login uuid, remember it in the session and show the QR page."""
    ctime = int(time.time() * 1000)
    qcode_url = 'https://login.wx.qq.com/jslogin?appid=wx782c26e4c19acffb&redirect_uri=https%3A%2F%2Fwx.qq.com%2Fcgi-bin%2Fmmwebwx-bin%2Fwebwxnewloginpage&fun=new&lang=zh_CN&_={0}'.format(ctime)
    rep = requests.get(url=qcode_url)
    # Response text looks like:
    #   window.QRLogin.code = 200; window.QRLogin.uuid = "Qfq90M3SgA==";
    qcode = re.findall(r'uuid = "(.*)";', rep.text)[0]
    session['qcode'] = qcode
    return render_template('login.html', qcode=qcode)


# Check the login state
@app.route('/check/login')
def check_log():
    """Poll the login endpoint once and report the state code as JSON.

    408: QR code not scanned yet; 201: scanned, avatar available;
    200: login confirmed, ticket data and cookies stored in the session.
    """
    result = {'code': 408}
    qcode = session['qcode']
    ctime = int(time.time() * 1000)
    check_log_url = 'https://login.wx.qq.com/cgi-bin/mmwebwx-bin/login?loginicon=true&uuid={0}&tip=0&r=-1380761436&_={1}'.format(qcode, ctime)
    rep = requests.get(url=check_log_url)
    if 'window.code=408' in rep.text:
        # The user has not scanned the code.
        result['code'] = 408
    elif 'window.code=201' in rep.text:
        # The user scanned the code: pick up the avatar.
        result['code'] = 201
        result['avatar'] = re.findall(r"window.userAvatar = '(.*)';", rep.text)[0]
    elif 'window.code=200' in rep.text:
        # The user confirmed the login: follow the redirect URI to obtain the
        # ticket values and cookies used by the other views.
        result['code'] = 200
        redirect_uri = re.findall(r'window.redirect_uri="(.*)";', rep.text)[0]
        redirect_uri = redirect_uri + "&fun=new&version=v2"
        ru = requests.get(url=redirect_uri)
        print(ru.text)
        ticket_dict = xml_parse(ru.text)
        session['ticket_dict'] = ticket_dict
        session['ticket_cookies'] = ru.cookies.get_dict()
    return jsonify(result)


# Initialisation data
@app.route('/index')
def index():
    """Call webwxinit with the stored ticket values and render the result."""
    pass_ticket = session['ticket_dict']['pass_ticket']
    init_url = "https://login.wx.qq.com/cgi-bin/mmwebwx-bin/webwxinit?r=-979112921&lang=zh_CN&pass_ticket={0}".format(pass_ticket)
    rep = requests.post(
        url=init_url,
        json={
            'BaseRequest': {
                'DeviceID': 'e700290354098676',
                'Sid': session['ticket_dict']['wxsid'],
                'Skey': session['ticket_dict']['skey'],
                'Uin': session['ticket_dict']['wxuin'],
            }
        },
    )
    rep.encoding = 'utf-8'
    init_user_dict = rep.json()
    print(init_user_dict)
    return render_template('index.html', init_user_dict=init_user_dict)


# Full contact list
@app.route('/contact_list')
def contact_list():
    """Fetch the list of all contacts and render it."""
    ctime = int(time.time() * 1000)
    pass_ticket = session['ticket_dict']['pass_ticket']
    skey = session['ticket_dict']['skey']
    contact_url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxgetcontact?lang=zh_CN&pass_ticket={0}&r={1}&seq=0&skey={2}'.format(pass_ticket, ctime, skey)
    res = requests.get(
        url=contact_url,
        cookies=session['ticket_cookies'],
    )
    res.encoding = 'utf-8'
    user_list = res.json()
    # The template name needs its extension; 'contact_list' alone does not
    # match the contact_list.html template.
    return render_template('contact_list.html', user_list=user_list)


# User avatar
@app.route('/get_img')
def get_img():
    """Proxy an avatar image using the session's cookies."""
    # NOTE(review): the contact-list template passes only ``prev``;
    # presumably HeadImgUrl carries its own "&username=...&skey=..." query
    # string, which arrives here as separate parameters. Verify.
    prev = request.args.get('prev')
    username = request.args.get('username')
    skey = request.args.get('skey')
    head_img_url = "https://wx.qq.com{0}&username={1}&skey={2}".format(prev, username, skey)
    rep = requests.get(
        url=head_img_url,
        cookies=session['ticket_cookies'],
    )
    # Forward the upstream Content-Type so the bytes are not served under
    # Flask's default text/html type.
    return Response(rep.content, content_type=rep.headers.get('Content-Type'))


# Send a message
@app.route('/send/msg', methods=['GET', 'POST'])
def send_msg():
    """GET: show the form. POST: send the submitted text message."""
    if request.method == 'GET':
        return render_template('send_msg.html')
    from_user = request.form.get('fromUser')
    to_user = request.form.get('toUser')
    content = request.form.get('content')
    ctime = int(time.time() * 1000)
    data_dict = {
        'BaseRequest': {
            'DeviceID': 'e700290354098676',
            'Sid': session['ticket_dict']['wxsid'],
            'Skey': session['ticket_dict']['skey'],
            'Uin': session['ticket_dict']['wxuin'],
        },
        'Msg': {
            'ClientMsgId': ctime,
            'Content': content,
            'FromUserName': from_user,
            'LocalID': ctime,
            'ToUserName': to_user,
            'Type': 1,
        },
        'Scene': 0,
    }
    msg_url = 'https://wx.qq.com/cgi-bin/mmwebwx-bin/webwxsendmsg?lang=zh_CN&pass_ticket={0}'.format(session['ticket_dict']['pass_ticket'])
    # The body is JSON, serialised by hand with ensure_ascii=False so that
    # non-ASCII message text goes out as UTF-8 bytes rather than \u escapes.
    rep = requests.post(
        url=msg_url,
        data=bytes(json.dumps(data_dict, ensure_ascii=False), encoding='utf-8'),
    )
    print(rep)
    return '发送成功'


if __name__ == '__main__':
    app.run()
前端(HTML 模板,依次为 login.html、index.html、contact_list.html、send_msg.html):
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<div style="width: 200px; margin: 0 auto;">
    <h1 style="text-align: center">扫码登录</h1>
    <img id="userAvatar" style="width: 200px;height: 200px;"
         src="https://login.weixin.qq.com/qrcode/{{qcode}}" alt="">
</div>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.js"></script>
<script>
    $(function () {
        checkLogin();
    });

    // Poll /check/login until the server reports a confirmed login.
    function checkLogin() {
        $.ajax({
            url: '/check/login',
            method: 'GET',
            dataType: 'JSON',
            success: function (arg) {
                if (arg.code === 408) {
                    // Not scanned yet: poll again.
                    checkLogin();
                } else if (arg.code === 201) {
                    // Scanned: replace the QR code with the avatar, keep polling.
                    $('#userAvatar').attr('src', arg.avatar);
                    checkLogin();
                } else if (arg.code === 200) {
                    // Login confirmed: go to the index page.
                    location.href = '/index'
                }
            }
        })
    }
</script>
</body>
</html>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>欢迎使用web微信:{{init_user_dict.User.NickName}}</h1>

<h3>最近联系人</h3>
<ul>
    {% for row in init_user_dict.ContactList %}
        <li>{{row.NickName}}</li>
    {% endfor %}
    {# The link text belongs inside the anchor; an empty <a></a> is not clickable. #}
    <li><a href="/contact_list">查看所有人联系人</a></li>
</ul>

<h3>最近公众号</h3>
{% for item in init_user_dict.MPSubscribeMsgList %}
    <div>
        <h3>{{item.NickName}}</h3>
        <ul>
            {% for msg in item.MPArticleList %}
                <li><a href="{{msg.Url}}">{{msg.Title}}</a></li>
            {% endfor %}
        </ul>
    </div>
{% endfor %}
</body>
</html>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>联系人列表</h1>
<ul>
    {# One entry per member: avatar proxied through /get_img, then nickname and unique id. #}
    {% for user in user_list.MemberList %}
        <li>
            <img style="height: 50px; width: 50px" src="/get_img?prev={{user.HeadImgUrl}}" alt="">
            <span>用户名:{{user.NickName}}唯一标识:{{user.UserName}}</span>
        </li>
    {% endfor %}
</ul>
</body>
</html>
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
{# The element must be <form>; with the misspelled <from> tag nothing is submitted. #}
<form method="post">
    <input type="text" name="fromUser">
    <input type="text" name="toUser">
    {# "content" is not a valid input type; use a text input. #}
    <input type="text" name="content">
    <input type="submit" value="发送">
</form>
</body>
</html>
以上是关于爬虫基础的主要内容,如果未能解决你的问题,请参考以下文章