如何从scrapy spider中的数据框异步分配产品收益

Posted

技术标签:

【中文标题】如何从scrapy spider中的数据框异步分配产品收益【英文标题】:How to asynchronously distribute product-yielding from a dataframe in scrapy spider 【发布时间】:2021-05-22 13:58:18 【问题描述】:

在从数据框生成产品​​时,有没有办法利用 Scrapy 的异步架构?


概述

我的一个 Scrapy 项目中有一个蜘蛛,它与你典型的蜘蛛逻辑不同,如下所示:

    爬取在线文件目录以获取两个 zip 文件的最新版本,这两个文件都包含多个 csv 文件 将 csv 文件提取到当前工作目录 利用 pandas.read_csv 将每个 csv 读入其自己的数据帧中 执行pandas.merge 操作将数据组合成两个最终数据帧(一个是主数据帧,另一个是支持数据帧,其中存在一对多(主对支持)行关系 最后,蜘蛛循环遍历主数据框,开始填充一个 scrapy 项目,然后从支持数据框中收集额外的数据,并生成一个完整的项目

逻辑有效,但该过程需要大约 5 1/2 小时才能完成,因为它要处理 50 万个项目,并且一旦开始从数据帧中生成项目,它就会基本上变得同步。

下面是我用来完成所有这些的代码。瓶颈发生在process_csvs 函数中。

from ..items.redacted import Loader, REDACTEDItem
from scrapy.exceptions import CloseSpider
from datetime import datetime
import pandas as pd
import numpy as np
import zipfile
import scrapy
import json
import os


class REDACTEDSpider(scrapy.Spider):
    name = 'REDACTED'
    allowed_domains = ['REDACTED']
    start_urls = ['https://REDACTED/datasets/']
    # custom_settings = dict(TESTING_MODE=True, LOG_LEVEL='DEBUG')
    zip_filename = 'temp_redacted_data.zip'

    def parse(self, response):
        main_file_date = supporting_file_date = datetime.min
        main_file = supporting_file = None
        for link in response.xpath('//a[contains(@href, "primary_csv")]/@href').getall():
            link_date = datetime.strptime(link.rstrip('.zip')[-10:], '%Y-%m-%d')
            if link_date > main_file_date:
                main_file = link
                main_file_date = link_date
        if not main_file:
            raise CloseSpider('primary_csv zip file not found')
        self.logger.info('Found latest primary_csv file link (%s)' % main_file)
        main_file = f"https://REDACTED/datasets/main_file"
        for link in response.xpath('//a[contains(@href, "supporting_csv")]/@href').getall():
            link_date = datetime.strptime(link.rstrip('.zip')[-10:], '%Y-%m-%d')
            if link_date > supporting_file_date:
                supporting_file = link
                supporting_file_date = link_date
        if not supporting_file:
            raise CloseSpider('supporting_csv zip file not found')
        self.logger.info('Found latest supporting_csv file link (%s)' % supporting_file)
        supporting_file = f"https://REDACTED/datasets/supporting_file"

        # we pass supporting_file to essentially download the files sequentially
        # and so that we can make sure the files are downloaded before moving on to ingesting them
        self.logger.info('Downloading primary_csv zip file')
        yield scrapy.Request(main_file, callback=self.handle_zip, cb_kwargs=dict(supporting_file=supporting_file))

    def handle_zip(self, response, supporting_file=None):
        file_alias = 'primary_csv' if supporting_file else 'supporting_csv'
        # download zip - if this is the second time this function is called it will overwrite the first zip file
        # since we've already extracted the files we need from it
        self.logger.info(f"Saving file_alias zip file")
        with open(self.zip_filename, 'wb') as usda_file:
            usda_file.write(response.body)
        # extract zip contents
        self.logger.info(f"Extracting files from file_alias zip file")
        with zipfile.ZipFile(self.zip_filename, 'r') as zfile:
            if supporting_file:
                # we're extracting the first file, and still need to queue up the supporting_file
                zfile.extract('primary_csv_file_1.csv')
                zfile.extract('primary_csv_file_2.csv')
                zfile.extract('primary_csv_file_3.csv')
            else:
                # we're extracting the supporting_file now
                zfile.extract('supporting_csv_file.csv')

        if supporting_file:
            self.logger.info('Downloading supporting_csv zip file')
            yield scrapy.Request(supporting_file, callback=self.handle_zip)
        else:
            # remove the zipfile since we no longer need it
            # this will free up some storage space in case we need extra for the staging db
            os.remove(self.zip_filename)
            # both files have been unzipped, so we can move onto processing the csvs
            self.logger.info('Processing CSV files')
            
            # FIXME: This essentially bottlenecks into yielding items from a single thread
            yield from self.process_csvs()
    
    def process_csvs(self):
        primary_csv_file_1 = pd.read_csv('primary_csv_file_1.csv', usecols=[...], dtype=dict(...))
        primary_csv_file_2 = pd.read_csv('primary_csv_file_2.csv', usecols=[...], dtype=dict(...))
        primary_csv_file_3 = pd.read_csv('primary_csv_file_3.csv', usecols=[...], dtype=dict(...))
        supporting_csv_file = pd.read_csv('supporting_csv_file.csv', usecols=[...], dtype=dict(...))

        # Join the above four files into two pandas dataframes
        # Step 1: Join primary_csv_file_2.csv into primary_csv_file_1.csv
        primary_csv_file_1 = pd.merge(primary_csv_file_1, primary_csv_file_2, on='id', how='left')
        primary_csv_file_1.replace(np.nan, '', regex=True, inplace=True)
        # primary_csv_file_1 should now have most of the essential columns needed to create a full item
        # Step 2: Join supporting_csv_file.csv into primary_csv_file_3.csv
        primary_csv_file_3 = pd.merge(primary_csv_file_3, supporting_csv_file, left_on='supporting_id', right_on='id', how='left')
        primary_csv_file_3.replace(np.nan, '', regex=True, inplace=True)
        # primary_csv_file_3 should now have an additional column from supporting_csv_file

        # TODO: This is where I would like to fork the function in order to take full advantage of Scrapy's asynchronous processing
        for product in primary_csv_file_1.itertuples():
            loader = Loader(item=REDACTEDItem())
            loader.add_value('url', 'REDACTED')
            loader.add_value('category', product.category)
            loader.add_value('upc', product.upc)
            loader.add_value('brand', product.brand)
            loader.add_value('product_name', product.name)

            # Filter primary_csv_file_3 by id to get all nutrients and nutrient values for this product
            p_nutrients = primary_csv_file_3[primary_csv_file_3.id == product.supporting_id]
            nutrients = []
            for nutrient in p_nutrients.itertuples():
                nutrients.append(dict(
                    alias=nutrient.name,
                    value=nutrient.amount,
                    unit_of_measure=nutrient.units
                ))
            loader.add_value('nutrition', json.dumps(nutrients))

            yield loader.load_item()

        # remove the csv files to free up space
        os.remove('primary_csv_file_1.csv')
        os.remove('primary_csv_file_2.csv')
        os.remove('primary_csv_file_3.csv')
        os.remove('supporting_csv_file.csv')

【问题讨论】:

由于任务是 CPU 密集型的,我认为您唯一的解决方案是看看您是否可以使用 multiprocessing 来处理一些 CPU 密集型部分。 我害怕那个。您能否提供一个工作示例,说明我如何做到这一点,同时设法仍将项目返回到初始过程?我需要从原始爬虫中生成所有项目,以便它们可以通过管道。由于数据迁移限制,我不能将每个进程视为它自己的爬虫实例,因为这会导致一个产品实例覆盖其他实例(所以如果使用 4 个进程而不是全部,我最终会得到 1/4 项项)。 我不熟悉如何将 Scrapy(或 Twisted,Scrapy 用于并发)和多处理结合起来。我知道 Twisted 有 deferToThread,但是一个单独的线程只会防止 CPU 密集型的东西阻塞网络输入/输出,多处理是唯一可以让你使用多个处理器的东西(假设你的 CPU 有多个,并且您希望使用它们)。 REDACTEDItems 是否只是从 csvs 中读取的数据,还是这个类正在访问 Internet 并下载某些内容? parse函数中可以看出,蜘蛛从在线站点下载csv文件 【参考方案1】:

是否可以将其分成两个单独的 .py 文件?

第一个是爬取和复制——在复制之后更新一个包含适当文件名列表的 redis 数据库——它在扭曲或其他上运行)。

第二个在内部 python 调度程序或类似程序上运行,并访问未处理文件的 redis 数据库 - 然后可以使用多处理解析出工作。

试图扭曲以更好地使用多处理可能会以糟糕的方式结束 - 因此,如果您可以将它们解耦,它可能会产生更快的解决方案。

如果您需要将信息返回到第一个(扭曲的).py,它还可以轮询同一个 redis db 以获取来自第二个 .py 的更新


更新 #1

import redis
r = redis.Redis(db=0)
r.set("inputValName", inputVal)

extractedVal = r.get("inputValName").decode()

【讨论】:

为单个蜘蛛设置一个 redis 数据库对于一个目前拥有 30 多个蜘蛛(并且还在增长)的项目来说似乎是太多的额外开销。 你可能会惊讶于 redis 在语法方面的轻量级。 [我已将语法添加到原始评论中,因为它一团糟!] 如果需要,您可以将 JSON 打包为一个 inputVal 以压缩内容。在内存开销方面 - 如果您只包含文件的路径,那么使用的内存将是最小的。 这与我需要的解决方案不一致。我需要一些可以在爬虫蜘蛛中使用的东西。也许您的建议可以在这种情况下使用,但您的示例并未显示如何做到这一点。 是的 - 我没有时间提供完整的解决方案 - 只是建议一条出路。这可能会有所帮助:scrapy-redis.readthedocs.io/en/stable/index.html#document-index

以上是关于如何从scrapy spider中的数据框异步分配产品收益的主要内容,如果未能解决你的问题,请参考以下文章

scrapy框架spider

Python爬虫从入门到放弃(十五)之 Scrapy框架中Spiders用法

Scrapy之Spider

Scrapy的使用

爬虫:Scrapy4 - Spiders

scrapy spider官方文档