实现多线程爬取数据并保存到mongodb
Posted L某人
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实现多线程爬取数据并保存到mongodb相关的知识,希望对你有一定的参考价值。
多线程爬取二手房网页并将数据保存到mongodb的代码:
import pymongo import threading import time from lxml import etree import requests from queue import Queue index_url=‘https://m.lianjia.com/gz/ershoufang/pg{}/‘ detail_url=‘https://m.lianjia.com{}‘ # 设置爬取主页的页数 INDEX_PAGE_NUM=200 # 定义一个类 # 0定义主页url队列、主页html队列、详情页url队列、html队列、内容队列 # 1获取首页url并解析详情页url # 2获取详情页的内容 # 3保存内容 # 4设置多线程调用方法 # 设置mongodb client = pymongo.MongoClient(‘localhost‘) # 设置数据库名 db = client[‘ershoufang‘] # 指定集合名 index = ‘index_info‘ detail = ‘detail_info‘ class lianJia(): def __init__(self): self.index_url_queue=Queue() self.html_index_queue=Queue() self.index_content_queue=Queue() self.detail_content_queue = Queue() # 获取主页的url和html内容并解析出index页内容和详情页url def get_index(self): for i in range(INDEX_PAGE_NUM): # print(index_url.format(i+1)) url=index_url.format(i+1) self.index_url_queue.put(url) # index=requests.get(index_url.format(i+1)).content.decode() # self.html_index_queue.put(index) # 获取主页html def get_index_html(self): while True: url=self.index_url_queue.get() index = requests.get(url).content.decode() self.html_index_queue.put(index) self.index_url_queue.task_done() def parse_index(self): while True: # 获取队列里得内容 html1=self.html_index_queue.get() xml=etree.HTML(html1) pingjie_list=xml.xpath(‘‘‘//ul[@class=‘lists‘]/li[position()>1]‘‘‘) # 将 pingjie_list拼接在xpath前,少写xpath语句 index_content_list=[] for pj in pingjie_list: index_infor={} # #判空炒作,如果为空则显示none if len(index_infor[‘title‘]) > 0 else None index_infor[‘title‘]=pj.xpath(‘‘‘./div/div[@class=‘item_list‘]/div[1]/text()‘‘‘) index_infor[‘title‘]=index_infor[‘title‘][0] if len(index_infor[‘title‘]) > 0 else None index_infor[‘detail_url‘] = pj.xpath(‘‘‘./a/@href‘‘‘)[0] index_infor[‘index_detail‘]=pj.xpath(‘‘‘./div/div[2]/div[2]/text()‘‘‘) index_infor[‘index_detail‘]=index_infor[‘index_detail‘][0] if len(index_infor[‘index_detail‘])>0 else None index_infor[‘total_price‘]=pj.xpath(‘‘‘./div/div[2]/div[position()>2]/span[1]/em/text()‘‘‘) index_infor[‘total_price‘]= index_infor[‘total_price‘][0] if len( index_infor[‘total_price‘])>0 else None index_infor[‘average_price‘]=pj.xpath(‘‘‘./div/div[@class=‘item_list‘]/div[3]/span[2]/text()‘‘‘) index_infor[‘average_price‘]=index_infor[‘average_price‘][0]if len(index_infor[‘average_price‘])>0 else None index_content_list.append(index_infor) # 队列保存时不能在循环里 否之回保存很多个队列 # self.index_content_queue.put(index_content_list) # 把content_list放进content_queue里面 self.index_content_queue.put(index_content_list) # print(index_content_list) # 每从队列中获取一个数,队列则减少一个数,所以此代码必须写 self.html_index_queue.task_done() # 获取详情页内容 def get_detail(self): pass # 保存内容 def save_content(self): while True: index_conten_list=self.index_content_queue.get() for i in index_conten_list: # print(i[‘title‘]) if i[‘title‘]==None or i[‘total_price‘]==None or i[‘average_price‘]==None: print(‘该数据为空,不进行保存‘) else: db[‘index_info‘].insert(i) # db[‘detailDta‘].insert(detail_datas) print(‘保存数据成功‘) self.index_content_queue.task_done() # 主线程:分配各种子线程去执行class里得每一个函数 # 使用队列的方式得设置多线程进行调用函数,才能让程序执行速度更快 def run(self): # 设置线程列表 thread_list=[] # start_time=time.time() # 1.url_list # threading.Thread不需要传参数,参数都是从队列里面取得 # for i in range(20): t_index_u=threading.Thread(target=self.get_index) thread_list.append(t_index_u) # 2.遍历,发送请求,获取响应 for i in range(20): t_index_html=threading.Thread(target=self.get_index_html) thread_list.append(t_index_html) # 3.提取数据 for i in range(2): t_parse_index=threading.Thread(target=self.parse_index) thread_list.append(t_parse_index) # 4.保存数据 t_save=threading.Thread(target=self.save_content) thread_list.append(t_save) # 循环开启各子线程 for t in thread_list: # 表示主线程结束,子线程(设置为true无限循环)也跟着结束(用主线程控制子线程) t.setDaemon(True) # 启动线程 t.start() for q in [self.index_url_queue,self.html_index_queue,self.index_content_queue]: # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程 q.join() # end_time=time.time() # print(‘总耗时%.2f秒‘%(end_time-start_time)) if __name__==‘__main__‘: sk = time.clock() func=lianJia() func.run() ek = time.clock() print(‘程序总耗时:‘,ek-sk)
多线程爬取糗事百科:
# coding=utf-8 import requests from lxml import etree import threading from queue import Queue # https://docs.python.org/3/library/queue.html#module-queue # 队列使用方法简介 # q.qsize() 返回队列的大小 # q.empty() 如果队列为空,返回True,反之False # q.full() 如果队列满了,返回True,反之False # q.full 与 maxsize 大小对应 # q.get([block[, timeout]]) 获取队列,timeout等待时间 # q.get_nowait() 相当q.get(False) # q.put(item) 写入队列,timeout等待时间 # q.put_nowait(item) 相当q.put(item, False) # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号 # q.join() 实际上意味着等到队列为空,再执行别的操作 class QiubaiSpdier: def __init__(self): self.url_temp = "https://www.qiushibaike.com/8hr/page/{}/" self.headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"} self.url_queue = Queue() self.html_queue = Queue() self.content_queue = Queue() def get_url_list(self): # return [self.url_temp.format(i) for i in range(1,14)] for i in range(1,14): # 把13个索引页面的Url放进url_queue队列里 self.url_queue.put(self.url_temp.format(i)) def parse_url(self): while True: # get方法和task_done搭配使用 # 在put是队列+1,get和task_done一起使用时队列才会-1 url = self.url_queue.get() print(url) response = requests.get(url,headers=self.headers) # 然后把索引页的响应页面放进html_queue队列里 self.html_queue.put(response.content.decode()) self.url_queue.task_done() def get_content_list(self): #提取数据 while True: # 先从索引页响应页面html_queue队列里面取出索引页面 html_str = self.html_queue.get() html = etree.HTML(html_str) div_list = html.xpath("//div[@id=‘content-left‘]/div") #分组 content_list = [] for div in div_list: item= {} item["content"] = div.xpath(".//div[@class=‘content‘]/span/text()") item["content"] = [i.replace(" ","") for i in item["content"]] item["author_gender"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/@class") item["author_gender"] = item["author_gender"][0].split(" ")[-1].replace("Icon","") if len(item["author_gender"])>0 else None item["auhtor_age"] = div.xpath(".//div[contains(@class,‘articleGender‘)]/text()") item["auhtor_age"] = item["auhtor_age"][0] if len(item["auhtor_age"])>0 else None item["content_img"] = div.xpath(".//div[@class=‘thumb‘]/a/img/@src") item["content_img"] = "https:"+item["content_img"][0] if len(item["content_img"])>0 else None item["author_img"] = div.xpath(".//div[@class=‘author clearfix‘]//img/@src") item["author_img"] = "https:"+item["author_img"][0] if len(item["author_img"])>0 else None item["stats_vote"] = div.xpath(".//span[@class=‘stats-vote‘]/i/text()") item["stats_vote"] = item["stats_vote"][0] if len(item["stats_vote"])>0 else None content_list.append(item) # 把content_list放进content_queue里面 self.content_queue.put(content_list) self.html_queue.task_done() def save_content_list(self): #保存 while True: content_list = self.content_queue.get() for i in content_list: print(i) pass self.content_queue.task_done() def run(self): #实现主要逻辑 thread_list = [] #1.url_list # threading.Thread不需要传参数,参数都是从队列里面取得 t_url = threading.Thread(target=self.get_url_list) thread_list.append(t_url) #2.遍历,发送请求,获取响应 for i in range(20): # 添加20个线程 t_parse = threading.Thread(target=self.parse_url) thread_list.append(t_parse) #3.提取数据 for i in range(2): # 添加2个线程 t_html = threading.Thread(target=self.get_content_list) thread_list.append(t_html) #4.保存 t_save = threading.Thread(target=self.save_content_list) thread_list.append(t_save) for t in thread_list: t.setDaemon(True) #把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束) t.start() for q in [self.url_queue,self.html_queue,self.content_queue]: q.join() #让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程 print("主线程结束") if __name__ == ‘__main__‘: qiubai = QiubaiSpdier() qiubai.run() # 所没有tast_done方法,程序最终会卡着不动,无法终止 # 线程的设计注意:耗时的操作要分配一些线程
以上是关于实现多线程爬取数据并保存到mongodb的主要内容,如果未能解决你的问题,请参考以下文章