如何扇出 AWS 运动流?

Posted

技术标签:

【中文标题】如何扇出 AWS 运动流?【英文标题】:How to fanout an AWS kinesis stream? 【发布时间】:2017-02-24 18:53:06 【问题描述】:

我想将输入 AWS Kinesis 流扇出/链接/复制到 N 个新 Kinesis 流,这样写入输入 Kinesis 的每条记录都会出现在 N 个流中的每一个中。

是否有 AWS 服务或开源解决方案

如果有现成的解决方案,我宁愿不编写代码来执行此操作。 AWS Kinesis firehose 无法解决,因为它无法输出到 kinesis。如果运行成本不会太高,也许是一个 AWS Lambda 解决方案?

【问题讨论】:

好奇为什么你觉得你需要做一个扇出? kinesis 流已经可以支持多个消费者从流的不同部分读取数据。 正如@E.J.Brennan 提到的,你为什么需要扇出? @E.J.Brennan 的确,kinesis 支持多个消费者,但它的全局限制为 5 次读取/秒。虽然每次读取都可以提取大量记录,但一旦您拥有超过 20 个消费者,您的延迟就会超过 4 秒。这对我的应用程序来说是不行的。查看更多:brandur.org/kinesis-in-production#five-reads 【参考方案1】:

有两种方法可以实现Amazon Kinesis 流的扇出

使用 Amazon Kinesis Analytics 将记录复制到其他流 触发 AWS Lambda 函数将记录复制到另一个流

选项 1:使用 Amazon Kinesis Analytics 扇出

您可以使用Amazon Kinesis Analytics 从现有流中生成新流。

来自Amazon Kinesis Analytics documentation:

Amazon Kinesis Analytics 应用程序持续实时读取和处理流数据。您使用 SQL 编写应用程序代码来处理传入的流数据并产生输出。然后,Amazon Kinesis Analytics 将输出写入配置的目标

Application Code 部分提到了扇出:

您还可以编写彼此独立运行的 SQL 查询。例如,您可以编写两条 SQL 语句来查询相同的应用程序内流,但将输出发送到不同的应用程序内流

我设法实现如下:

创建了三个流:输入、输出1、输出2 创建了两个 Amazon Kinesis Analytics 应用程序:copy1、copy2

Amazon Kinesis Analytics SQL 应用程序如下所示:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

此代码创建一个 pump(将其视为一个连续的 select 语句),它从 input 流中进行选择并输出到 output1 流。我创建了另一个输出到output2 流的相同应用程序。

为了测试,我将数据发送到input 流:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data=
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

我让它运行了一会儿,然后使用这段代码显示输出:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

输出是:

[u'"LOG":"Record 0"', u'"LOG":"Record 1"', u'"LOG":"Record 2"', u'"LOG":"Record 3"', u'"LOG":"Record 4"']

我再次运行 output2 并显示相同的输出。

选项 2:使用 AWS Lambda

如果您要扇出多个流,更有效的方法可能是创建 AWS Lambda 函数:

由 Amazon Kinesis 流记录触发 将记录写入到多个 Amazon Kinesis“输出”流

您甚至可以让 Lambda 函数根据命名约定自行发现输出流(例如,任何名为 app-output-* 的流)。

【讨论】:

您也可以使用相同的 Kinesis Analytics 应用程序并向其添加两个输出流 :)【参考方案2】:

Amazon 实验室有一个 github 存储库,提供了使用 lambda 的扇出。 https://github.com/awslabs/aws-lambda-fanout 。另请阅读 https://medium.com/retailmenot-engineering/building-a-high-throughput-data-pipeline-with-kinesis-lambda-and-dynamodb-7d78e992a02d 上的“将同步 Lambda 调用转换为异步调用”,这对于构建真正的异步处理至关重要。

【讨论】:

【参考方案3】:

有两种 AWS 原生解决方案可以扇出不需要 AWS Firehose 或 AWS Lambda 的 Kinesis 流。

    与 Kafka 消费者组类似,Kinesis 具有应用程序名称。流的每个使用者都可以提供唯一的应用程序名称。如果两个消费者具有相同的应用程序名称,则在它们之间分发消息。要扇出流,请为您希望从流中接收相同消息的消费者提供不同的应用程序名称。 Kinesis 将在后台创建新的 DynamoDB 表,以跟踪每个新应用程序的每个使用者,以便他们可以以不同的速率使用消息,等等。 使用Kinesis Enhanced Fan-Out 获得更高的吞吐量(高达每秒 2MiB),这不计入您的全局读取限制。在撰写本文时,每个流的“增强型扇出”消费者限制为 20 个。

据我所知,这两个选项的一个警告是您需要使用Kinesis Client Library (KCL)(而不是原始的AWS SDK)。

【讨论】:

以上是关于如何扇出 AWS 运动流?的主要内容,如果未能解决你的问题,请参考以下文章

如何在将记录异步放入运动流中时确保排序?

AWS RDS 到 AWS ES

I'AWS_PROXY' 目前仅支持 Lambda 函数和 Firehose 流调用

从 H.264 比特流中提取运动矢量 [关闭]

最小费用最大流解决KM匹配问题

来自 iOS 中摄像机实时流的运动检测