spring cloud function 函数接口返回成功/失败处理

Posted

技术标签:

【中文标题】spring cloud function 函数接口返回成功/失败处理【英文标题】:Spring cloud function Function interface return success/failure handling 【发布时间】:2021-12-21 08:19:52 【问题描述】:

我目前有一个spring cloud stream应用,有一个监听函数,主要监听某个topic,依次执行如下:

    使用来自主题的消息 在数据库中存储消费消息 调用外部服务获取一些信息 处理数据 在数据库中记录结果 将消息发送到另一个主题 确认消息(我已将确认模式设置为手动)

我们决定迁移到 Spring Cloud 功能,我已经能够使用Function 接口完成上述几乎所有步骤,以源主题为输入,接收器主题为输出。

@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() 
    return message -> 
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        return MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
    

我的问题与第 7 步(确认消息)中的异常处理有关。只有当我们确定消息已成功发送到 sink 队列时,我们才会确认该消息,否则我们不会确认该消息。

我的问题是,这样的事情怎么能在Spring cloud function中实现,特别是send方法完全依赖于Spring Framework(作为函数接口实现评估的结果)。

之前,我们可以通过 try/catch 来做到这一点

@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) 
    try 
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message);
        
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
    catch (Exception exception)
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    

Function接口成功返回后是否有监听器触发,类似KafkaSendCallback但不指定模板

【问题讨论】:

请注意,上面的摘录是简化的,旨在描述原始代码跨越多个类的概念 【参考方案1】:

Spring Cloud Stream 不知道函数。它与之前的消息处理程序相同,因此与之前使用的回调方法相同的方法可以用于函数。所以也许你可以分享一些可以澄清你的意思的代码?我也不明白你的意思 ..send 方法完全依赖于 Spring Framework..

【讨论】:

感谢您的及时回复!我已经包含了我的 spring 功能代码示例。我的意思是“方法完全依赖于 Spring 框架”是指 Spring 框架完全处理消息路由和发送过程,然后我自己会通过 KafkaTemplate 执行此操作,因此如果发送操作成功或可以得到回调失败【参考方案2】:

在上述 Oleg 的基础上,如果您想严格恢复 StreamListener 代码中的行为,您可以尝试以下方法。您可以切换到消费者,然后像以前一样使用KafkaTemplate 进行出站发送,而不是使用函数。

@Bean
public Consumer<Message<NotificationMessage>> validatedProducts() 
return message -> 
  try
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback. 
       //only ack if the data was sent successfully. 
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        
  
  catch (Exception exception)
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    
  ;


另一件值得研究的事情是使用 Kafka 事务,在这种情况下,如果它不能端到端工作,则不会发生确认。 Spring Cloud Stream binder 基于 Spring for Apache Kafka 的基础对此提供了支持。更多详情here。 Here 是 Spring Cloud Stream 文档。

【讨论】:

感谢您的及时回复!虽然这肯定是一个选项,但我们希望使用 Function 接口,因为它更适合用例,并希望有一种方法能够做到这一点,而无需将其作为消费者处理。交易听起来不错,我需要先浏览您分享的文档。【参考方案3】:

好吧,所以我选择的实际上是不使用 KafkaTemplate(或 streamBridge)。虽然这是一个可行的解决方案,但这意味着我的 Function 将被拆分为 Consumer 和某种临时提供的(在本例中为 KafkaTemplate)。

由于我想坚持功能接口的设计目标,我已经在 ProducerListener 接口实现中隔离了数据库更新的行为

@Configuration
public class ProducerListenerConfiguration 
    private final MongoTemplate mongoTemplate;

    public ProducerListenerConfiguration(MongoTemplate mongoTemplate) 
        this.mongoTemplate = mongoTemplate;
    

    @Bean
    public ProducerListener myProducerListener() 
        return new ProducerListener() 
            @SneakyThrows
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) 
                final ValidatedEvent event = new ObjectMapper().readerFor(ValidatedEvent.class).readValue((byte[]) producerRecord.value());
                final var updateResult = updateDocumentProcessedState(event.getKey(), event.getPayload().getVersion(), true);
            

            @SneakyThrows
            @Override
            public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) 
                ProducerListener.super.onError(producerRecord, recordMetadata, exception);
            
        ;
    

    public UpdateResult updateDocumentProcessedState(String id, long version, boolean isProcessed) 
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        Update update = new Update();
        update.set("processed", isProcessed);
        update.set("version", version);
        return mongoTemplate.updateFirst(query, update, ProductChangedEntity.class);
    

然后每次成功尝试,数据库都会更新处理结果和更新的版本号。

【讨论】:

以上是关于spring cloud function 函数接口返回成功/失败处理的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Google Cloud Function 上的 Spring Cloud 函数中获取 Pub/Sub 事件的元数据

Spring Cloud(14)——Function

从 Spring Cloud Function 访问 AWS Lambda 上下文

一个项目中的多个 Spring Cloud Functions 用于在 AWS Lambda 上部署

spring cloud组件思维导图

Spring-Cloud-Function-Spel 漏洞复现