使用 Hadoop Streaming 和 MapReduce 处理来自 CommonCrawl 的许多 WARC 档案
Posted
技术标签:
【中文标题】使用 Hadoop Streaming 和 MapReduce 处理来自 CommonCrawl 的许多 WARC 档案【英文标题】:Processing many WARC archives from CommonCrawl using Hadoop Streaming and MapReduce 【发布时间】:2019-01-20 17:43:51 【问题描述】:我正在做一个项目,我需要从 S3 容器中下载特定 URL 的爬网数据(来自 CommonCrawl),然后处理该数据。
目前我有一个 MapReduce 作业(通过 Hadoop Streaming 的 Python),它为 URL 列表获取正确的 S3 文件路径。然后我尝试使用第二个 MapReduce 作业通过从 commoncrawl S3 存储桶下载数据来处理此输出。在映射器中,我使用 boto3 从 commoncrawl S3 存储桶下载特定 URL 的 gzip 内容,然后输出有关 gzip 内容的一些信息(字数计数器信息、内容长度、链接到的 URL 等)。然后 reducer 通过这个输出得到最终的字数、URL 列表等。
第一个 MapReduce 作业的输出文件只有大约 6mb 大小(但一旦我们扩展到完整的数据集就会更大)。当我运行第二个 MapReduce 时,这个文件只被拆分了两次。通常对于这么小的文件来说这不是问题,但是我上面描述的映射器代码(获取 S3 数据、吐出映射的输出等)需要一段时间才能为每个 URL 运行。由于文件只拆分了两次,因此只有 2 个映射器正在运行。我需要增加拆分的数量,以便更快地完成映射。
我已尝试为 MapReduce 作业设置“mapreduce.input.fileinputformat.split.maxsize”和“mapreduce.input.fileinputformat.split.minsize”,但它不会更改发生的拆分次数。
这是映射器的一些代码:
s3 = boto3.client('s3', 'us-west-2', config=Config(signature_version=UNSIGNED))
offset_end = offset + length - 1
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
'Body'].read()
fileobj = io.BytesIO(gz_file)
with gzip.open(fileobj, 'rb') as file:
[do stuff]
我还手动将输入文件拆分为多个文件,最多 100 行。这具有为我提供更多映射器的预期效果,但随后我开始遇到来自 s3client.get_object() 调用的 ConnectionError:
Traceback (most recent call last):
File "dmapper.py", line 103, in <module>
commoncrawl_reader(base_url, full_url, offset, length, warc_file)
File "dmapper.py", line 14, in commoncrawl_reader
gz_file = s3.get_object(Bucket='commoncrawl', Key=filename, Range='bytes=%s-%s' % (offset, offset_end))[
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/client.py", line 599, in _make_api_call
operation_model, request_dict)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 148, in make_request
return self._send_request(request_dict, operation_model)
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 177, in _send_request
success_response, exception):
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 273, in _needs_retry
caught_exception=caught_exception, request_dict=request_dict)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 227, in emit
return self._emit(event_name, kwargs)
File "/usr/lib/python3.6/site-packages/botocore/hooks.py", line 210, in _emit
response = handler(**kwargs)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 183, in __call__
if self._checker(attempts, response, caught_exception):
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 251, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 277, in _should_retry
return self._checker(attempt_number, response, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 317, in __call__
caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 223, in __call__
attempt_number, caught_exception)
File "/usr/lib/python3.6/site-packages/botocore/retryhandler.py", line 359, in _check_caught_exception
raise caught_exception
File "/usr/lib/python3.6/site-packages/botocore/endpoint.py", line 222, in _get_response
proxies=self.proxies, timeout=self.timeout)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/sessions.py", line 573, in send
r = adapter.send(request, **kwargs)
File "/usr/lib/python3.6/site-packages/botocore/vendored/requests/adapters.py", line 415, in send
raise ConnectionError(err, request=request)
botocore.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))
我目前只使用少数几个 URL 来运行它,但是一旦我开始工作,我将需要使用数千个(每个都有许多子目录)来运行它。
我不确定从哪里着手解决这个问题。我觉得很有可能有比我正在尝试的方法更好的方法。映射器似乎为每个 URL 花费了这么长时间的事实似乎是我正在接近这个错误的一个重要迹象。我还应该提到,如果直接作为管道命令运行,则映射器和化简器都可以正常运行:
"cat short_url_list.txt | python mapper.py | sort | python reducer.py" -> 产生所需的输出,但在整个 URL 列表上运行需要很长时间。
任何指导将不胜感激。
【问题讨论】:
【参考方案1】:MapReduce API 提供NLineInputFormat。属性“mapreduce.input.lineinputformat.linespermap”允许控制最多有多少行(这里是 WARC 记录)传递给映射器。它适用于mrjob,参见。伊利亚的WARC indexer。
关于 S3 连接错误:最好在数据所在的 us-east-1 AWS 区域运行作业。
【讨论】:
以上是关于使用 Hadoop Streaming 和 MapReduce 处理来自 CommonCrawl 的许多 WARC 档案的主要内容,如果未能解决你的问题,请参考以下文章
linux下用hadoop streaming 跑php总是jobs fail!