将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录相关的知识,希望对你有一定的参考价值。
我们有一个firehose将记录发送到Elasticsearch Service集群。我们的集群已经填满,一些记录未通过S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry上的文档表明失败的记录可用于回填:“跳过的文档将传递到elasticsearch_failed /文件夹中的S3存储桶,您可以将其用于手动回填”但我无法找到任何文档怎么做到这一点。
查看记录,它们似乎是包含JSON blob的文本文件的gzip文件,其中“rawData”字段包含我们发送给firehose的原始记录的base64编码字符串。
是否有现成的工具来处理S3中的这些gzip文件,将其分解并重新提交记录?文档暗示你可以“只是手动回填”,这是一个非常标准化的流程,所以我的假设是有人之前做过这个,但我还是找不到。
我想手动回填意味着使用其中一个AWS开发工具包再次将文档发送到Elasticsearch。 python中的一个例子(使用boto3),从S3读取失败文件并将文档发送到Elasticsearch:
es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('
'))))
for case in failure_cases:
try:
data = base64.b64decode(case['rawData'])
es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
logger.debug("Successfully sent {}".format(case['esDocumentId']))
except RequestError:
logger.info("Retry failed for Document ID {}
Reason: {}"
.format(case['esDocumentId'], case['errorMessage']))
以上是关于将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录的主要内容,如果未能解决你的问题,请参考以下文章
AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose
AWS Kinesis Firehose 到 ElasticSearch 地理数据映射
从 fluentd 发送到 aws kinesis firehose 时数据丢失
将镶木地板从 AWS Kinesis firehose 写入 AWS S3
使用 AWS kinesis-firehose 将数据写入文件
Node.js中的代码AWS Lambda Package不会调用putRecord()来将数据添加到AWS Kinesis Firehose Stream中