Python Multithreaded and Multiprocess Crawlers
Posted by Harris-H
1. Multithreaded crawlers
Crawling with threading.Thread and queue.Queue
a. First example (not a crawler)
from threading import Thread
import time

def coding():
    for x in range(5):
        print('%s is writing code' % x)
        time.sleep(1)

def drawing():
    for x in range(5):
        print('%s is drawing' % x)
        time.sleep(1)

def single_thread():
    # run the two tasks one after the other
    coding()
    drawing()

def multi_thread():
    # run the two tasks in parallel threads
    t1 = Thread(target=coding)
    t2 = Thread(target=drawing)
    t1.start()
    t2.start()

if __name__ == '__main__':
    multi_thread()
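To make the speed-up visible, you can time both versions; join() blocks until a worker thread has finished, so the measured span covers the whole run. A minimal sketch, reusing coding, drawing, and single_thread from the example above:

import time
from threading import Thread

start = time.time()
single_thread()                       # sequential: roughly 10 seconds
print('single thread: %.1fs' % (time.time() - start))

start = time.time()
t1 = Thread(target=coding)
t2 = Thread(target=drawing)
t1.start()
t2.start()
t1.join()                             # wait for both workers to finish
t2.join()
print('multi thread: %.1fs' % (time.time() - start))

The gain comes from the threads sleeping (or, in a real crawler, waiting on the network) concurrently; CPU-bound work would not scale this way in CPython because of the GIL.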
Subclassing threading.Thread
To encapsulate thread code more cleanly, you can subclass the Thread class from the threading module and override its run method; once the thread is started, the code in run executes automatically. Example code:
import threading
import time

class CodingThread(threading.Thread):
    def run(self):
        for x in range(3):
            print('%s is writing code' % threading.current_thread())
            time.sleep(1)

class DrawingThread(threading.Thread):
    def run(self):
        for x in range(3):
            print('%s is drawing' % threading.current_thread())
            time.sleep(1)

def multi_thread():
    t1 = CodingThread()
    t2 = DrawingThread()
    t1.start()
    t2.start()

if __name__ == '__main__':
    multi_thread()
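The lock mechanism below addresses races on shared global variables. As a minimal illustration of the problem (this sketch is mine, not from the original article), two threads updating a shared counter can interleave their read-modify-write steps and lose updates; the time.sleep(0) merely forces the thread switch that a bare counter += 1 risks implicitly:

import threading
import time

counter = 0

def worker():
    global counter
    for _ in range(100):
        tmp = counter        # read
        time.sleep(0)        # yield mid-update to force the interleaving
        counter = tmp + 1    # write back: another thread's update can be lost

def main():
    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(counter)  # far less than the expected 200

if __name__ == '__main__':
    main()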
Lock mechanism
To fix the shared-global-variable problem shown above, threading provides a Lock class: a thread acquires the lock before it touches the shared variable, and no other thread can enter until the current one has finished and released the lock.
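Here is the counter example made safe with a Lock, again as a minimal sketch of mine; the with statement calls acquire() and release() automatically:

import threading
import time

counter = 0
lock = threading.Lock()

def worker():
    global counter
    for _ in range(100):
        with lock:            # acquire on entry, release on exit
            tmp = counter
            time.sleep(0)     # other threads now block on the lock instead
            counter = tmp + 1

def main():
    threads = [threading.Thread(target=worker) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(counter)  # reliably 200

if __name__ == '__main__':
    main()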
Producer-consumer pattern: Lock (mutex) version
import threading
import random
import time

gMoney = 1000
gLock = threading.Lock()
# track how many times the producers have produced; stop after 10 times
gTimes = 0

class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gLock
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gLock.acquire()
            # stop producing once 10 deposits have been made
            if gTimes >= 10:
                gLock.release()
                break
            gMoney += money
            print('%s deposited %s yuan, balance is now %s yuan' % (threading.current_thread(), money, gMoney))
            gTimes += 1
            # note: sleeping while still holding the lock keeps every other thread out
            time.sleep(0.5)
            gLock.release()

class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gLock
        global gTimes
        while True:
            money = random.randint(100, 500)
            gLock.acquire()
            if gMoney > money:
                gMoney -= money
                print('%s withdrew %s yuan, balance is now %s yuan' % (threading.current_thread(), money, gMoney))
                time.sleep(0.5)
            else:
                # not enough money left; production may simply be finished,
                # so check the count before giving up
                if gTimes >= 10:
                    gLock.release()
                    break
                print('%s wanted to withdraw %s yuan, but only %s yuan is left!' % (threading.current_thread(), money, gMoney))
            gLock.release()

def main():
    for x in range(5):
        Consumer(name='consumer thread %d' % x).start()
    for x in range(5):
        Producer(name='producer thread %d' % x).start()

if __name__ == '__main__':
    main()
Producer-consumer pattern: Condition version
In the Lock version, a consumer that finds the balance too low can only release the lock and try again, spinning until a producer happens to deposit money. threading.Condition wraps a lock and adds wait() and notify_all(): a thread holding the condition calls wait() to release the lock and sleep, and is woken once another thread changes the shared state and calls notify_all(). The Condition version of the producer-consumer pattern looks like this:
import threading
import random
import time

gMoney = 1000
gCondition = threading.Condition()
gTimes = 0
gTotalTimes = 5

class Producer(threading.Thread):
    def run(self):
        global gMoney
        global gCondition
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gCondition.acquire()
            if gTimes >= gTotalTimes:
                gCondition.release()
                print('The producers produced %s times in total' % gTimes)
                break
            gMoney += money
            print('%s deposited %s yuan, balance is now %s yuan' % (threading.current_thread(), money, gMoney))
            gTimes += 1
            time.sleep(0.5)
            gCondition.notify_all()   # wake up all waiting consumers
            gCondition.release()

class Consumer(threading.Thread):
    def run(self):
        global gMoney
        global gCondition
        while True:
            money = random.randint(100, 500)
            gCondition.acquire()
            # re-check in a while loop: by the time this thread is woken up,
            # the condition may no longer hold
            while gMoney < money:
                if gTimes >= gTotalTimes:
                    gCondition.release()
                    return
                print('%s wanted to withdraw %s yuan, but only %s yuan is left!' % (threading.current_thread(), money, gMoney))
                gCondition.wait()     # release the lock and sleep until notified
            gMoney -= money
            print('%s withdrew %s yuan, balance is now %s yuan' % (threading.current_thread(), money, gMoney))
            time.sleep(0.5)
            gCondition.release()

def main():
    for x in range(5):
        Consumer(name='consumer thread %d' % x).start()
    for x in range(2):
        Producer(name='producer thread %d' % x).start()

if __name__ == '__main__':
    main()
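In practice you rarely need to hand-roll this pattern: queue.Queue in the standard library is a thread-safe FIFO whose blocking put() and get() are built on exactly this kind of Condition waiting, which is why the crawler examples below use it directly. A minimal sketch:

import threading
import queue

q = queue.Queue(maxsize=10)  # put() blocks when full, get() blocks when empty

def producer():
    for i in range(5):
        q.put(i)             # no explicit lock, wait, or notify needed

def consumer():
    for _ in range(5):
        print('got', q.get())

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()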
b. Second example
import re
import time
import requests
import threading
from lxml import etree
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread

def run(in_q, out_q):
    headers = {
        'Accept': '',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': '',
        'DNT': '1',
        'Host': 'www.g.com',
        'Referer': '',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
    }
    while not in_q.empty():
        data = requests.get(url=in_q.get(), headers=headers)
        r = data.content
        content = str(r, encoding='utf-8', errors='ignore')
        soup = BeautifulSoup(content, 'html5lib')
        fixed_html = soup.prettify()
        html = etree.HTML(fixed_html)
        nums = html.xpath('//div[@class="col-md-1"]//text()')
        for num in nums:
            num = re.findall('[0-9]', ''.join(num))
            real_num = int(''.join(num))
            out_q.put(str(threading.current_thread().name) + '-' + str(real_num))
        in_q.task_done()  # signal that one queued task has been processed

if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    result_queue = Queue()
    for i in range(1, 1001):
        queue.put('http://www.g.com?page=' + str(i))
    print('queue initial size: %d' % queue.qsize())
    for index in range(10):
        thread = Thread(target=run, args=(queue, result_queue))
        thread.daemon = True  # exit together with the main thread
        thread.start()
    queue.join()  # block until every queued URL has been processed
    end = time.time()
    print('total time: %s' % (end - start))
    print('queue final size: %d' % queue.qsize())
    print('result_queue final size: %d' % result_queue.qsize())
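The pattern above relies on the Queue.task_done()/join() contract: join() blocks until task_done() has been called once for every item that was put(). A minimal sketch of just that mechanism, without the network code:

from queue import Queue
from threading import Thread

q = Queue()
for i in range(100):
    q.put(i)

def worker():
    while True:
        item = q.get()   # blocks when the queue is empty
        # ... process item here ...
        q.task_done()    # exactly one task_done() per get()

for _ in range(4):
    Thread(target=worker, daemon=True).start()  # daemon: dies with the main thread

q.join()  # returns once all 100 items have been marked done
print('all done')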
c. Third example
import threading  # import the threading module
from queue import Queue  # import the queue module
import time  # import the time module

# crawl the article detail pages
def get_detail_html(detail_url_list, id):
    while True:
        url = detail_url_list.get()  # Queue.get() pulls one element off the queue
        time.sleep(2)  # sleep 2s to simulate fetching and parsing a detail page
        print("thread {id}: get {url} detail finished".format(id=id, url=url))  # report which thread crawled which url
        detail_url_list.task_done()  # mark this URL as processed

# crawl the article list pages
def get_detail_url(queue):
    for i in range(10000):
        time.sleep(1)  # sleep 1s: producing URLs is faster than crawling details
        queue.put("http://testedu.com/{id}".format(id=i))  # Queue is FIFO, so URLs are consumed in the order they were put
        print("get detail url {id} end".format(id=i))  # report which article urls were produced

# main
if __name__ == "__main__":
    detail_url_queue = Queue(maxsize=1000)  # a thread-safe FIFO queue holding at most 1000 items
    # create four threads
    thread = threading.Thread(target=get_detail_url, args=(detail_url_queue,))  # thread A produces the list URLs
    html_thread = []
    for i in range(3):
        thread2 = threading.Thread(target=get_detail_html, args=(detail_url_queue, i), daemon=True)
        html_thread.append(thread2)  # threads B, C, D crawl the article details
    start_time = time.time()
    # start the four threads
    thread.start()
    for i in range(3):
        html_thread[i].start()
    # The detail threads loop forever, so joining them would block indefinitely;
    # instead join the producer, then wait for the queue to drain
    # (the daemon detail threads exit together with the main thread).
    thread.join()
    detail_url_queue.join()
    print("last time: {}s".format(time.time() - start_time))  # total time once every url has been processed
d. Producer-consumer crawler
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author: fcj
# Time: 2019-05-09
# Description: python multithreading - plain threads - producer-consumer model
import re
import time
import requests
import threading
from lxml import etree
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread

def producer(in_q):  # producer
    ready_list = []  # guard against queueing the same URL twice
    for i in range(1, 1001):
        url = 'http://www.g.com/?page=' + str(i)
        if url not in ready_list:
            ready_list.append(url)
            in_q.put(url)  # blocks while the queue is full, throttling the producer

def consumer(in_q, out_q):  # consumer
    headers = {
        'Accept': '',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': '',
        'DNT': '1',
        'Host': 'www.g.com',
        'Referer': 'http://www.g.com',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36',
    }
    while True:
        data = requests.get(url=in_q.get(), headers=headers)
        r = data.content
        content = str(r, encoding='utf-8', errors='ignore')
        soup = BeautifulSoup(content, 'html5lib')
        fixed_html = soup.prettify()
        html = etree.HTML(fixed_html)
        nums = html.xpath('//div[@class="col-md-1"]//text()')
        for num in nums:
            num = re.findall('[0-9]', ''.join(num))
            real_num = int(''.join(num))
            out_q.put(str(threading.current_thread().name) + '-' + str(real_num))
        in_q.task_done()  # tell join() that this URL has been fully processed

if __name__ == '__main__':
    start = time.time()
    queue = Queue(maxsize=10)  # bound the queue at 10 items
    result_queue = Queue()
    print('queue initial size: %d' % queue.qsize())
    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.daemon = True
    producer_thread.start()
    for index in range(10):
        consumer_thread = Thread(target=consumer, args=(queue, result_queue))
        consumer_thread.daemon = True
        consumer_thread.start()
    producer_thread.join()  # make sure every URL has been queued first
    queue.join()            # then wait until the consumers drain the queue
    end = time.time()
    print('total time: %s' % (end - start))
    print('queue final size: %d' % queue.qsize())
    print('result_queue final size: %d' % result_queue.qsize())
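Note the design choice of Queue(maxsize=10): a bounded queue provides backpressure, so the producer can never run more than ten URLs ahead of the consumers. A quick sketch of the blocking behaviour:

from queue import Queue, Full

q = Queue(maxsize=2)
q.put('a')
q.put('b')             # the queue is now full
try:
    q.put_nowait('c')  # the non-blocking put raises Full instead of waiting
except Full:
    print('queue full; a plain put() would block here until a get() frees a slot')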
e. Fourth example
import threading
import requests
from lxml import etree
from urllib import request
import os
import re
from queue import Queue

class Producer(threading.Thread):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
    }

    def __init__(self, page_queue, img_queue, *args, **kwargs):
        super(Producer, self).__init__(*args, **kwargs)
        self.page_queue = page_queue
        self.img_queue = img_queue

    def run(self):
        while True:
            if self.page_queue.empty():
                break
            url = self.page_queue.get()
            self.parse_page(url)

    def parse_page(self, url):
        response = requests.get(url, headers=self.headers)
        # ... (the rest of this example is cut off in the original source)