我可以自动将换行符附加到 AWS Firehose 记录吗?

Posted

技术标签:

【中文标题】我可以自动将换行符附加到 AWS Firehose 记录吗?【英文标题】:Can I automatically append newlines to AWS Firehose records? 【发布时间】:2017-10-30 00:02:01 【问题描述】:

我正在尝试使用以下设置配置 Kinesis Analytics 应用程序:

输入流是采用字符串化 JSON 值的 Kinesis Firehose SQL 是一个简单的透传(后面需要更复杂,但为了测试,它只是发送数据) 输出流是第二个 Kinesis Firehose,它将记录传送到 S3 存储桶

稍后,我将使用 Hive + JSONSERDE 导入 S3 存储桶的内容,它希望每个 JSON 记录都存在于自己的行中。 Firehose 输出只是附加了所有破坏 JSONSERDE 的 JSON 记录。

可以将 AWS Lambda 数据格式化程序附加到输出流,但这似乎很昂贵。我想要的只是使用换行符分割每条记录。

如果我不使用 Analytics 应用程序,我会将换行符附加到每个 Firehose 记录。在应用程序的 SQL 中没有办法做到这一点似乎很奇怪:

CREATE OR REPLACE STREAM "STREAM_OUT" (
  a VARCHAR(4),
  b VARCHAR(4),
  c VARCHAR(4)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "STREAM_OUT"
    SELECT STREAM
      "a",
      "b",
      "c"
    FROM "SOURCE_SQL_STREAM_001";

添加 Lambda 数据格式化程序是最佳答案吗?我真的很想避免这种情况。

【问题讨论】:

【参考方案1】:

我发布答案只是为了让问题与最近的 AWS 公告保持同步。 AWS 最近宣布在 Kinesis Firehose 传输流上提供动态分区。它支持为每条记录添加换行符。更多信息请查看this和this。

【讨论】:

【参考方案2】:

使用 Python 或 Node.js 的解决方案

我正在使用 DynamoDB Streams,我需要将这些记录保存到 S3 中。我实现了 Kinesis Firehose 流和 Lambda 函数。这适用于将我的记录作为 JSON 字符串放入 S3,但是,保存到 S3 中文件的每条记录都是内联的,即在一个连续的行中,因此我需要在每条记录的末尾添加一个新行添加它以便每条记录都在自己的行中。对于我的解决方案,我最终不得不进行一些 base64 解码/编码。

我是这样做的:

    创建 Kinesis Firehose 流时,启用“转换 使用 AWS Lambda 的源记录”(选择“启用”)。如果您已经创建了流,您仍然可以通过编辑现有流来启用此功能。 此时,您需要选择另一个执行此转换的 Lambda 函数。就我而言,我需要 在每条记录的末尾添加一个新行,这样当我在文本编辑器中打开文件并查看它时,每个条目都位于单独的行中。

下面是我用于第二个 Lambda 的 Python 和 Node.js 的测试解决方案代码:

添加换行符的Python解决方案:

import json
import boto3
import base64

output = []

def lambda_handler(event, context):
    
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        
        row_w_newline = payload + "\n"
        print('row_w_newline type:', type(row_w_newline))
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8'))
        
        output_record = 
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        
        output.append(output_record)

    print('Processed  records.'.format(len(event['records'])))
    
    return 'records': output

Node.js 添加换行符的解决方案:

'use strict';
console.log('Loading function');

exports.handler = (event, context, callback) => 

   
    /* Process the list of records and transform them */
    const output = event.records.map((record) => 
        
        let entry = (new Buffer(record.data, 'base64')).toString('utf8');
        let result = entry + "\n"
        const payload = (new Buffer(result, 'utf8')).toString('base64');
            
            return 
                recordId: record.recordId,
                result: 'Ok',
                data: payload,
            ;
            
    );
    console.log(`Processing completed.  Successful records $output.length.`);
    callback(null,  records: output );
;

一些很好的参考资料帮助我将 Python 版本拼凑在一起:

https://www.youtube.com/watch?v=wRGd2G82Opo&t=242s https://www.youtube.com/watch?v=6_03i26_DrQ

在上面的原始问题中,MrHen 想在不使用第二个 Lambda 的情况下做到这一点。我能够在第一个 Lambda 中实现这一点,而不是使用 Kinesis Firehose 转换源记录功能。我通过从 DynamoDB 中获取 newImage 并按以下顺序执行此操作:编码、解码、添加新行 ("\n")、编码、解码。可能有一种更清洁的方法。我选择使用第二个 Lambda 函数来使用转换源记录功能,因为此时它对我来说似乎更干净。

就我而言,单个 Lambda 解决方案如下所示:

 # Not pretty, but it works! Successfully adds new line to record.
 # newImage comes from the DynamoDB Stream as a Python dictionary object,
 # I convert it to a string before running the code below.

    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')
    newImage = newImage + "\n"
    newImage = base64.b64encode(newImage.encode('utf-8'))
    newImage = base64.b64decode(newImage).decode('utf-8')

【讨论】:

【参考方案3】:

我有类似的要求,要向 firehose 生成的文件添加新行,在我们的应用程序中,firehose 是通过 API Gateway 调用的。

这在集成请求部分下的正文映射模板中指定。

API 网关中的以下命令为 kinesis firehose 记录生成新行。

方法一:

    #set($payload="$input.path('$.Record.Data')
")
        
            "DeliveryStreamName": "$input.path('$.DeliveryStreamName')",
            "Record": 
            "Data": "$util.base64Encode($payload)"
        
        

如果您通过 API 网关调用 firehose,这将非常有效。

感谢和问候, 斯里维涅什 KN

【讨论】:

这适用于 Firehose,但不适用于 Analytics 应用程序。 Analytics 应用程序在其输出中去除了换行符。 您可以尝试在 firehose 中进行数据转换并使用 lambda 函数添​​加新行,然后让 kinesis 将其传递给 S3。 我在问题中提到了这一点。我宁愿不为此添加 Lambda 转换器。 你有没有找到其他方法来添加新的行分隔符?上周五,AWS 向某些区域发布了新的更新,破坏了您的解决方案。现在添加 Cg== 没有帮助。如果您添加 Cg==,它现在在大多数情况下会引发 SerializationException。例如,此代码不再起作用: "DeliveryStreamName": "fus-bear-csv-dev", "Records": [ "Data": "$util.base64Encode('a')Cg==" ] 转换后的方法响应体:"__type":"SerializationException" 感谢您的告知,您能说出您面对的是哪个地区吗,我会尝试并发布替代方案。这在美国东部和美国西部地区非常适合我。【参考方案4】:

这里是我们实现方式的一个基本示例。我们使用 javascript 将记录放入 Kinesis Stream,并使用 Firehose 通过 gzip 压缩重定向到 s3 位置。稍后 athena 将从 s3 位置查询以从 s3 获取记录。

以下代码用于在使用 javascript 代码发送到 Kinesis Stream 之前添加新行。

var payload = JSON.parse(payload);  
finalData = JSON.stringify(payload)+"\n";

var kinesisPayload = ;    
kinesisPayload.Data = finalData;    
kinesisPayload.StreamName = "kinesisStreamName");    
kinesisPayload.PartitionKey = "124";

【讨论】:

我猜这不适用于分析应用程序。向原始记录添加换行符并不难,但分析应用程序在将它们发送到 Firehose 之前将它们剥离。

以上是关于我可以自动将换行符附加到 AWS Firehose 记录吗?的主要内容,如果未能解决你的问题,请参考以下文章

加载文件后如何清理 AWS Firehose 使用的 S3 文件?

将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录

AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose

将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录

AWS Firehose 到 S3 的内容前缀

使用 AWS kinesis-firehose 将数据写入文件