Python多线程和多进程爬虫

Posted Harris-H

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多线程和多进程爬虫相关的知识,希望对你有一定的参考价值。

文章目录

Python多线程和多进程爬虫

1.多线程爬虫

使用threading的Thread 和queue进行爬虫

a.第一个实例(非爬虫)

from threading import Thread
import time


def coding():
    """Simulate a coding task: print five numbered messages, one per second."""
    count = 0
    while count < 5:
        print('%s正在写代码' % count)
        time.sleep(1)
        count += 1


def drawing():
    """Simulate a drawing task: print five numbered messages, one per second."""
    count = 0
    while count < 5:
        print('%s正在画图' % count)
        time.sleep(1)
        count += 1


def single_thread():
    """Run both simulated tasks sequentially on the calling thread."""
    for task in (coding, drawing):
        task()


def multi_thread():
    """Run both simulated tasks concurrently, one thread per task."""
    workers = [Thread(target=fn) for fn in (coding, drawing)]
    for worker in workers:
        worker.start()


if __name__ == '__main__':
    # Entry point: run the threaded variant so coding and drawing interleave.
    multi_thread()


继承 threading.Thread 类:

为了让线程代码得到更好的封装,可以使用 threading 模块下的 Thread 类:继承这个类,然后重写 run 方法,线程启动后就会自动执行 run 方法中的代码。示例代码如下:

import threading
import time


class CodingThread(threading.Thread):
    """Thread subclass simulating a coding task (three timed messages)."""

    def run(self):
        # start() schedules run() on the new thread automatically.
        for _ in range(3):
            print('%s正在写代码' % threading.current_thread())
            time.sleep(1)


class DrawingThread(threading.Thread):
    """Thread subclass simulating a drawing task (three timed messages)."""

    def run(self):
        # start() schedules run() on the new thread automatically.
        for _ in range(3):
            print('%s正在画图' % threading.current_thread())
            time.sleep(1)


def multi_thread():
    """Start one CodingThread and one DrawingThread; they run concurrently."""
    for worker in (CodingThread(), DrawingThread()):
        worker.start()


if __name__ == '__main__':
    # Entry point: demonstrate Thread subclassing via run() override.
    multi_thread()


锁机制

为了解决以上使用共享全局变量的问题。threading提供了一个Lock类,这个类可以在某个线程访问某个变量的时候加锁,其他线程此时就不能进来,直到当前线程处理完后,把锁释放了,其他线程才能进来处理。示例代码如下:

Lock(互斥)版本生产者和消费者模式

import threading
import random
import time

gMoney = 1000                 # shared balance, guarded by gLock
gLock = threading.Lock()
# Number of deposits made so far; production stops once it reaches 10.
gTimes = 0


class Producer(threading.Thread):
    # Producer thread: deposits a random amount under gLock until the shared
    # counter gTimes reaches 10 (across all producer threads combined).
    def run(self):
        global gMoney
        global gLock
        global gTimes
        while True:
            money = random.randint(100, 1000)
            gLock.acquire()
            # Stop producing once 10 deposits have been made in total.
            if gTimes >= 10:
                gLock.release()
                break
            gMoney += money
            print('%s当前存入%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
            gTimes += 1
            # NOTE(review): sleeping while still holding gLock stalls every
            # other thread for 0.5s — presumably deliberate pacing for the demo.
            time.sleep(0.5)
            gLock.release()


class Consumer(threading.Thread):
    # Consumer thread: withdraws a random amount under gLock; exits once the
    # balance is insufficient AND production has finished (gTimes >= 10).
    def run(self):
        global gMoney
        global gLock
        global gTimes
        while True:
            money = random.randint(100, 500)
            gLock.acquire()
            if gMoney > money:
                gMoney -= money
                print('%s当前取出%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
                # NOTE(review): sleep while holding gLock blocks all peers.
                time.sleep(0.5)
            else:
                # Balance too low: if production has also hit its limit, the
                # balance can never grow again, so this consumer can quit.
                if gTimes >= 10:
                    gLock.release()
                    break
                print("%s当前想取%s元钱,剩余%s元钱,不足!" % (threading.current_thread(), money, gMoney))
            gLock.release()


def main():
    """Launch five consumer threads, then five producer threads."""
    consumers = [Consumer(name='消费者线程%d' % i) for i in range(5)]
    producers = [Producer(name='生产者线程%d' % i) for i in range(5)]
    for thread in consumers + producers:
        thread.start()


if __name__ == '__main__':
    # Entry point for the Lock-based producer/consumer demo.
    main()


Condition版的生产者与消费者模式代码如下:

    import threading
    import random
    import time
    
    # Shared state, all guarded by gCondition.
    gMoney = 1000
    gCondition = threading.Condition()
    gTimes = 0       # deposits made so far across all producers
    gTotalTimes = 5  # stop producing after this many deposits
    
    class Producer(threading.Thread):
        """Producer: deposits random amounts until gTimes reaches gTotalTimes.

        Bug fix vs. the original: notify_all() is now also called on the exit
        path.  Otherwise, once the last producer left, consumers blocked in
        gCondition.wait() were never woken again and deadlocked forever.
        """
        def run(self):
            global gMoney
            global gCondition
            global gTimes
            while True:
                money = random.randint(100, 1000)
                gCondition.acquire()
                if gTimes >= gTotalTimes:
                    # Wake every waiting consumer so it can re-check the
                    # terminal condition and exit instead of blocking forever.
                    gCondition.notify_all()
                    gCondition.release()
                    print('当前生产者总共生产了%s次'%gTimes)
                    break
                gMoney += money
                print('%s当前存入%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
                gTimes += 1
                time.sleep(0.5)
                # Deposit made: wake consumers waiting for a larger balance.
                gCondition.notify_all()
                gCondition.release()
    
    class Consumer(threading.Thread):
        # Consumer: waits on gCondition until the balance covers a random
        # withdrawal; exits once producers are done and funds are short.
        def run(self):
            global gMoney
            global gCondition
            while True:
                money = random.randint(100, 500)
                gCondition.acquire()
                # Re-check in a loop: by the time this thread is woken and
                # re-acquires the lock, the condition may be false again.
                while gMoney < money:
                    if gTimes >= gTotalTimes:
                        gCondition.release()
                        return
                    print('%s准备取%s元钱,剩余%s元钱,不足!'%(threading.current_thread(),money,gMoney))
                    # wait() releases the lock, blocks until notified,
                    # then re-acquires the lock before returning.
                    gCondition.wait()
                gMoney -= money
                print('%s当前取出%s元钱,剩余%s元钱' % (threading.current_thread(), money, gMoney))
                time.sleep(0.5)
                gCondition.release()
    
    def main():
        """Start five consumers and two producers for the Condition demo."""
        threads = [Consumer(name='消费者线程%d' % i) for i in range(5)]
        threads += [Producer(name='生产者线程%d' % i) for i in range(2)]
        for thread in threads:
            thread.start()
    
    if __name__ == '__main__':
        # Entry point for the Condition-based producer/consumer demo.
        main()
    

b.第二个实例

import re
import time
import requests
import threading
from lxml import etree
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread


def run(in_q, out_q):
    """Worker: drain URLs from in_q, scrape page numbers, push them to out_q.

    Each result is a '<thread-name>-<number>' string.  task_done() is called
    once per URL so the main thread's in_q.join() unblocks when all URLs are
    handled.  Bug fix: the headers dict had lost its braces (syntax error).
    """
    headers = {
        'Accept': '',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': '',
        'DNT': '1',
        'Host': 'www.g.com',
        'Referer': '',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(Khtml, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
    }
    while not in_q.empty():
        data = requests.get(url=in_q.get(), headers=headers)
        # Decode leniently: some pages may contain bytes invalid in UTF-8.
        content = str(data.content, encoding='utf-8', errors='ignore')
        soup = BeautifulSoup(content, 'html5lib')
        fixed_html = soup.prettify()
        html = etree.HTML(fixed_html)
        nums = html.xpath('//div[@class="col-md-1"]//text()')
        for num in nums:
            digits = re.findall('[0-9]', ''.join(num))
            real_num = int(''.join(digits))
            # .name replaces the deprecated getName() accessor.
            out_q.put(str(threading.current_thread().name) + '-' + str(real_num))
        in_q.task_done()  # signal that one queued URL is fully processed


if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    result_queue = Queue()
    # Pre-fill the work queue with the 1000 page URLs.
    for page in range(1, 1001):
        queue.put('http://www.g.com?page=' + str(page))
    print('queue 开始大小 %d' % queue.qsize())

    # Ten daemon workers share the queue; they die with the main thread.
    for _ in range(10):
        worker = Thread(target=run, args=(queue, result_queue))
        worker.daemon = True
        worker.start()

    queue.join()  # blocks until task_done() was called for every URL
    end = time.time()
    print('总耗时:%s' % (end - start))
    print('queue 结束大小 %d' % queue.qsize())
    print('result_queue 结束大小 %d' % result_queue.qsize())

c.第三个实例

import threading # 导入threading模块
from queue import Queue #导入queue模块
import time  #导入time模块

# 爬取文章详情页
def get_detail_html(detail_url_list, id):
    """Detail-page worker: forever pop a URL from the queue and "crawl" it.

    The 2-second sleep simulates fetching and parsing the article.  Runs until
    the process exits (queue.get() blocks when the queue is empty).

    Bug fix: the original format string had lost its {id}/{url} placeholders,
    so the thread id and URL were never interpolated into the output.
    """
    while True:
        url = detail_url_list.get()  # blocks until a URL is available
        time.sleep(2)
        print("thread {id}: get {url} detail finished".format(id=id, url=url))

# 爬取文章列表页
def get_detail_url(queue):
    """List-page worker: feed 10000 article URLs into the queue, one per second.

    Bug fix: both format strings had lost their {id} placeholders, so neither
    the queued URL nor the log line ever contained the article id.
    """
    for i in range(10000):
        time.sleep(1)  # simulate that listing pages load faster than details
        # FIFO queue: URLs are consumed in the same order they are put.
        queue.put("http://testedu.com/{id}".format(id=i))
        print("get detail url {id} end".format(id=i))

#主函数
if __name__ == "__main__":
    # Thread-safe FIFO queue, capped at 1000 pending URLs.
    detail_url_queue = Queue(maxsize=1000)
    # One thread (A) produces list-page URLs...
    thread = threading.Thread(target=get_detail_url, args=(detail_url_queue,))
    html_thread = []
    for i in range(3):
        thread2 = threading.Thread(target=get_detail_html, args=(detail_url_queue,i))
        html_thread.append(thread2)  # ...threads B, C, D consume detail pages
    start_time = time.time()
    # Start all four threads.
    thread.start()
    for i in range(3):
        html_thread[i].start()
    # join() blocks the main thread until each worker has finished.
    thread.join()
    for i in range(3):
        html_thread[i].join()

    # Bug fix: the original string "last time:  s" had lost its {} placeholder,
    # so the elapsed time was never printed.
    print("last time: {} s".format(time.time()-start_time))

d.生产者-消费者爬虫

# !usr/bin/env python 3.6
# -*- coding: utf-8 -*-
# Author: fcj
# Time: 2019-05-09
# Description: python 多线程-普通多线程-生产者消费者模型

import re
import time
import requests
import threading
from lxml import etree
from bs4 import BeautifulSoup
from queue import Queue
from threading import Thread


def producer(in_q):  # producer
    """Enqueue the 1000 distinct page URLs into in_q, then return.

    Bug fix: the original looped `while in_q.full() is False`, re-scanning all
    1000 URLs against a dedup list and busy-spinning forever once every URL was
    queued (and never terminating at all on an unbounded queue).  A bounded
    queue's put() already blocks while the queue is full, so a single ordered
    pass is sufficient, needs no dedup list, and lets the thread exit.
    """
    for page in range(1, 1001):
        # put() blocks when the queue is at maxsize until a consumer takes one.
        in_q.put('http://www.g.com/?page=' + str(page))


def consumer(in_q, out_q):  # consumer
    """Fetch each queued URL, extract the page numbers, emit results to out_q.

    Results are '<thread-name>-<number>' strings; task_done() is called per URL
    so the main thread's in_q.join() can unblock.  Bug fix: the headers dict
    had lost its braces and two quotes were garbled (syntax errors).
    """
    headers = {
        'Accept': '',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        'Connection': 'keep-alive',
        'Cookie': '',
        'DNT': '1',
        'Host': 'www..com',
        'Referer': 'http://www.g.com',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36'
    }
    while True:
        data = requests.get(url=in_q.get(), headers=headers)
        # Decode leniently: some pages may contain bytes invalid in UTF-8.
        content = str(data.content, encoding='utf-8', errors='ignore')
        soup = BeautifulSoup(content, 'html5lib')
        fixed_html = soup.prettify()
        html = etree.HTML(fixed_html)
        nums = html.xpath('//div[@class="col-md-1"]//text()')
        for num in nums:
            digits = re.findall('[0-9]', ''.join(num))
            real_num = int(''.join(digits))
            # .name replaces the deprecated getName() accessor.
            out_q.put(str(threading.current_thread().name) + '-' + str(real_num))
        in_q.task_done()  # tell the producer side this URL is fully processed


if __name__ == '__main__':
    start = time.time()
    queue = Queue(maxsize=10)  # bounded: producer blocks once 10 URLs pend
    result_queue = Queue()
    print('queue 开始大小 %d' % queue.qsize())

    # One daemon producer fills the bounded queue.
    producer_thread = Thread(target=producer, args=(queue,))
    producer_thread.daemon = True
    producer_thread.start()

    # Ten daemon consumers drain it concurrently.
    for _ in range(10):
        worker = Thread(target=consumer, args=(queue, result_queue))
        worker.daemon = True
        worker.start()

    queue.join()  # wait until every produced URL has been consumed
    end = time.time()
    print('总耗时:%s' % (end - start))
    print('queue 结束大小 %d' % queue.qsize())
    print('result_queue 结束大小 %d' % result_queue.qsize())

e.第四个实例

    import threading
    import requests
    from lxml import etree
    from urllib import request
    import os
    import re
    from queue import Queue
    
    class Producer(threading.Thread):
        headers = 
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
        
        def __init__(self,page_queue,img_queue,*args,**kwargs):
            super(Producer, self).__init__(*args,**kwargs)
            self.page_queue = page_queue
            self.img_queue = img_queue
    
    
        def run(self):
            while True:
                if self.page_queue.empty():
                    break
                url = self.page_queue.get()
                self.parse_page(url)
    
        def parse_page(self,url):
            response = requests.get(url, headers=self.headers)

Python爬虫编程思想(142):多线程和多进程爬虫--多进程

Python爬虫编程思想(134):多线程和多进程爬虫--线程与进程详解

Python爬虫编程思想(134):多线程和多进程爬虫--线程与进程详解

Python爬虫编程思想(135):多线程和多进程爬虫--Python与线程

Python爬虫编程思想(135):多线程和多进程爬虫--Python与线程

python 多进程和多线程配合