Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错
Posted
技术标签:
【中文标题】Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错【英文标题】:Mongo change-Stream with Spring resumeAt vs startAfter and fault tolerance in case of connection loss 【发布时间】:2022-01-17 13:31:04 【问题描述】:在 *** 和任何文档中都找不到答案, 我有以下更改流代码(收听数据库而不是特定集合)
Mongo 版本是 4.2
@Configuration
public class DatabaseChangeStreamListener
//Constructor, fields etc...
@PostConstruct
public void initialize()
MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
ChangeStreamRequest.ChangeStreamRequestOptions options =
new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
container.start();
private ChangeStreamOptions buildChangeStreamOptions()
return ChangeStreamOptions.builder()
.returnFullDocumentOnUpdate()
.filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
.resumeAt(Instant.now().minusSeconds(1))
.build();
//more code
我希望流仅从系统启动时间开始侦听,而不在操作日志中预先获取任何内容,.resumeAt(Instant.now().minusSeconds(1))
是否有效?
如果需要,我是否需要使用starAfter
方法我如何在数据库中找到最新的resumeToken
?
还是开箱即用,我不需要添加任何简历/开始行?
第二个问题,我从不停止容器(它应该在应用程序运行时始终存在),如果与 mongoDB 断开连接并重新连接,当前配置中的侦听器会继续消费消息吗? (我很难模拟 DB 断开连接)
如果它不会恢复处理事件,我需要在配置中进行哪些更改,以便更改流将继续并在断开连接之前从最后收到的resumeToken
中获取所有事件?
我在媒体change stream in prodcution 上阅读了这篇很棒的文章,
但它直接使用光标,我想使用弹簧DefaultMessageListenerContainer
,因为它更优雅。
【问题讨论】:
【参考方案1】:所以我会回答我自己的问题(一些更愚蠢,一些更少:)...)问题:
-
如果没有提供 resumeAt 时间戳,则更改流将从当前时间开始,并且不会绘制任何以前的事件。
resumeAfter 事件与时间戳的差异可以在这里找到:*** answer
但请记住,对于时间戳,它包含事件,所以如果你想从下一个事件(在 java 中)开始:
private BsonTimestamp getNextEventTimestamp(BsonTimestamp timestamp)
return new BsonTimestamp(timestamp.getValue() + 1);
-
如果互联网断开,更改流将不会恢复,
因此,我建议在出现错误时采取以下方法:
private void onException()
ScheduledExecutorService executorService = newSingleThreadScheduledExecutor();
executorService.scheduleAtFixedRate(() -> recreateChangeStream(executorService), 0, 1, TimeUnit.SECONDS);
private void recreateChangeStream(ScheduledExecutorService executorService)
try
mongoTemplate.getDb().runCommand(new BasicDBObject("ping", "1"));
container.stop();
startNewContainer();
executorService.shutdown();
catch (Exception ignored)
首先,我正在创建一个始终运行的可运行计划任务(但一次只能运行 1 个 newSingleThreadScheduledExecutor()
),我正在尝试 ping 数据库,在成功 ping 之后,我正在停止旧容器并启动一个新容器,您还可以传递您使用的最后一个时间戳,以便获取您可能错过的所有事件
从事件中检索时间戳:
BsonTimestamp resumeAtTimestamp = changeStreamDocument.getClusterTime();
然后我将关闭任务。
还要确保 resumeAtTimestamp
存在于 oplog...
【讨论】:
以上是关于Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错的主要内容,如果未能解决你的问题,请参考以下文章
Find or Query Data with the mongo Shell
解决:Command ‘mongo‘ not found, but can be installed with
Spring with mongo-java-driver 使用 Azure Cosmos DB 作为 MongoDB
#yyds干货盘点# 解决:Command ‘mongo‘ not found, but can be installed with
Mongodb $ dayOfMonth with timezone
Docker 问题集锦(20) - 解决:Command ‘mongo‘ not found, but can be installed with