将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录
Posted
技术标签:
【中文标题】将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录【英文标题】:Backfill AWS Kinesis Firehose to Elasticsearch Service failed records 【发布时间】:2018-09-24 03:23:53 【问题描述】:我们有一个将记录发送到 Elasticsearch 服务集群的 firehose。我们的集群已满,一些记录故障转移到 S3。 https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry 的文档表明失败的记录可用于回填:“跳过的文档将传送到 elasticsearch_failed/ 文件夹中的 S3 存储桶,您可以将其用于手动回填”但我找不到任何有关如何完成此操作的文档。
查看记录,它们似乎是包含 JSON blob 的文本文件的 gzip 文件,其中“rawData”字段包含我们发送到 firehose 的原始记录的 base64 编码字符串。
是否有现成的工具可以从 S3 中处理这些 gzip 文件、分解它们并重新提交记录?该文档暗示您可以“仅手动回填”,这是一个非常标准化的流程,所以我的假设是之前有人这样做过,但我无法找到方法。
【问题讨论】:
您找到任何合适的解决方案了吗?我还在网上搜索对此的一些“内置”支持。 AWS Data Pipeline 提供了一种更好的方法,您可以重新运行失败的实例。 【参考方案1】:我想手动回填意味着使用 AWS 开发工具包之一将文档再次发送到 Elasticsearch。 python 中的一个示例(使用 boto3),从 S3 读取失败文件并将其中的文档发送到 Elasticsearch:
es_client = boto3.client('es', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)
file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
for case in failure_cases:
try:
data = base64.b64decode(case['rawData'])
es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
logger.debug("Successfully sent ".format(case['esDocumentId']))
except RequestError:
logger.info("Retry failed for Document ID \nReason: "
.format(case['esDocumentId'], case['errorMessage']))
【讨论】:
【参考方案2】:遇到了同样的问题,修改了上面的脚本,将失败的文档(带有 403 )回填到现有的 elasticsearch 实例中
import boto3
import json
import base64
import logger
import requests
s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")
s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')
for s3key in s3keys['Contents']:
print(s3key['Key'])
file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
text = file['Body'].read().decode("utf-8")
failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
for case in failure_cases:
data = base64.b64decode(case['rawData'])
esid = case['esDocumentId']
esIndexName = case['esIndexName']
doc = data.decode('utf-8')
url = ("https://es-domain-name/%s/_doc/%s" %(esIndexName, esid ))
headers = "content-type": "application/json", "Accept-Charset": "UTF-8"
if case['errorCode'] == '403':
try:
print(case['errorCode'])
r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
response = r.json()
print(response)
except:
pass
【讨论】:
老兄,对于这两个答案,这个:failure_cases = list(map(lambda x: json.loads(x), filter(None, text.split('\n'))))
代码是 hiiiiiideeeoousssss 而不是 pythonic。一个更好的选择:[json.loads(d) for d in body.strip().split(b"\n")]
其中 body 没有被解码(因此被 b"\n" 分割)。以上是关于将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录的主要内容,如果未能解决你的问题,请参考以下文章
使用 AWS kinesis-firehose 将数据写入文件
AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose
如何使用 AWS Kinesis Firehose 将嵌套结构推送到 Redshift
AWS Kinesis Firehose 到 ElasticSearch 地理数据映射