在使用 Flux 时顺序调用非阻塞操作,包括重试

Posted

技术标签:

【中文标题】在使用 Flux 时顺序调用非阻塞操作,包括重试【英文标题】:Invoking non-blocking operations sequentially while consuming from a Flux including retries 【发布时间】:2019-06-05 04:19:09 【问题描述】:

所以我的用例是在 Spring Webflux 应用程序中使用来自 Kafka 的消息,同时使用 Project Reactor 以反应式编程,并以与接收消息相同的顺序对每条消息执行非阻塞操作卡夫卡。系统也应该能够自行恢复。

这是设置为使用的代码 sn-p:

    Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> 
        KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
        return receiver.receive();
    );

    messages.map(this::transformToOutputFormat)
            .map(this::performAction)
            .flatMapSequential(receiverRecordMono -> receiverRecordMono)
            .doOnNext(record -> record.receiverOffset().acknowledge())
            .doOnError(error -> logger.error("Error receiving record", error))
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .subscribe();

如您所见,我所做的是:从 Kafka 获取消息,将其转换为用于新目的地的对象,然后将其发送到目的地,然后确认偏移量以将消息标记为已使用并已处理.以与从 Kafka 消费的消息相同的顺序确认偏移量至关重要,这样我们就不会将偏移量移动到未完全处理的消息之外(包括将一些数据发送到目的地)。因此,我使用flatMapSequential 来确保这一点。

为简单起见,我们假设transformToOutputFormat() 方法是一个恒等变换。

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) 
    return record;

performAction() 方法需要通过网络执行某些操作,例如调用 HTTP REST API。所以适当的 API 会返回一个 Mono,这意味着需要订阅该链。此外,我需要通过此方法返回 ReceiverRecord,以便可以在上面的 flatMapSequential() 运算符中确认偏移量。因为我需要订阅 Mono,所以我在上面使用 flatMapSequential。如果没有,我可以改用map

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) 
    return Mono.just(record)
            .flatMap(receiverRecord ->
                    HttpClient.create()
                            .port(3000)
                            .get()
                            .uri("/makeCall?data=" + receiverRecord.value().getData())
                            .responseContent()
                            .aggregate()
                            .asString()
            )
            .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
            .then(Mono.just(record));

我在这个方法中有两个相互冲突的需求: 1. 订阅发起 HTTP 调用的链 2.返回ReceiverRecord

使用 flatMap() 意味着我的返回类型更改为 Mono。在同一个地方使用 doOnNext() 将保留链中的 ReceiverRecord,但不允许自动订阅 HttpClient 响应。

我无法在asString() 之后添加.subscribe(),因为我想等到完全接收到 HTTP 响应后才能确认偏移量。

我也不能使用.block(),因为它在并行线程上运行。

因此,我需要作弊并从方法范围返回record对象。

另一件事是在performAction 内重试时,它会切换线程。由于 flatMapSequential() 急切地订阅外部流中的每个 Mono,这意味着虽然可以保证按顺序确认偏移量,但我们不能保证 performAction 中的 HTTP 调用将以相同的顺序执行。

所以我有两个问题。

    是否可以以自然的方式返回record 而不是返回方法范围对象? 是否可以确保 HTTP 调用和偏移确认都以与发生这些操作的消息相同的顺序执行?

【问题讨论】:

【参考方案1】:

这是我想出的解决方案。

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> 
    KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
    return receiver.receive();
);

messages.map(this::transformToOutputFormat)
        .delayUntil(this::performAction)
        .doOnNext(record -> record.receiverOffset().acknowledge())
        .doOnError(error -> logger.error("Error receiving record", error))
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
        .subscribe();

我没有使用 flatMapSequential 订阅 performAction Mono 并保留序列,而是将来自 Kafka 接收器的对更多消息的请求延迟到执行操作为止。这样就可以进行我需要的一次处理。

因此,performAction 不需要返回 ReceiverRecord 的 Mono。我还将其简化为以下内容:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) 
    HttpClient.create()
        .port(3000)
        .get()
        .uri("/makeCall?data=" + receiverRecord.value().getData())
        .responseContent()
        .aggregate()
        .asString()
        .retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));

【讨论】:

以上是关于在使用 Flux 时顺序调用非阻塞操作,包括重试的主要内容,如果未能解决你的问题,请参考以下文章

java同异步请求和阻塞非阻塞的区别

RocketMQ的消息重试

从 Mono 的列表中创建 Flux 的正确方法

springboot rabbitmq 非阻塞重试机制实现

springboot rabbitmq 非阻塞重试机制实现

springboot rabbitmq 非阻塞重试机制实现