AWS Kinesis ShardIteratorType TRIM_HORIZON 的预期行为

Posted

技术标签:

【中文标题】AWS Kinesis ShardIteratorType TRIM_HORIZON 的预期行为【英文标题】:Expected behavior for AWS Kinesis ShardIteratorType TRIM_HORIZON 【发布时间】:2015-12-28 01:29:52 【问题描述】:

上下文:我不一定指的是基于 KCL 的应用程序,只是纯粹的 Kinesis API 调用。

使用TRIM_HORIZON 分片迭代器类型是否会立即为您提供流中最早发布的记录(即在 Kinesis 的内置 24 小时窗口内最早可用的记录),或者只是一段时间内的迭代器/游标24 小时前,您必须使用它沿着溪流前进,直到您达到最早发布的记录?

换一种说法,以防万一不是很清楚......

当使用TRIM_HORIZON 的分片迭代器类型时,预期的行为是它会从返回 24 小时前可用的记录开始,但如果零记录是在 24 小时前发布的,而不是仅在 3 小时前发布,您的应用程序需要在前 21 小时内迭代轮询,然后才能到达 3 小时前发布的记录?

时间线示例:

    9 月 29 日上午 5:00 - 创建一个包含 1 个分片的流“foo” 9 月 29 日上午 5:02 - 将单个记录“Item=A”发布到“foo”流 9 月 29 日上午 5:03 - 发出 GetShardIterator 调用,将 TRIM_HORIZON 作为您的分片迭代器类型,然后使用该分片迭代器发出 GetRecords 调用并接收记录“Item=A” 9 月 30 日上午 7:02 - 将第二条记录“Item=B”发布到“foo”流 9 月 30 日上午 7:03 - 发出 GetShardIterator 调用,将 TRIM_HORIZON 作为您的分片迭代器类型,然后使用该分片迭代器发出 GetRecords 调用。 此调用的结果应该是什么? (注意:我们没有记住/重复使用第 3 步中的分片迭代器)

对于上面的第 5 步,“Item=A”消息在流上发布已超过 24 小时,而“Item=B”发布仅一分钟。带有TRIM_HORIZON 的新分片迭代器会立即为您提供最早的可用记录,还是您需要继续迭代,直到您遇到某个已发布内容的时间段?

我一直在试验 Kinesis,昨天或两天前一切正常(即,我在发布和消费时没有任何问题)。我对我的代码进行了一些额外的修改,并于今天再次开始发布。当我启动我的消费者时,即使让它运行了几分钟,也没有任何东西出现。我尝试同时发布和消费,但仍然没有。在手动使用 AFTER_SEQUENCE_NUMBER 迭代器类型并使用几天前我的消费者日志中的一些序列号之后,我能够访问我最近发布的消息。但是,如果我重新使用 TRIM_HORIZON 类型,我根本看不到任何消息。

我查看了docs,但我发现的大多数文档都假设您使用的是 KCL(我实际上最初使用的是 KCL,但当它开始失败时,我会退回到原始 API 调用)并提到您必须具有应用程序名称并且 DynamoDB 表用于跟踪状态。如果您使用的是纯 Kinesis API 调用或 Kinesis CLI(我最终都尝试过),那么我可以说这不是真的。我终于编写了一个纯 API 脚本,从 TRIM_HORIZON 开始并无限轮询,最终达到了新的记录(大约需要 600 次迭代;在“现在”之后 14 小时开始,在“现在”之后大约 5 小时发现记录)。如果这是预期的行为,那么 wording in the docs 似乎有点令人困惑/误导:

TRIM_HORIZON - 从分片中最后一条未修剪的记录开始读取 在系统中,这是分片中最旧的数据记录。

我假设(现在看来不正确)术语“最旧的数据记录”是指我已发布到流中的记录,而不仅仅是流中的时间段。

如果有人可以帮助确认/解释我所看到的行为,那就太好了。

谢谢!

【问题讨论】:

【参考方案1】:

它位于 TRIM HORIZON,或流 TRIMming 发生的 HORIZON。

分片迭代器在调用时可能会得到 0 条记录,因此您需要不断迭代以到达最旧记录所在的区域(如果您不经常推送到流或有时间间隔)。 getRecords 将为您提供下一个可用于迭代的分片迭代器。

来自文档: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html

如果分片中没有可用的记录 迭代器指向,GetRecords 返回一个空列表。请注意,它 可能需要多次调用才能到达分片的一部分 包含记录。

【讨论】:

所以即使有新记录可用,GetRecords 返回 0 条记录也是正常/预期的行为?是什么决定了中断发生的位置? Kinesis 有一个 24 小时的窗口,但分片迭代器并不总是在 24 小时后开始。在我的测试中,它落后了 14 小时,但那里没有记录。那 14 小时的意义何在? 免责声明:我不知道 Kinesis 在内部是如何工作的。我正在根据文档和观察者的行为进行猜测。答:是的,你看到的行为是我也看到的。有时通过分片迭代器得到 0 条记录是正常的。我猜测 Kinesis 在内部保留了一个 shardIterator id 的映射来记录序列号,并且随着修剪水平的进展,它会回收这些序列号。还要猜测这取决于回收发生的时间以及它是以懒惰的方式完成的 还会猜测,当您查找序列号的内容时,它还使用 sharditerator id->sequence 映射来快速查找您的数据,然后遍历指向的记录分片 id 并找到您的序列。 感谢您确认看到类似行为。在使用 TRIM_HORIZON 类型时,您是否注意到 MillisBehindLatest 值的延迟有任何趋势/一致性?我很想知道这背后的细节。【参考方案2】:

TRIM_HORIZON 给出流中最旧的记录。

只是有时将 TRIM_HORIZON 作为 shard_iterator_type :-

 Suppose the value of "millis_behind_latest" in the kinesis response is ~86399000 & your stream retention period is 24 hours(86400000) 

当您使用 shard_iterator 检索记录时,由于已超过记录的保留期,该记录不再在流中。因此,您会得到一个空结果,因为最旧的记录已过期并且不再存在于数据流中。所以 shard_iterator 现在指向磁盘中的一个空白空间。

当这种情况发生时,取“next_shard_iterator”的值并使用 get_records 再次获取 kinesis 数据记录。

另外一件事是我们并不完全了解 AWS 如何管理数据流中的每个分片。如何删除数据并将其添加到其中。也许数据没有存储在并发/连续内存块中,因此我们在检索数据之间得到空结果。

继续获取“next_shard_iterator”的值并使用 get_records,直到“millis_behind_latest”的值为 0。

希望这个答案有帮助。 :)

【讨论】:

以上是关于AWS Kinesis ShardIteratorType TRIM_HORIZON 的预期行为的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming 集成 AWS Kinesis

带有 aws-kinesis 的 Spring Cloud Bus

将 AWS Lambda 数据推送到 Kinesis Stream

如何确定 AWS kinesis 流中的分区键总数?

使用 AWS Java DynamoDB 流 Kinesis 适配器处理 DynamoDB 流

php RAW Curl使用PHP发布到AWS Kinesis