按事件时间对 Kinesis firehose S3 记录进行分区
Posted
技术标签:
【中文标题】按事件时间对 Kinesis firehose S3 记录进行分区【英文标题】:Partition Kinesis firehose S3 records by event time 【发布时间】:2017-06-27 01:03:24 【问题描述】:Firehose->S3 使用当前日期作为在 S3 中创建密钥的前缀。因此,这会在写入记录时对数据进行分区。我的 firehose 流包含具有特定事件时间的事件。
有没有办法创建包含此事件时间的 S3 密钥?下游的处理工具依赖于每个事件都在与实际发生时间相关的“小时文件夹”中。还是在 Firehose 完成后需要额外的处理步骤?
事件时间可以在分区键中,或者我可以使用 Lambda 函数从记录中解析它。
【问题讨论】:
【参考方案1】:Kinesis Firehose(尚)不允许客户端控制最终 S3 对象的日期后缀的生成方式。
您唯一的选择是在 Kinesis Firehose 之后添加一个后处理层。例如,您可以使用 Data Pipeline 安排每小时一次的 EMR 作业,该作业读取上一小时写入的所有文件并将它们发布到正确的 S3 目标。
【讨论】:
【参考方案2】:这不是问题的答案,但是我想稍微解释一下根据事件到达时间存储记录的想法。
首先介绍一下流。 Kinesis 只是一个数据流。它有一个消费的概念。只有通过顺序读取流,才能可靠地使用流。还有一个想法是将检查点作为暂停和恢复消费过程的机制。检查点只是一个序列号,用于标识流中的位置。通过指定这个数字,可以开始从某个事件读取流。
现在回到默认的 s3 firehose 设置... 由于 kinesis 流的容量非常有限,很可能需要将来自 kinesis 的数据存储在某个地方以供以后分析。 firehose to s3 setup 开箱即用。它只是将流中的原始数据存储到 s3 存储桶中。但从逻辑上讲,这些数据仍然是相同的记录流。为了能够可靠地消费(读取)这个流,需要这些序列号作为检查点。这些数字是记录到达时间。
如果我想按创建时间读取记录怎么办?看起来完成此任务的正确方法是顺序读取 s3 流,将其转储到某个 [时间序列] 数据库或数据仓库并对这个存储进行基于创建时间的读数。否则,在读取 s3(流)时,总会有非零的机会错过一些事件。所以我根本不建议重新排序 s3 存储桶。
【讨论】:
【参考方案3】:您需要进行一些后处理或编写自定义流式消费者(例如 Lambda)来执行此操作。
我们在我的公司处理了大量的事件,因此编写 Lambda 函数似乎不是一个很好的用钱方式。相反,我们发现使用 Athena 进行批处理是一个非常简单的解决方案。
首先,您将流式传输到 Athena 表 events
,可以选择为 partitioned by an arrival-time。
然后,您定义另一个 Athena 表,例如 events_by_event_time
,该表由您的事件的 event_time
属性分区,或者它已在架构中定义。
最后,您安排一个进程运行 Athena INSERT INTO 查询,该查询从 events
获取事件并自动将它们重新分区到 events_by_event_time
,现在您的事件由 event_time
分区,而无需 EMR、数据管道或任何其他基础设施。
您可以对事件的任何属性执行此操作。还值得注意的是,您可以创建一个对两个表执行UNION 的视图来查询实时和历史事件。
我实际上在blog post here 中写了更多关于此的内容。
【讨论】:
【参考方案4】:对于未来的读者 - Firehose 支持 Amazon S3 对象的自定义前缀
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html
【讨论】:
这没有回答问题。问题是关于“事件时间”,即事件中的时间字段。 Firehose 仅支持“处理时间”,请参阅:Kinesis Data Firehose uses the approximate arrival timestamp of the oldest record that's contained in the Amazon S3 object being written
。【参考方案5】:
AWS 于 2021 年 8 月开始提供“动态分区”:
Dynamic partitioning enables you to continuously partition streaming data in Kinesis Data Firehose by using keys within data (for example, customer_id or transaction_id) and then deliver the data grouped by these keys into corresponding Amazon Simple Storage Service (Amazon S3) prefixes.
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
【讨论】:
以上是关于按事件时间对 Kinesis firehose S3 记录进行分区的主要内容,如果未能解决你的问题,请参考以下文章
Kinesis Stream 和 Kinesis Firehose 更新 Elasticsearch 索引
使用 AWS kinesis-firehose 将数据写入文件
AWS Typescript CDK,尝试将 kinesis 流作为来源添加到 firehose
将 AWS Kinesis Firehose 回填到 Elasticsearch Service 失败记录