How to upload crawled data from Scrapy to Amazon S3 as csv or json?
Posted: 2016-12-11 19:14:57
Question: What are the steps to upload data crawled by Scrapy to Amazon S3 as csv/jsonl/json files? All I could find on the internet was how to upload crawled images to an S3 bucket.
I am currently using Ubuntu 16.04, and I have installed boto with the command:
pip install boto
I have added the following lines to settings.py. Can anyone explain what other changes I need to make?
AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'
FEED_URI = 'bucket path'
FEED_FORMAT = 'jsonlines'
FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}
Edit 1: When I configure everything above and run scrapy crawl spider, I get the following error after the crawl finishes:
2016-08-08 10:57:03 [scrapy] ERROR: Error storing csv feed (200 items) in: s3: myBucket/crawl.csv
Traceback (most recent call last):
  File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 246, in inContext
    result = inContext.theWork()
  File "/usr/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 262, in <lambda>
    inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
  File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
    return func(*args, **kw)
  File "/usr/local/lib/python2.7/dist-packages/scrapy/extensions/feedexport.py", line 123, in _store_in_thread
    key.set_contents_from_file(file)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 1293, in set_contents_from_file
    chunked_transfer=chunked_transfer, size=size)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 750, in send_file
    chunked_transfer=chunked_transfer, size=size)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/key.py", line 951, in _send_file_internal
    query_args=query_args
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 656, in make_request
    auth_path = self.calling_format.build_auth_path(bucket, key)
  File "/usr/local/lib/python2.7/dist-packages/boto/s3/connection.py", line 94, in build_auth_path
    path = '/' + bucket
TypeError: cannot concatenate 'str' and 'NoneType' objects
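A note on the traceback: the failure is inside boto's build_auth_path, where the bucket name arrives as None, which suggests the 'bucket path' value above was not parsed as a valid s3:// URI. A properly formed S3 feed URI looks roughly like the following sketch; the bucket name and key path here are placeholders:

FEED_URI = 's3://my-bucket/scrapes/%(name)s/items.jsonl'
FEED_FORMAT = 'jsonlines'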
【Comments】:
Give it a try and fix the problems you run into along the way.
I have configured the settings exactly as shown above and run the program; it finishes without any errors, yet nothing appears in the S3 bucket. I referred to this link: ***.com/questions/15955723/…
It is better to refer to the documentation: doc.scrapy.org/en/latest/topics/…
Can you do one thing and break it into steps: use a scrapy signal, save the file as json or csv, push the data to S3 with s3cli or the aws cli from a small bash script, and finally delete the file from the local system (a rough sketch of that approach follows below).
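For illustration, a minimal sketch of the workflow suggested in the last comment, using boto3 from the spider_closed signal instead of a bash script with the aws cli. The local file name, bucket, and key are placeholders, and the class would still need to be enabled via the EXTENSIONS setting:

import boto3
from scrapy import signals

class UploadFeedOnClose:
    """Upload a locally written feed file to S3 when the spider closes."""

    @classmethod
    def from_crawler(cls, crawler):
        ext = cls()
        crawler.signals.connect(ext.spider_closed, signal=signals.spider_closed)
        return ext

    def spider_closed(self, spider):
        # Assumes FEED_URI pointed at this local file, e.g. 'items.jsonl'
        s3 = boto3.client('s3')
        s3.upload_file('items.jsonl', 'my-bucket', 'crawls/%s.jsonl' % spider.name)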
【Answer 1】:
This task has become much easier as of 2021. FEED_URI and FEED_FORMAT are deprecated and have been moved into a new setting called FEEDS. There is no need to define a custom item pipeline in settings.py. You must have botocore installed for this to work. Here is what you have to add to settings.py:
AWS_ACCESS_KEY_ID = 'your_access_key_id'
AWS_SECRET_ACCESS_KEY = 'your_secret_access_key'
FEEDS = {
    's3://your-bucket/path-to-data/%(name)s/data.json': {
        'format': 'json',
        'encoding': 'utf8',
        'store_empty': False,
        'indent': 4,
    },
}
A list of all the available options can be found in the docs.
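Since the question also asks about csv, the same FEEDS setting can point to a csv feed instead. A minimal sketch, where the bucket name and the field list are hypothetical:

FEEDS = {
    's3://your-bucket/path-to-data/%(name)s/data.csv': {
        'format': 'csv',
        'encoding': 'utf8',
        'store_empty': False,
        'fields': ['title', 'price'],  # hypothetical item fields; omit to export all fields
    },
}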
If you also want to store files or images in S3, you need to specify an item pipeline and a storage variable in settings.py:
ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 1,  # For images, Pillow must be installed
    'scrapy.pipelines.files.FilesPipeline': 2,    # For files
}

IMAGES_STORE = 's3://your-bucket/path_to_images_dir/'
FILES_STORE = 's3://your-bucket/path_to_files_dir/'
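For these pipelines to pick anything up, the scraped items also need the URL fields the pipelines look for. A minimal sketch with a hypothetical item class:

import scrapy

class MyItem(scrapy.Item):
    # ImagesPipeline reads 'image_urls' and writes its results to 'images';
    # FilesPipeline reads 'file_urls' and writes its results to 'files'.
    image_urls = scrapy.Field()
    images = scrapy.Field()
    file_urls = scrapy.Field()
    files = scrapy.Field()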
【Comments】:
【Answer 2】: I decided to answer Mil0R3's comment on Abhishek K's answer with the code snippet that worked for me.
You need to add the following code to settings.py:
AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''
# You need to have both variables FEED_URI and S3PIPELINE_URL set to the same
# file or this code will not work.
FEED_URI = 's3://bucket/file_name.jsonl'
S3PIPELINE_URL = FEED_URI
FEED_FORMAT = 'jsonlines'
# project_folder refers to the folder that both pipelines.py and settings.py are in
ITEM_PIPELINES = {
    'project_folder.pipelines.S3Pipeline': 1,
}
In pipelines.py you need to add the following object. The GitHub project it was copied and pasted from can be found here: https://github.com/orangain/scrapy-s3pipeline
# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html

from io import BytesIO
from urllib.parse import urlparse
from datetime import datetime
import gzip

import boto3
from botocore.exceptions import ClientError

from scrapy.exporters import JsonLinesItemExporter


class S3Pipeline:
    """
    Scrapy pipeline to store items into S3 bucket with JSONLines format.
    Unlike FeedExporter, the pipeline has the following features:

    * The pipeline stores items by chunk.
    * Support GZip compression.
    """

    def __init__(self, settings, stats):
        self.stats = stats

        url = settings['S3PIPELINE_URL']
        o = urlparse(url)
        self.bucket_name = o.hostname
        self.object_key_template = o.path[1:]  # Remove the first '/'

        self.max_chunk_size = settings.getint('S3PIPELINE_MAX_CHUNK_SIZE', 100)
        self.use_gzip = settings.getbool('S3PIPELINE_GZIP', url.endswith('.gz'))

        self.s3 = boto3.client(
            's3',
            region_name=settings['AWS_REGION_NAME'], use_ssl=settings['AWS_USE_SSL'],
            verify=settings['AWS_VERIFY'], endpoint_url=settings['AWS_ENDPOINT_URL'],
            aws_access_key_id=settings['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=settings['AWS_SECRET_ACCESS_KEY'])
        self.items = []
        self.chunk_number = 0

    @classmethod
    def from_crawler(cls, crawler):
        return cls(crawler.settings, crawler.stats)

    def process_item(self, item, spider):
        """
        Process single item. Add item to items and then upload to S3 if size of items
        >= max_chunk_size.
        """
        self.items.append(item)
        if len(self.items) >= self.max_chunk_size:
            self._upload_chunk(spider)

        return item

    def open_spider(self, spider):
        """
        Callback function when spider is open.
        """
        # Store timestamp to replace time in S3PIPELINE_URL
        self.ts = datetime.utcnow().replace(microsecond=0).isoformat().replace(':', '-')

    def close_spider(self, spider):
        """
        Callback function when spider is closed.
        """
        # Upload remained items to S3.
        self._upload_chunk(spider)

    def _upload_chunk(self, spider):
        """
        Do upload items to S3.
        """
        if not self.items:
            return  # Do nothing when items is empty.

        f = self._make_fileobj()

        # Build object key by replacing variables in object key template.
        object_key = self.object_key_template.format(**self._get_uri_params(spider))

        try:
            self.s3.upload_fileobj(f, self.bucket_name, object_key)
        except ClientError:
            self.stats.inc_value('pipeline/s3/fail')
            raise
        else:
            self.stats.inc_value('pipeline/s3/success')
        finally:
            # Prepare for the next chunk
            self.chunk_number += len(self.items)
            self.items = []

    def _get_uri_params(self, spider):
        params = {}
        for key in dir(spider):
            params[key] = getattr(spider, key)

        params['chunk'] = self.chunk_number
        params['time'] = self.ts
        return params

    def _make_fileobj(self):
        """
        Build file object from items.
        """
        bio = BytesIO()
        f = gzip.GzipFile(mode='wb', fileobj=bio) if self.use_gzip else bio

        # Build file object using ItemExporter
        exporter = JsonLinesItemExporter(f)
        exporter.start_exporting()
        for item in self.items:
            exporter.export_item(item)
        exporter.finish_exporting()

        if f is not bio:
            f.close()  # Close the file if GzipFile

        # Seek to the top of file to be read later
        bio.seek(0)

        return bio
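One usage note that follows from the code above: the path portion of S3PIPELINE_URL is treated as a str.format template, with the spider's attributes (such as name) plus chunk and time available, so each uploaded chunk can be given its own object key. A hypothetical example (bucket and path are placeholders; if you keep FEED_URI equal to S3PIPELINE_URL as described above, the plain key shown earlier also works):

# Produces object keys like crawls/myspider/2016-08-08T10-00-00/items.0000000.jl
S3PIPELINE_URL = 's3://my-bucket/crawls/{name}/{time}/items.{chunk:07d}.jl'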
Special notes:
I needed to delete some settings from the OP's settings.py file to get this pipeline working correctly. All of the following has to be removed:
FEED_EXPORT_FIELDS = None
FEED_STORE_EMPTY = False
FEED_STORAGES = {}
FEED_STORAGES_BASE = {
    '': None,
    'file': None,
    'stdout': None,
    's3': 'scrapy.extensions.feedexport.S3FeedStorage',
    'ftp': None,
}
FEED_EXPORTERS = {}
FEED_EXPORTERS_BASE = {
    'json': None,
    'jsonlines': None,
    'jl': None,
    'csv': None,
    'xml': None,
    'marshal': None,
    'pickle': None,
}
Also, make sure the S3PIPELINE_URL variable is set to the same value as FEED_URI.
If you do not remove the settings above from settings.py, or do not set those two variables equal to each other, a jsonl file will still show up in your S3 bucket, but it will contain only multiple copies of a single item. I don't know why that happens, though...
This took me a few hours to figure out, so I hope it saves someone some time.
【Comments】:
【Answer 3】: Solved the problem by adding the following lines to the settings.py file:
ITEM_PIPELINES = {
    'scrapy.pipelines.files.S3FilesStore': 1,
}
Along with the S3 credentials mentioned earlier:
AWS_ACCESS_KEY_ID = 'access key id'
AWS_SECRET_ACCESS_KEY= 'access key'
FEED_URI='s3://bucket/folder/filename.json'
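As a quick sanity check that the feed actually reached the bucket, something like the following can be run after the crawl; the bucket and prefix are placeholders, and boto3 must be installed and configured with the same credentials:

import boto3

s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket='bucket', Prefix='folder/')
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])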
Thank you all for the guidance.
【Comments】:
Hi, can you show me how you wrote your S3FilesStore?