AWS Firehose 到 Elastic Search - 将一条 Firehose 记录转换为多个 Elastic 条目

Posted

技术标签:

【中文标题】AWS Firehose 到 Elastic Search - 将一条 Firehose 记录转换为多个 Elastic 条目【英文标题】:AWS Firehose to Elastic Search - Transforming one Firehose record into multiple Elastic entries 【发布时间】:2021-10-31 18:04:35 【问题描述】:

我的 Lambda 函数中有一个经过优化(标准化)的 JSON 字符串。它被转发到 Firehose 到 Elastic Search。 我的计划是使用 Kinesis Data Transformation Lambda(请参阅https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html)对 JSON 进行非规范化,以从中获取正确的 Elastic Search 条目。 这个问题是关于可行性/如何做的。

这是我的基本 AWS 设置:

    Lambda 函数:从 Internet 获取规范化的 JSON,对其进行验证,附加一些属性,然后通过 putRecord 将其转发到 Kinesis Firehose。 Kinesis Firehose 转换 lambda:获取一条记录,读取 JSON 并从中生成多个项目,然后将其返回给 Firehose,后者将其转发给 Elastic Search

我的问题是:Firehose/转换 lambda 是否有可能从一条记录中创建多个 Elastic Search 条目? 我将尝试使用一些伪代码来可视化场景:

lambda.js

exports.handler = async function (event) 
    // 1. get inputDoc from request, which contains multiple es_documents
    // 2. attach timestamp as property
    // result:
    const inputDoc = 
        request_logged_at: '2017-02-07T15:13:01.39256Z',
        es_documents: [
            
                foo: 'bar'
            ,
            
                foo: 'baz'
            
        ]
    ;

    const firehoseParams = 
        DeliveryStreamName: 'Delivery-Stream',
        Record: 
            Data: JSON.stringify(inputDoc)
        
    ;

    await firehose.putRecord(firehoseParams).promise();
    return  statusCode: 201 ;

firehose/transform_lambda.js

exports.handler = (event, context) => 
    const record = event.records[0];
    const myDoc = (new Buffer(record.data, 'base64')).toString('utf8');

    // Denormalize request_logged_at into the single documents, so every document in Elastic knows when it got logged
    const docsToElastic = myDoc.es_documents.map(doc => 
        return doc.request_logged_at = myDoc.request_logged_at;
    );

    // This is the main point: How to return this array back to Firehose so Elastic creates multiple table entries?
    // My first guess is the following, as I've seen this syntax in other places 
    // (see first "Note" on this page[https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html])
    const result = docsToElastic.reduce((accumulator, doc) => 
        return accumulator + JSON.stringify(doc);
    , '');

    // result: '"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z""foo":"baz","request_logged_at":"2017-02-07T15:13:01.39256Z"'

    const payload = (new Buffer(result, 'utf8')).toString('base64');
    return 
        recordId: record.recordId,
        result: 'Ok',
        data: payload
    ;

任何人都可以分享他对这个用例的了解吗?这行得通吗?

顺便说一句:我知道我可以在第一个 lambda 中进行非规范化并使用 firehose.putRecordBatch(),但第一个 lambda 已经有很多任务,这也是分离关注点的问题

【问题讨论】:

【参考方案1】:

我自己解决了。来自docs 的这些提示给了我一个想法:

然后,它 [Firehose] 会生成一个 Elasticsearch 批量请求,以将多条记录索引到您的 Elasticsearch 集群。在将记录发送到 Kinesis Data Firehose 之前,请确保您的记录采用 UTF-8 编码并展平为单行 JSON 对象。

所以实际上在后台发生的是 Firehose 执行了ES bulk request。那么 Firehose 可能会做什么:

    POST _bulk
     "index" :  "_index" : "my-index"  
    [FirehoseRecord.Data]
     "index" :  "_index" : "my-index"  
    [FirehoseRecord.Data]
     "index" :  "_index" : "my-index"  
    [FirehoseRecord.Data]

所以我所做的就是修改 one FirehoseRecord.Data 以拥有多行我的文档,每行由 "index" : ... 对象分隔:

const result = docsToElastic.reduce((accumulator, log, currentIndex) => 
    return accumulator +
        // Skip index at first time bcs. that is what Firehose does
        (currentIndex === 0 ? '' : ' "index" :  "_index" : "my-index" ' + '\n') +
        JSON.stringify(log) + '\n';
, '');

输出:

"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"
 "index" :  "_index" : "my-index"  
"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"
 "index" :  "_index" : "my-index"  
"foo":"bar","request_logged_at":"2017-02-07T15:13:01.39256Z"

注意第一个错误的 "index" : ...对象。这是因为 Firehose 在整个记录之前添加了第一个 "index" : ...

这已经过测试并且现在可以工作了。

【讨论】:

以上是关于AWS Firehose 到 Elastic Search - 将一条 Firehose 记录转换为多个 Elastic 条目的主要内容,如果未能解决你的问题,请参考以下文章

将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录

将镶木地板从 AWS Kinesis firehose 写入 AWS S3

AWS Kinesis Firehose 到 ElasticSearch 地理数据映射

AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose

从 fluentd 发送到 aws kinesis firehose 时数据丢失

AWS Firehose 到 S3 的内容前缀