使用适用于 Ruby 的 AWS 开发工具包时在 AWS Kinesis Firehose 记录之间插入换行符

Posted

技术标签:

【中文标题】使用适用于 Ruby 的 AWS 开发工具包时在 AWS Kinesis Firehose 记录之间插入换行符【英文标题】:Insert newlines between AWS Kinesis Firehose records when using the AWS SDK for Ruby 【发布时间】:2021-12-30 16:49:59 【问题描述】:

我有一个通过 S3 将数据发送到 Redshift 的 AWS Kinesis Firehose。 我希望在使用put_record_batch 发送的记录之间出现换行符。目前我的代码如下所示:

records = [ id: 1, value: "foo" ,  id: 2, value: "bar" ]
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch(
  delivery_stream_name: "my_firehose",
  records: records
)

最终在 S3 中的记录如下所示:

"id":1,"value":"foo""id":2,"value":"bar"

我希望 S3 文件看起来像这样:

"id":1,"value":"foo"
"id":2,"value":"bar"

这将使在必要时手动解析文件变得更加容易(例如,如果我们需要调试为什么数据没有从 S3 传输到 Redshift)。

put_record 的解决方案很简单:将数据转换为 JSON 并添加换行符:

record =  id: 1, value: "foo" 
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record(
  delivery_stream_name: "my_firehose",
  data: record.to_json << "\n"
)

我尝试用put_record_batch做类似的事情:

records = [ id: 1, value: "foo",  id: 2, value: "bar" ]
json_records = records.map  |record| record.to_json << "\n" 
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch(
  delivery_stream_name: "my_firehose",
  records: json_records
)

但这导致了错误:

ArgumentError: parameter validator found 2 errors:
  - expected params[:records][0] to be a hash, got value "\"id\":1,\"value\":\"foo\"\n" (class: String) instead.
  - expected params[:records][1] to be a hash, got value "\"id\":2,\"value\":\"bar\"\n" (class: String) instead.
from /mnt/istore/apps/my_app/shared/bundle/ruby/2.7.0/gems/aws-sdk-core-3.89.1/lib/aws-sdk-core/param_validator.rb:33:in `validate!'

看来我们需要发送一个哈希。

put_record_batch 的文档说:

Kinesis Data Firehose 在将记录传送到目的地之前对其进行缓冲。为了消除目标数据 blob 的歧义,一种常见的解决方案是在数据中使用分隔符,例如换行符 (\n) 或数据中唯一的一些其他字符。这允许消费者应用程序在从目标读取数据时解析单个数据项。

我该怎么做?

我正在使用 aws-sdk-firehose gem 的 version 1.26.0。

【问题讨论】:

类似问题:***.com/questions/48226472/… 【参考方案1】:

我认为问题在于我在使用 put_record_batch 时忽略了 data 键。这似乎有效:

records = [ id: 1, value: "foo",  id: 2, value: "bar" ]
json_records = records.map do |record|
  # Previously this line was `record.to_json << "\n"`
   data: record.to_json << "\n" 
end
Aws::Firehose::Client.new(
  region: "us-east-1"
).put_record_batch(
  delivery_stream_name: "my_firehose",
  records: json_records
)

【讨论】:

以上是关于使用适用于 Ruby 的 AWS 开发工具包时在 AWS Kinesis Firehose 记录之间插入换行符的主要内容,如果未能解决你的问题,请参考以下文章

适用于 AWS Lambda 的本地开发服务器

ruby 适用于AWS的SD

从适用于 PHP 的 AWS 开发工具包中提取受保护的请求响应

使用适用于 Node.js 的 AWS 开发工具包将二进制文件上传到 S3

如何使用适用于 DynamoDb 的 AWS Rust 开发工具包编写惯用的 Rust 错误处理?

使用适用于Ruby on Rails的AWS CI / CI流程进行部署