Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错

Posted

技术标签:

【中文标题】Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错【英文标题】:Mongo change-Stream with Spring resumeAt vs startAfter and fault tolerance in case of connection loss 【发布时间】:2022-01-17 13:31:04 【问题描述】:

在 *** 和任何文档中都找不到答案, 我有以下更改流代码(收听数据库而不是特定集合)

Mongo 版本是 4.2

@Configuration
public class DatabaseChangeStreamListener 

//Constructor, fields etc...

    @PostConstruct
    public void initialize() 
        MessageListenerContainer container = new DefaultMessageListenerContainer(mongoTemplate, new SimpleAsyncTaskExecutor(), this::onException);
        ChangeStreamRequest.ChangeStreamRequestOptions options =
                new ChangeStreamRequest.ChangeStreamRequestOptions(mongoTemplate.getDb().getName(), null, buildChangeStreamOptions());
        container.register(new ChangeStreamRequest<>(this::onDatabaseChangedEvent, options), Document.class);
        container.start();
    

    private ChangeStreamOptions buildChangeStreamOptions() 
        return ChangeStreamOptions.builder()
                .returnFullDocumentOnUpdate()
                .filter(newAggregation(match(where(OPERATION_TYPE).in(INSERT.getValue(), UPDATE.getValue(), REPLACE.getValue(), DELETE.getValue()))))
                .resumeAt(Instant.now().minusSeconds(1))
                .build();
    
//more code

我希望流仅从系统启动时间开始侦听,而不在操作日志中预先获取任何内容,.resumeAt(Instant.now().minusSeconds(1)) 是否有效? 如果需要,我是否需要使用starAfter 方法我如何在数据库中找到最新的resumeToken? 还是开箱即用,我不需要添加任何简历/开始行?

第二个问题,我从不停止容器(它应该在应用程序运行时始终存在),如果与 mongoDB 断开连接并重新连接,当前配置中的侦听器会继续消费消息吗? (我很难模拟 DB 断开连接)

如果它不会恢复处理事件,我需要在配置中进行哪些更改,以便更改流将继续并在断开连接之前从最后收到的resumeToken 中获取所有事件? 我在媒体change stream in prodcution 上阅读了这篇很棒的文章, 但它直接使用光标,我想使用弹簧DefaultMessageListenerContainer,因为它更优雅。

【问题讨论】:

【参考方案1】:

所以我会回答我自己的问题(一些更愚蠢,一些更少:)...)问题:

    如果没有提供 resumeAt 时间戳,则更改流将从当前时间开始,并且不会绘制任何以前的事件。 resumeAfter 事件与时间戳的差异可以在这里找到:*** answer 但请记住,对于时间戳,它包含事件,所以如果你想从下一个事件(在 java 中)开始:
    private BsonTimestamp getNextEventTimestamp(BsonTimestamp timestamp) 
        return new BsonTimestamp(timestamp.getValue() + 1);
    
    如果互联网断开,更改流将不会恢复, 因此,我建议在出现错误时采取以下方法:
    private void onException() 
        ScheduledExecutorService executorService = newSingleThreadScheduledExecutor();
        executorService.scheduleAtFixedRate(() -> recreateChangeStream(executorService), 0, 1, TimeUnit.SECONDS);
    

    private void recreateChangeStream(ScheduledExecutorService executorService) 
        try 
            mongoTemplate.getDb().runCommand(new BasicDBObject("ping", "1"));
            container.stop();
            startNewContainer();
            executorService.shutdown();
         catch (Exception ignored) 
        
    

首先,我正在创建一个始终运行的可运行计划任务(但一次只能运行 1 个 newSingleThreadScheduledExecutor()),我正在尝试 ping 数据库,在成功 ping 之后,我正在停止旧容器并启动一个新容器,您还可以传递您使用的最后一个时间戳,以便获取您可能错过的所有事件

从事件中检索时间戳:

BsonTimestamp resumeAtTimestamp = changeStreamDocument.getClusterTime();

然后我将关闭任务。

还要确保 resumeAtTimestamp 存在于 oplog...

【讨论】:

以上是关于Mongo change-Stream with Spring resumeAt vs startAfter 以及连接丢失时的容错的主要内容,如果未能解决你的问题,请参考以下文章

Find or Query Data with the mongo Shell

解决:Command ‘mongo‘ not found, but can be installed with

Spring with mongo-java-driver 使用 Azure Cosmos DB 作为 MongoDB

#yyds干货盘点# 解决:Command ‘mongo‘ not found, but can be installed with

Mongodb $ dayOfMonth with timezone

Docker 问题集锦(20) - 解决:Command ‘mongo‘ not found, but can be installed with