多核执行Scrapy

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多核执行Scrapy相关的知识,希望对你有一定的参考价值。

嗨,您好。目前我正在构建一个运行速度不快的Web scraper。我能以某种方式管理我的蜘蛛使用其他CPU核心或多个相同的蜘蛛并行运行吗?

BricoMarcheSpider

# -*- coding: utf-8 -*-
import scrapy
import csv
from scrapy import FormRequest
from scrapy import Request
from scrapy.loader import ItemLoader
from bricomarche.items import Product
from datetime import date
from scrapy.loader.processors import TakeFirst

CATEGORIES = ['http://www.bricomarche.com/l/nos-produits/bricolage/outillage-et-equipement-de-l-atelier/outillage-electroportatif/perceuse-sans-fil-visseuse-accessoire-87.html?limit=90&p=1&solr_is_local=1', 'http://www.bricomarche.com/l/nos-produits/bricolage/outillage-et-equipement-de-l-atelier/outillage-electroportatif/perceuse-perforateur-et-marteau-piqueur-88.html?limit=90&p=1&solr_is_local=1', 'http://www.bricomarche.com/l/nos-produits/bricolage/outillage-et-equipement-de-l-atelier/outillage-electroportatif/meuleuse-rainureuse-accessoire-85.html?limit=90&p=1&solr_is_local=1']

class BricoMarcheSpider(scrapy.Spider):
name = 'brico_marche'

def start_requests(self):
    # full path 
    with open('file.csv') as csvfile:
        reader = csv.DictReader(csvfile)
        for i, row in enumerate(reader):
            magasin_id = row['Id']
            if row['Id'][0] == '0':
                magasin_id = row['Id'][1:]
            formdata = {'city' : row['City'], 'market' : row['Brand'], 'idPdv' : magasin_id}
            yield FormRequest(url='http://www.bricomarche.com/bma_popin/Geolocalisation/choisirMagasin', formdata=formdata, dont_filter=True, callback=self.parse, meta={'cookiejar': i})

def parse(self, response):
    for url in CATEGORIES:
        yield Request(url=url, dont_filter=True, callback=self.parse_category, meta={'cookiejar': response.meta['cookiejar']})

def parse_category(self, response):
    pos = response.xpath('//div[@class="store-details"]/p/strong/text()').extract_first()
    if pos:
        for url in response.xpath('//a[@class="view-product"]/@href').extract():
            yield Request(url=url, dont_filter=True, callback=self.parse_product, meta={'cookiejar': response.meta['cookiejar'], 'pos' : pos.strip()})
        next_page = response.xpath('//a[@title="Suivant"]/@href').extract_first()
        if next_page is not None:
            yield Request(url=next_page, callback=self.parse_category, dont_filter=True, meta={'cookiejar':response.meta['cookiejar'], 'pos' : pos.strip()})

def parse_product(self, response):
    l = ItemLoader(item=Product(), response=response)
    l.default_output_processor = TakeFirst()

    l.add_value('id_source', 'BRMRCH_FR')
    l.add_value('extract_date', str(date.today()))
    l.add_value('pos_name', response.meta['pos'])
    l.add_xpath('brand_seller', '//td[@itemprop="brand"]/text()')
    l.add_xpath('price_vat', '//span[contains(@class,"new-price")]/text()')
    categories = response.xpath('//li[@itemprop="itemListElement"]//span[@itemprop="name"]/text()').extract()
    # setting categories and family
    # check with category which has fewer categories to verify values
    try:
        l.add_value('prod_name', categories[-1])
        l.add_value('prod_family', categories[-2])
        l.add_value('prod_category1', categories[0])
        l.add_value('prod_category2', categories[1])
        l.add_value('prod_category3', categories[2])
        l.add_value('prod_category4', categories[3])
    except:
        pass
    l.add_xpath('sku_seller', '//div[@class="content-fiche-produit"]/ul/li/p/text()')
    # Réserver en magasin
    existing_stock = response.xpath('//script[contains(text(),"STOCK_PDV")]').extract()
    # Produit disponible en magasin text
    product_available =response.xpath('//span[@class="product_avaliable"]').extract()
    if existing_stock:
        l.add_value('inventory', existing_stock)
        l.add_value('available_yn', '1')
    if product_available:
        l.add_value('available_yn', '1')
        l.add_value('inventory', response.xpath('//div[@class="bg-white"]/p/text()').extract_first())
    else:
        l.add_value('available_yn', '0')
        l.add_xpath('available_pos_status', '//div[@class="fiche-items"]/div/p/text()')
        l.add_xpath('available_pos_date', '//div[@class="fiche-items"]/div/p/text()')

    return l.load_item()

基本上这是我的蜘蛛。在file.csv,大约有450行。如果我必须刮100个产品,我的要求是~450 x 100 = 45 000 GET请求。 POST请求用于cookie。每个项目都添加到我的数据库中。在我的settings.py中,我使用DOWNLOAD_DELAY=00.5,其他参数默认使用。当我尝试使用AutoThrottle时,它的时间增加了三倍。我测试的一些信息:

  • AutoThrottle的8个并发请求 - 1000个产品的82分钟
  • AutoThrottle同时请求16个 - 1000个产品73.5分钟
  • 没有AutoThrottle的16个并发请求 - 1000个产品的22.4分钟
答案

实现这一目标的最佳方法是使用scrapyd

Distributed crawls文档中的大多数建议也可以应用于在单个计算机上运行,​​除非您在同一个scrapyd服务器上多次运行您的蜘蛛。

如果你想通过许多机器运行单个(大)蜘蛛,你通常做的是分区网址以便抓取并将它们发送到每个单独的蜘蛛。这是一个具体的例子:

首先,准备要抓取的网址列表,并将它们放入单独的文件/网址中:

http://somedomain.com/urls-to-crawl/spider1/part1.list
http://somedomain.com/urls-to-crawl/spider1/part2.list
http://somedomain.com/urls-to-crawl/spider1/part3.list

然后你在3个不同的Scrapyd服务器上发射蜘蛛。蜘蛛会收到一个(蜘蛛)参数部分,其中包含要爬网的分区的编号:

curl http://scrapy1.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=1
curl http://scrapy2.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=2
curl http://scrapy3.mycompany.com:6800/schedule.json -d project=myproject -d spider=spider1 -d part=3

以上是关于多核执行Scrapy的主要内容,如果未能解决你的问题,请参考以下文章

scrapy按顺序启动多个爬虫代码片段(python3)

在 Java 中查找多核代码的执行时间

Scrapy Spider没有返回所有元素

Scrapy研究探索——Scrapy核心架构与代码执行分析

Scrapy框架概述

Scrapy的初步认识