雪球数据的爬取
Posted kend
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了雪球数据的爬取相关的知识,希望对你有一定的参考价值。
import requests from lxml import etree import json import pymongo
# 连接mongodb 数据库 存mongodb中 client = pymongo.MongoClient(‘127.0.0.1‘, port=27017) db = client.xueqiu collection = db.xueqiu
url = ‘https://xueqiu.com/‘ headers = "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/73.0.3683.86 Safari/537.36" session = requests.Session() session.get(url=url,headers=headers) def get_page_list(): url = ‘https://xueqiu.com/v4/statuses/public_timeline_by_category.json?since_id=-1&max_id=-1&count=10&category=-1‘ params = "since_id": "-1", "max_id": "-1", "count": "10", "category": "-1" response = session.get(url=url,headers=headers,params=params) page_text = response.json() content = page_text["list"] info_dict = for x in content: per_info = x["data"] #json 格式 per_info = json.loads(per_info) # print(per_info) id = per_info["id"] title = per_info["title"] description = per_info["description"] target = per_info["target"] detail_url = "https://xueqiu.com"+target info_dict[‘id‘]=id info_dict[‘title‘] = title info_dict[‘detail_url‘]=detail_url parse_detail(detail_url) # break def parse_detail(url): response = session.get(url=url, headers=headers,) page_text = response.text tree = etree.HTML(page_text) title = tree.xpath(‘//div[@class="container article__container"]//h1[@class="article__bd__title"]/text()‘) print(title) print("=="*20) data_dict = data_dict["title"] = title p_list = tree.xpath(‘//div[@class="article__bd__detail"]/p‘) content_list = [] for p in p_list: content = p.xpath(‘./text()|./b/text()‘) content = "".join(content).strip() # print(content) if len(content)>0: content_list.append(content) content_str = "".join(content_list) data_dict["content"] = content_str # print(data_dict) collection.insert([data_dict]) def main(): get_page_list() if __name__ == ‘__main__‘: main()
优化成redis增量式获取数据
import requests from lxml import etree import json from redis import Redis import pymongo import time import datetime client = pymongo.MongoClient(‘127.0.0.1‘, port=27017) db = client.xueqiu collection = db.xueqiu conn = Redis(host=‘127.0.0.1‘,port=6379) url = ‘https://xueqiu.com/‘ headers = "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36" session = requests.Session() session.get(url=url,headers=headers) def get_page_list(): url = ‘https://xueqiu.com/v4/statuses/public_timeline_by_category.json?since_id=-1&max_id=-1&count=10&category=-1‘ params = "since_id": "-1", "max_id": "-1", "count": "10", # 获取10条信息 "category": "-1" response = session.get(url=url,headers=headers,params=params) page_text = response.json() content = page_text["list"] info_dict = for x in content: per_info = x["data"] #json 格式 per_info = json.loads(per_info) # print(per_info) id = per_info["id"] title = per_info["title"] description = per_info["description"] target = per_info["target"] detail_url = "https://xueqiu.com"+target ex = conn.sadd(‘news_urls‘, detail_url) if ex == 0: print(‘暂无最新数据可爬取......‘) else: print(‘有最新数据的更新......‘) info_dict[‘id‘]=id info_dict[‘title‘] = title info_dict[‘detail_url‘]=detail_url parse_detail(detail_url) # break def parse_detail(url): response = session.get(url=url, headers=headers,) page_text = response.text tree = etree.HTML(page_text) title = tree.xpath(‘//div[@class="container article__container"]//h1[@class="article__bd__title"]/text()‘) print(title) print("=="*20) data_dict = data_dict["title"] = title p_list = tree.xpath(‘//div[@class="article__bd__detail"]/p‘) content_list = [] for p in p_list: content = p.xpath(‘./text()|./b/text()‘) content = "".join(content).strip() # print(content) if len(content)>0: content_list.append(content) content_str = "".join(content_list) data_dict["content"] = content_str # print(data_dict) collection.insert([data_dict]) def main(): flag = 0 now = datetime.datetime.now() sched_time = datetime.datetime(now.year, now.month, now.day, now.hour, now.minute, now.second) + datetime.timedelta( seconds=6) while True: now = datetime.datetime.now() # 设置爬取时间, 多久爬一次 if sched_time < now: time.sleep(300) print(now) get_page_list() flag = 1 else: if flag == 1: sched_time = sched_time+datetime.timedelta(minutes=1) flag = 0 if __name__ == ‘__main__‘: main()
以上是关于雪球数据的爬取的主要内容,如果未能解决你的问题,请参考以下文章