我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?

Posted

技术标签:

【中文标题】我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?【英文标题】:Can I customize partitioning in Kinesis Firehose before delivering to S3? 【发布时间】:2018-12-21 04:11:27 【问题描述】:

我有一个 Firehose 流,旨在从不同来源和不同事件类型中提取数百万个事件。流应将所有数据传送到一个 S3 存储桶,作为原始\未更改数据的存储。

我正在考虑根据嵌入在事件消息中的元数据(如事件源、事件类型和事件日期)在 S3 中对这些数据进行分区。

但是,Firehose 遵循基于记录到达时间的默认分区。是否可以自定义此分区行为以满足我的需求?

更新:已接受的答案已更新为新答案表明该功能自 2021 年 9 月起可用

【问题讨论】:

类似:Partitioning AWS Kinesis Firehose data to s3 by payload @JohnRotenstein 不幸的是,答案并没有解决这个问题。两者都建议附加一个 lambda 函数,该函数将基于特定 ID 将传入数据路由到不同的流。这个问题和另一个问题是解决是否可以定义消防软管的分区方法。不过还是谢谢你的参考!! 【参考方案1】:

自 2021 年 9 月 1 日起,AWS Kinesis Firehose 支持此功能。阅读the announcement blog post here。

来自文档:

您可以使用 Key 和 Value 字段来指定要用作动态分区键和 jq 查询的数据记录参数以生成动态分区键值。 ...

从 UI 看是这样的:

【讨论】:

这太棒了!【参考方案2】:

没有。您不能根据事件内容进行“分区”。

一些选项是:

发送到单独的 Firehose 流 发送到 Kinesis Data Stream(而不是 Firehose)并编写您自己的自定义 Lambda 函数来处理和保存数据(请参阅:AWS Developer Forums: Athena and Kinesis Firehose) 使用 Kinesis Analytics 处理消息并将其“定向”到不同的 Firehose 流

如果您打算将输出与 Amazon Athena 或 Amazon EMR 一起使用,您还可以考虑将其转换为 Parquet 格式,其中包含 much better performance。这需要将 S3 中的数据作为批处理进行后处理,而不是在数据到达流时对其进行转换。

【讨论】:

【参考方案3】:

根据 John 的回答,如果您没有近乎实时的流式传输要求,我们发现使用 Athena 进行批处理对我们来说是一个简单的解决方案。

Kinesis 流到给定表 unpartitioned_event_data,它可以使用 native record arrival time partitioning。

我们定义了另一个 Athena 表 partitioned_event_table,可以使用自定义分区键进行定义,并利用 Athena 拥有的 INSERT INTO 功能。 Athena 将自动以您想要的格式重新分区您的数据,而无需任何自定义使用者或新的基础架构来管理。这可以使用 cron、SNS 或 Airflow 之类的东西来安排。

很酷的是,您可以创建一个视图,对两个表执行UNION,以便在一个地方查询历史和实时数据。

我们实际上在 Radar 和 talk about more trade-offs in this blog post 处理过这个问题。

【讨论】:

【参考方案4】:

在撰写本文时,Vlad 提到的动态分区功能仍然很新。我需要它成为 CloudFormation 模板的一部分,该模板仍未正确记录。我必须添加DynamicPartitioningConfiguration 才能使其正常工作。 MetadataExtractionQuery 语法也没有正确记录。

  MyKinesisFirehoseStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    ...
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "clients/client_id=!client_id/dt=!timestamp:yyyy-MM-dd/"
        ErrorOutputPrefix: "errors/!firehose:error-output-type/"
        DynamicPartitioningConfiguration:
          Enabled: "true"
          RetryOptions:
            DurationInSeconds: "300"
        ProcessingConfiguration:
          Enabled: "true"
          Processors:
            - Type: AppendDelimiterToRecord
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: "client_id:.client_id"
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6

【讨论】:

【参考方案5】:

为了扩展 Murali 的答案,我们在 CDK 中实现了它:

我们传入的 json 数据如下所示:


    "data": 
        
        "timestamp":1633521266990,
        "defaultTopic":"Topic",
        "data":
        
            "OUT1":"Inactive",
            "Current_mA":3.92
        
    

CDK 代码如下:

const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', 
  deliveryStreamName: 'deliverystream',
  extendedS3DestinationConfiguration: 
    cloudWatchLoggingOptions: 
      enabled: true,
    ,
    bucketArn: Bucket.bucketArn,
    roleArn: deliveryStreamRole.roleArn,
    prefix: 'defaultTopic=!partitionKeyFromQuery:defaultTopic/!timestamp:yyyy/MM/dd/',
    errorOutputPrefix: 'error/!firehose:error-output-type/',
    bufferingHints: 
      intervalInSeconds: 60,
    ,
    dynamicPartitioningConfiguration: 
      enabled: true,
    ,
    processingConfiguration: 
      enabled: true,
      processors: [
        
          type: 'MetadataExtraction',
          parameters: [
            
              parameterName: 'MetadataExtractionQuery',
              parameterValue: 'Topic: .data.defaultTopic',
            ,
            
              parameterName: 'JsonParsingEngine',
              parameterValue: 'JQ-1.6',
            ,
          ],
        ,
        
          type: 'AppendDelimiterToRecord',
          parameters: [
            
              parameterName: 'Delimiter',
              parameterValue: '\\n',
            ,
          ],
        ,
      ],
    ,
  ,
)

【讨论】:

你知道如何将 2 个字段用作 2 个单独的值吗?

以上是关于我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Lambda 或 Kinesis Firehose 将 CloudWatch Logs 传输到 S3?

Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号

Kinesis Firehose 将 JSON 对象放入 S3 中,没有分隔符逗号

python 一个简短的Lambda函数可以发送CloudWatch Logs(在Flow Logs的情况下)并将它们发送到Kinesis Firehose以便在S3中存储。满满的

python 一个简短的Lambda函数可以发送CloudWatch Logs(在Flow Logs的情况下)并将它们发送到Kinesis Firehose以便在S3中存储。满满的

如何将 Kinesis 流存储到 S3 存储桶中特定文件夹结构中的 S3 存储