将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录

Posted

技术标签:

【中文标题】将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录【英文标题】:Backfill AWS Kinesis Firehose to Elasticsearch Service failed records 【发布时间】:2018-09-24 03:23:53 【问题描述】:

我们有一个将记录发送到 Elasticsearch 服务集群的 firehose。我们的集群已满,一些记录故障转移到 S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry 的文档表明失败的记录可用于回填:“跳过的文档将传送到 elasticsearch_failed/ 文件夹中的 S3 存储桶,您可以将其用于手动回填”但我找不到任何有关如何完成此操作的文档。

查看记录,它们似乎是包含 JSON blob 的文本文件的 gzip 文件,其中“rawData”字段包含我们发送到 firehose 的原始记录的 base64 编码字符串。

是否有现成的工具可以从 S3 中处理这些 gzip 文件、分解它们并重新提交记录?该文档暗示您可以“仅手动回填”,这是一个非常标准化的流程,所以我的假设是之前有人这样做过,但我无法找到方法。

【问题讨论】:

您找到任何合适的解决方案了吗?我还在网上搜索对此的一些“内置”支持。 AWS Data Pipeline 提供了一种更好的方法,您可以重新运行失败的实例。 【参考方案1】:

我想手动回填意味着使用 AWS 开发工具包之一将文档再次发送到 Elasticsearch。 python 中的一个示例(使用 boto3),从 S3 读取失败文件并将其中的文档发送到 Elasticsearch:

es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))

for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent ".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID \nReason: "
                    .format(case['esDocumentId'], case['errorMessage']))

【讨论】:

【参考方案2】:

遇到了同样的问题,修改了上面的脚本,将失败的文档(带有 403 )回填到现有的 elasticsearch 实例中

import boto3
import json
import base64
import logger
import requests

s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")
s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')
for s3key in s3keys['Contents']:
    print(s3key['Key'])
    file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
    text = file['Body'].read().decode("utf-8")
    failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
    for case in failure_cases:
        data = base64.b64decode(case['rawData'])
        esid = case['esDocumentId']
        esIndexName = case['esIndexName']
        doc = data.decode('utf-8')
        url = ("https://es-domain-name/%s/_doc/%s" %(esIndexName, esid ))
        headers = "content-type": "application/json", "Accept-Charset": "UTF-8"
        if case['errorCode'] == '403':
            try:
                print(case['errorCode'])
                r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
                response = r.json()
                print(response)
            except:
                pass

【讨论】:

老兄,对于这两个答案,这个:failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n')))) 代码是 hiiiiiideeeoousssss 而不是 pythonic。一个更好的选择:[json.loads(d) for d in body.strip().split(b"\n")] 其中 body 没有被解码(因此被 b"\n" 分割)。

以上是关于将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录的主要内容,如果未能解决你的问题,请参考以下文章

使用 AWS kinesis-firehose 将数据写入文件

AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose

如何使用 AWS Kinesis Firehose 将嵌套结构推送到 Redshift

AWS Kinesis Firehose 到 ElasticSearch 地理数据映射

将镶木地板从 AWS Kinesis firehose 写入 AWS S3

从 fluentd 发送到 aws kinesis firehose 时数据丢失