我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?
Posted
技术标签:
【中文标题】我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?【英文标题】:Can I customize partitioning in Kinesis Firehose before delivering to S3? 【发布时间】:2018-12-21 04:11:27 【问题描述】:我有一个 Firehose 流,旨在从不同来源和不同事件类型中提取数百万个事件。流应将所有数据传送到一个 S3 存储桶,作为原始\未更改数据的存储。
我正在考虑根据嵌入在事件消息中的元数据(如事件源、事件类型和事件日期)在 S3 中对这些数据进行分区。
但是,Firehose 遵循基于记录到达时间的默认分区。是否可以自定义此分区行为以满足我的需求?
更新:已接受的答案已更新为新答案表明该功能自 2021 年 9 月起可用
【问题讨论】:
类似:Partitioning AWS Kinesis Firehose data to s3 by payload @JohnRotenstein 不幸的是,答案并没有解决这个问题。两者都建议附加一个 lambda 函数,该函数将基于特定 ID 将传入数据路由到不同的流。这个问题和另一个问题是解决是否可以定义消防软管的分区方法。不过还是谢谢你的参考!! 【参考方案1】:自 2021 年 9 月 1 日起,AWS Kinesis Firehose 支持此功能。阅读the announcement blog post here。
来自文档:
您可以使用 Key 和 Value 字段来指定要用作动态分区键和 jq 查询的数据记录参数以生成动态分区键值。 ...
从 UI 看是这样的:
【讨论】:
这太棒了!【参考方案2】:没有。您不能根据事件内容进行“分区”。
一些选项是:
发送到单独的 Firehose 流 发送到 Kinesis Data Stream(而不是 Firehose)并编写您自己的自定义 Lambda 函数来处理和保存数据(请参阅:AWS Developer Forums: Athena and Kinesis Firehose) 使用 Kinesis Analytics 处理消息并将其“定向”到不同的 Firehose 流如果您打算将输出与 Amazon Athena 或 Amazon EMR 一起使用,您还可以考虑将其转换为 Parquet 格式,其中包含 much better performance。这需要将 S3 中的数据作为批处理进行后处理,而不是在数据到达流时对其进行转换。
【讨论】:
【参考方案3】:根据 John 的回答,如果您没有近乎实时的流式传输要求,我们发现使用 Athena 进行批处理对我们来说是一个简单的解决方案。
Kinesis 流到给定表 unpartitioned_event_data
,它可以使用 native record arrival time partitioning。
我们定义了另一个 Athena 表 partitioned_event_table
,可以使用自定义分区键进行定义,并利用 Athena 拥有的 INSERT INTO 功能。 Athena 将自动以您想要的格式重新分区您的数据,而无需任何自定义使用者或新的基础架构来管理。这可以使用 cron、SNS 或 Airflow 之类的东西来安排。
很酷的是,您可以创建一个视图,对两个表执行UNION,以便在一个地方查询历史和实时数据。
我们实际上在 Radar 和 talk about more trade-offs in this blog post 处理过这个问题。
【讨论】:
【参考方案4】:在撰写本文时,Vlad 提到的动态分区功能仍然很新。我需要它成为 CloudFormation 模板的一部分,该模板仍未正确记录。我必须添加DynamicPartitioningConfiguration
才能使其正常工作。 MetadataExtractionQuery
语法也没有正确记录。
MyKinesisFirehoseStream:
Type: AWS::KinesisFirehose::DeliveryStream
...
Properties:
ExtendedS3DestinationConfiguration:
Prefix: "clients/client_id=!client_id/dt=!timestamp:yyyy-MM-dd/"
ErrorOutputPrefix: "errors/!firehose:error-output-type/"
DynamicPartitioningConfiguration:
Enabled: "true"
RetryOptions:
DurationInSeconds: "300"
ProcessingConfiguration:
Enabled: "true"
Processors:
- Type: AppendDelimiterToRecord
- Type: MetadataExtraction
Parameters:
- ParameterName: MetadataExtractionQuery
ParameterValue: "client_id:.client_id"
- ParameterName: JsonParsingEngine
ParameterValue: JQ-1.6
【讨论】:
【参考方案5】:为了扩展 Murali 的答案,我们在 CDK 中实现了它:
我们传入的 json 数据如下所示:
"data":
"timestamp":1633521266990,
"defaultTopic":"Topic",
"data":
"OUT1":"Inactive",
"Current_mA":3.92
CDK 代码如下:
const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream',
deliveryStreamName: 'deliverystream',
extendedS3DestinationConfiguration:
cloudWatchLoggingOptions:
enabled: true,
,
bucketArn: Bucket.bucketArn,
roleArn: deliveryStreamRole.roleArn,
prefix: 'defaultTopic=!partitionKeyFromQuery:defaultTopic/!timestamp:yyyy/MM/dd/',
errorOutputPrefix: 'error/!firehose:error-output-type/',
bufferingHints:
intervalInSeconds: 60,
,
dynamicPartitioningConfiguration:
enabled: true,
,
processingConfiguration:
enabled: true,
processors: [
type: 'MetadataExtraction',
parameters: [
parameterName: 'MetadataExtractionQuery',
parameterValue: 'Topic: .data.defaultTopic',
,
parameterName: 'JsonParsingEngine',
parameterValue: 'JQ-1.6',
,
],
,
type: 'AppendDelimiterToRecord',
parameters: [
parameterName: 'Delimiter',
parameterValue: '\\n',
,
],
,
],
,
,
)
【讨论】:
你知道如何将 2 个字段用作 2 个单独的值吗?以上是关于我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?的主要内容,如果未能解决你的问题,请参考以下文章
使用 Lambda 或 Kinesis Firehose 将 CloudWatch Logs 传输到 S3?
Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号
Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号
python 一个简短的Lambda函数可以发送CloudWatch Logs(在Flow Logs的情况下)并将它们发送到Kinesis Firehose以便在S3中存储。满满的
python 一个简短的Lambda函数可以发送CloudWatch Logs(在Flow Logs的情况下)并将它们发送到Kinesis Firehose以便在S3中存储。满满的