如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储
Posted
技术标签:
【中文标题】如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储【英文标题】:How to store Kinesis stream to S3 storage in specific folder structure within S3 bucket 【发布时间】:2014-08-05 13:58:18 【问题描述】:我有 Kinesis Stream 捕获的事件。我想将所有事件放在 S3 上的特定文件夹结构中。我想制作一个带有日期戳的文件夹,就像 6 月 15 日的所有事件都应该进入该文件夹,而 6 月 16 日之后的新文件夹应该来选择事件等等。
作为 Kinesis 的新手,我只是使用文档,我发现有一个连接器框架,其中 S3Emitter 与配置一起使用以选择需要发出数据的 S3 位置。但是有人可以建议我如何维护用于在日期明智的文件夹中捕获事件日期的文件夹结构?
【问题讨论】:
【参考方案1】:我找到了解决此问题的方法,并在此处发布了答案: https://github.com/awslabs/amazon-kinesis-connectors/issues/24
这里又是答案:
通过对示例代码进行以下更改很容易实现:
在 S3sample.properties 中:
createS3Bucket = true
在 S3Emitter.java 中:
/* Add the required imports */
import java.text.SimpleDateFormat;
import java.util.Calendar;
public class S3Emitter implements IEmitter
//create date_bucket variable
protected final String date_bucket = new SimpleDateFormat("yyyy_MM_dd_HH").format(Calendar.getInstance().getTime());
public S3Emitter(KinesisConnectorConfiguration configuration)
s3Bucket = configuration.S3_BUCKET + "/" + date_bucket;
希望这会有所帮助!
【讨论】:
【参考方案2】:不幸的是,您正在寻找的功能在 S3Emitter 中对于 Amazon Kinesis 不可用,它只是作为一个缓冲区工作,根据输入数据量刷新,请参阅相应的。 comment:
IEmitter 的这个实现用于存储来自 Kinesis 的文件 在 S3 中流式传输。 [...] 当缓冲区满时,这个类的 emit 方法 将缓冲区的内容作为一个文件添加到 S3。 文件名是 从记录的第一个和最后一个序列号生成 包含在由破折号分隔的文件中。 [...] [强调我的]
此外,Kinesis 没有事件的第一级日期概念(resp。数据记录),而仅处理序列号,因此您需要添加相应的日期。应用程序级别的日期处理,请参阅Amazon Kinesis Terminology 中的数据记录部分:
数据记录是存储在 Amazon Kinesis 流中的数据单元。 数据记录由序列号、分区键和数据 blob 组成,这是一个未解释的、不可变的字节序列。 Amazon Kinesis 服务不会以任何方式检查、解释或更改 blob 中的数据。 [...] [强调我的]
【讨论】:
【参考方案3】:自 2014 年以来,AWS 提供了新的解决方案。尤其是 Kinesis Firehose,它可以很好地完成这项工作。 您只需使用this lambda 将数据从 Kinesis 流发送到 Kinesis Firehose,然后单击几下即可创建 Firehose。
【讨论】:
以上是关于如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储的主要内容,如果未能解决你的问题,请参考以下文章
如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?
如何将旧 S3 存储桶中的 Terraform 状态移动到新的 S3 存储桶?
如何在不下载文件的情况下搜索amazon S3存储桶中的文件内容