Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号

Posted

技术标签:

【中文标题】Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号【英文标题】:Kinesis Firehose putting JSON objects in S3 without seperator comma 【发布时间】:2021-11-22 07:39:30 【问题描述】:

在发送数据之前,我使用 JSON.stringify 到数据,它看起来像这样

"data": ["key1": value1, "key2": value2, "key1": value1, "key2": value2]

但是一旦它通过 AWS API Gateway 并且 Kinesis Firehose 将它放到 S3 中,它看起来像这样

    
     "key1": value1, 
     "key2": value2
    
     "key1": value1, 
     "key2": value2
    

JSON 对象之间的分隔符没有了,但我需要它来正确处理数据。

API 网关中的模板:

#set($root = $input.path('$'))

    "DeliveryStreamName": "some-delivery-stream",
    "Records": [
#foreach($r in $root.data)
#set($data = "
    ""key1"": ""$r.value1"",
    ""key2"": ""$r.value2""
")
    
        "Data": "$util.base64Encode($data)"
    #if($foreach.hasNext),#end
#end
    ]

【问题讨论】:

【参考方案1】:

我最近遇到了同样的问题,我能找到的唯一答案基本上只是在您将每条 JSON 消息发布到 Kinesis 流时添加换行符(“\n”),或者使用某种原始 JSON 解码器方法,该方法可以处理连接的 JSON 对象而无需分隔符。

我发布了一个 Python 代码解决方案,可以在相关的 Stack Overflow 帖子中找到该解决方案: https://***.com/a/49417680/1546785

【讨论】:

【参考方案2】:

一旦 AWS Firehose 将 JSON 对象转储到 s3,就完全可以从文件中读取单个 JSON 对象。

使用 Python,您可以使用 json 包中的 raw_decode 函数

from json import JSONDecoder, JSONDecodeError
import re
import json
import boto3

NOT_WHITESPACE = re.compile(r'[^\s]')

def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    while True:
        match = NOT_WHITESPACE.search(document, pos)
        if not match:
            return
        pos = match.start()

        try:
            obj, pos = decoder.raw_decode(document, pos)
        except JSONDecodeError:
            # do something sensible if there's some error
            raise
        yield obj

s3 = boto3.resource('s3')

obj = s3.Object("my-bukcet", "my-firehose-json-key.json")
file_content = obj.get()['Body'].read()
for obj in decode_stacked(file_content):
    print(json.dumps(obj))
    #   "key1":value1,"key2":value2
    #   "key1":value1,"key2":value2

来源:https://***.com/a/50384432/1771155

使用 Glue / Pyspark 你可以使用

import json

rdd = sc.textFile("s3a://my-bucket/my-firehose-file-containing-json-objects")
df = rdd.map(lambda x: json.loads(x)).toDF()
df.show()

来源:https://***.com/a/62984450/1771155

【讨论】:

【参考方案3】:

您可以考虑的一种方法是通过添加 Lambda 函数作为其数据处理器来为 Kinesis Firehose 传输流配置数据处理,该函数将在最终将数据传输到 S3 存储桶之前执行。

DeliveryStream:
  ...
  Type: AWS::KinesisFirehose::DeliveryStream
  Properties:
    DeliveryStreamType: DirectPut
    ExtendedS3DestinationConfiguration:
      ...
      BucketARN: !GetAtt MyDeliveryBucket.Arn
      ProcessingConfiguration:
        Enabled: true
        Processors:
          - Parameters:
              - ParameterName: LambdaArn
                ParameterValue: !GetAtt MyTransformDataLambdaFunction.Arn
            Type: Lambda
    ...

并且在 Lambda 函数中,确保将 '\n' 附加到记录的 JSON 字符串中,请参见下面 Node.js 中的 Lambda 函数 myTransformData.ts

import 
  FirehoseTransformationEvent,
  FirehoseTransformationEventRecord,
  FirehoseTransformationHandler,
  FirehoseTransformationResult,
  FirehoseTransformationResultRecord,
 from 'aws-lambda';

const createDroppedRecord = (
  recordId: string
): FirehoseTransformationResultRecord => 
  return 
    recordId,
    result: 'Dropped',
    data: Buffer.from('').toString('base64'),
  ;
;

const processData = (
  payloadStr: string,
  record: FirehoseTransformationEventRecord
) => 
  let jsonRecord;
  // ...
  // Process the orginal payload,
  // And create the record in JSON
  return jsonRecord;
;

const transformRecord = (
  record: FirehoseTransformationEventRecord
): FirehoseTransformationResultRecord => 
  try 
    const payloadStr = Buffer.from(record.data, 'base64').toString();
    const jsonRecord = processData(payloadStr, record);
    if (!jsonRecord) 
      console.error('Error creating json record');
      return createDroppedRecord(record.recordId);
    
    return 
      recordId: record.recordId,
      result: 'Ok',
      // Ensure that '\n' is appended to the record's JSON string.
      data: Buffer.from(JSON.stringify(jsonRecord) + '\n').toString('base64'),
    ;
   catch (error) 
    console.error('Error processing record $record.recordId: ', error);
    return createDroppedRecord(record.recordId);
  
;

const transformRecords = (
  event: FirehoseTransformationEvent
): FirehoseTransformationResult => 
  let records: FirehoseTransformationResultRecord[] = [];
  for (const record of event.records) 
    const transformed = transformRecord(record);
    records.push(transformed);
  
  return  records ;
;

export const handler: FirehoseTransformationHandler = async (
  event,
  _context
) => 
  const transformed = transformRecords(event);
  return transformed;
;

换行分隔符到位后,Athena 等 AWS 服务将能够正常处理 S3 存储桶中的 JSON 记录数据,而不是 just seeing the first JSON record only。

【讨论】:

以上是关于Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号的主要内容,如果未能解决你的问题,请参考以下文章

将镶木地板从 AWS Kinesis firehose 写入 AWS S3

按事件时间对 Kinesis firehose S3 记录进行分区

使用 AWS kinesis-firehose 将数据写入文件

AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose

将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录

将AWS Kinesis Firehose回填到Elasticsearch Service失败的记录