解释 Kinesis 分片迭代器 - AWS Java 开发工具包
Posted
技术标签:
【中文标题】解释 Kinesis 分片迭代器 - AWS Java 开发工具包【英文标题】:Explain Kinesis Shard Iterator - AWS Java SDK 【发布时间】:2014-11-11 12:16:27 【问题描述】:好的,我将从一个详细的用例开始并解释我的问题:
-
我使用第 3 方网络分析平台,该平台利用 AWS Kinesis 流将数据从客户端传递到最终目的地 - Kinesis 流;
网络分析平台使用 2 个流:
-
数据收集器流(单个分片流);
第二个流,用于丰富来自收集器流(单个分片流)的原始数据;最重要的是,此流使用
TRIM_HORIZON
迭代器类型使用来自第一个流的原始数据;
GetShardIteratorRequest
类;
我目前正在开发提取类,所以这是同步完成的,这意味着我只在编译我的类时才使用数据;
该类的效果出人意料,尽管有些事情我无法理解,特别是关于如何从流中使用数据以及每种迭代器类型的含义;
我的问题是我检索到的数据不一致,里面没有时间逻辑。
当我使用 AT_SEQUENCE_NUMBER
并使用
.getSequenceNumberRange().getStartingSequenceNumber();
...作为``,我没有得到所有记录。同样,AFTER_SEQUENCE_NUMBER
;
LATEST
时,我得到的结果为零;
当我使用TRIM_HORIZON
时,它应该是有意义的,但它似乎不能正常工作。它曾经为我提供数据,然后我添加了新的“事件”(记录到最终流中)并且我收到了零记录。神秘。
我的问题是:
-
如何安全地使用流中的数据,而不必担心丢失记录?
是否有替代
ShardIteratorRequest
的方法?
如果有,我如何才能“浏览”流并查看其中的内容以供调试参考?
TRIM_HORIZON
方法缺少什么?
提前致谢,我真的很想了解更多有关 Kinesis 流中数据消耗的信息。
【问题讨论】:
我也有类似的问题 - 尽管对我来说,我在每次迭代中都会得到重复的记录(同时使用 AT_SEQUENCE_NUMBER 和 FROM_SEQUENCE_NUMBER),尽管每次响应都使用 NextShardIterator 值。文档在这个问题上有些神秘......我也很想知道“未修剪”是什么意思(w.r.t TRIM_HORIZON)。 为了记录,我在此期间做了一些不同的事情——我使用了一个现有的 Scala 消费者,它不断地监听流,然后为了我的目的将它移植回纯 Java。这是 Scala 应用程序,最初由 SnowPlow github.com/snowplow/kinesis-example-scala-consumer 开发 可悲的是,我对 java 不友好.....!我只是希望有一个与语言无关的明确指南,说明如何确保幂等性和 100% 的记录“覆盖”,同时允许消费者重启、崩溃等。如果我们必须保存并检查 SequenceNumber,这似乎否定了 Kinesis 的目的所有先前提取的记录,以确保没有重复。我确定我错过了一些东西....... 您尝试过亚马逊自己的库吗? github.com/awslabs/amazon-kinesis-connectors github.com/awslabs/amazon-kinesis-client 这些库(尤其是连接器)处理所有繁琐的事情,例如查明检查点、继续处理分片等。 我在使用没有 KCL 的 JSON api 时遇到了类似的问题。我想获取最后一条记录作为检查点。 LATEST 给了我一个空数组。 TRIM_HORIZON 目前给了我 8 条记录。我可以遍历所有记录(可能是数千个)以获得最后一个,但这似乎很荒谬。 latest 应该如何工作?无论 KCL 在做什么,它都应该使用相同的 API,说“使用 KCL”并不能回答问题,它的检查点应该只基于这个 API 和存储的结果。 【参考方案1】:我理解上面的困惑,我也遇到了同样的问题,但我想我现在已经弄清楚了。请注意,我直接使用 JSON API 而不使用 KCL。
我似乎 API 在客户端开始消费流时为他们提供了 2 种基本的迭代器选择:
A) TRIM_HORIZON:用于读取延迟数分钟(甚至数小时)到 24 小时之间的过去记录。它不会返回最近放置的记录。在此迭代器看到的最后一条记录上使用 AFTER_SEQUENCE_NUMBER 会返回一个空数组,即使记录最近已被 PUT。
B) LATEST:用于实时读取 FUTURE 记录(在 PUT 之后立即)。我被我在这个“在分片中最近的记录之后开始阅读,以便您始终阅读分片中的最新数据”中找到的唯一文档中的一句话欺骗了。你得到一个空数组,因为自从获得迭代器后没有记录被 PUT。如果您获得这种类型的迭代器,然后 PUT 一条记录,该记录将立即可用。
最后,如果您知道最近放置的记录的序列 ID,则可以使用 AT_SEQUENCE_NUMBER 立即获取它,并且可以使用 AFTER_SEQUENCE_NUMBER 获取以后的记录,即使它们不会出现在 TRIM_HORIZON 迭代器中。
上面确实意味着,如果你想实时读取所有已知的过去记录和未来记录,你必须使用 A 和 B 的组合,并用逻辑来处理介于两者之间的记录(最近的过去)。 KCL 可能会很好地解决这个问题。
【讨论】:
AWS 无法创建像样的 API 来拯救自己。对于我的下一个项目,我将迁移到 Google Cloud。情况再糟糕不过了。以上是关于解释 Kinesis 分片迭代器 - AWS Java 开发工具包的主要内容,如果未能解决你的问题,请参考以下文章