将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录相关的知识,希望对你有一定的参考价值。

我们有一个firehose将记录发送到Elasticsearch Service集群。我们的集群已经填满,一些记录未通过S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry上的文档表明失败的记录可用于回填:“跳过的文档将传递到elasticsearch_failed /文件夹中的S3存储桶,您可以将其用于手动回填”但我无法找到任何文档怎么做到这一点。

查看记录,它们似乎是包含JSON blob的文本文件的gzip文件,其中“rawData”字段包含我们发送给firehose的原始记录的base64编码字符串。

是否有现成的工具来处理S3中的这些gzip文件,将其分解并重新提交记录?文档暗示你可以“只是手动回填”,这是一个非常标准化的流程,所以我的假设是有人之前做过这个,但我还是找不到。

答案

我想手动回填意味着使用其中一个AWS开发工具包再次将文档发送到Elasticsearch。 python中的一个例子(使用boto3),从S3读取失败文件并将文档发送到Elasticsearch:

es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('
'))))

for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}
Reason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))

以上是关于将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录的主要内容,如果未能解决你的问题,请参考以下文章

AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose

AWS Kinesis Firehose 到 ElasticSearch 地理数据映射

从 fluentd 发送到 aws kinesis firehose 时数据丢失

将镶木地板从 AWS Kinesis firehose 写入 AWS S3

使用 AWS kinesis-firehose 将数据写入文件

Node.js中的代码AWS Lambda Package不会调用putRecord()来将数据添加到AWS Kinesis Firehose Stream中