spring cloud function 函数接口返回成功/失败处理
Posted
技术标签:
【中文标题】spring cloud function 函数接口返回成功/失败处理【英文标题】:Spring cloud function Function interface return success/failure handling 【发布时间】:2021-12-21 08:19:52 【问题描述】:我目前有一个spring cloud stream应用,有一个监听函数,主要监听某个topic,依次执行如下:
-
使用来自主题的消息
在数据库中存储消费消息
调用外部服务获取一些信息
处理数据
在数据库中记录结果
将消息发送到另一个主题
确认消息(我已将确认模式设置为手动)
我们决定迁移到 Spring Cloud 功能,我已经能够使用Function
接口完成上述几乎所有步骤,以源主题为输入,接收器主题为输出。
@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts()
return message ->
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
return MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
我的问题与第 7 步(确认消息)中的异常处理有关。只有当我们确定消息已成功发送到 sink 队列时,我们才会确认该消息,否则我们不会确认该消息。
我的问题是,这样的事情怎么能在Spring cloud function中实现,特别是send方法完全依赖于Spring Framework(作为函数接口实现评估的结果)。
之前,我们可以通过 try/catch 来做到这一点
@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message)
try
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
catch (Exception exception)
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
Function接口成功返回后是否有监听器触发,类似KafkaSendCallback
但不指定模板
【问题讨论】:
请注意,上面的摘录是简化的,旨在描述原始代码跨越多个类的概念 【参考方案1】:Spring Cloud Stream 不知道函数。它与之前的消息处理程序相同,因此与之前使用的回调方法相同的方法可以用于函数。所以也许你可以分享一些可以澄清你的意思的代码?我也不明白你的意思 ..send 方法完全依赖于 Spring Framework..
【讨论】:
感谢您的及时回复!我已经包含了我的 spring 功能代码示例。我的意思是“方法完全依赖于 Spring 框架”是指 Spring 框架完全处理消息路由和发送过程,然后我自己会通过 KafkaTemplate 执行此操作,因此如果发送操作成功或可以得到回调失败【参考方案2】:在上述 Oleg 的基础上,如果您想严格恢复 StreamListener
代码中的行为,您可以尝试以下方法。您可以切换到消费者,然后像以前一样使用KafkaTemplate
进行出站发送,而不是使用函数。
@Bean
public Consumer<Message<NotificationMessage>> validatedProducts()
return message ->
try
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback.
//only ack if the data was sent successfully.
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
catch (Exception exception)
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
;
另一件值得研究的事情是使用 Kafka 事务,在这种情况下,如果它不能端到端工作,则不会发生确认。 Spring Cloud Stream binder 基于 Spring for Apache Kafka 的基础对此提供了支持。更多详情here。 Here 是 Spring Cloud Stream 文档。
【讨论】:
感谢您的及时回复!虽然这肯定是一个选项,但我们希望使用 Function 接口,因为它更适合用例,并希望有一种方法能够做到这一点,而无需将其作为消费者处理。交易听起来不错,我需要先浏览您分享的文档。【参考方案3】:好吧,所以我选择的实际上是不使用 KafkaTemplate(或 streamBridge)。虽然这是一个可行的解决方案,但这意味着我的 Function 将被拆分为 Consumer 和某种临时提供的(在本例中为 KafkaTemplate)。
由于我想坚持功能接口的设计目标,我已经在 ProducerListener 接口实现中隔离了数据库更新的行为
@Configuration
public class ProducerListenerConfiguration
private final MongoTemplate mongoTemplate;
public ProducerListenerConfiguration(MongoTemplate mongoTemplate)
this.mongoTemplate = mongoTemplate;
@Bean
public ProducerListener myProducerListener()
return new ProducerListener()
@SneakyThrows
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata)
final ValidatedEvent event = new ObjectMapper().readerFor(ValidatedEvent.class).readValue((byte[]) producerRecord.value());
final var updateResult = updateDocumentProcessedState(event.getKey(), event.getPayload().getVersion(), true);
@SneakyThrows
@Override
public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception)
ProducerListener.super.onError(producerRecord, recordMetadata, exception);
;
public UpdateResult updateDocumentProcessedState(String id, long version, boolean isProcessed)
Query query = new Query();
query.addCriteria(Criteria.where("_id").is(id));
Update update = new Update();
update.set("processed", isProcessed);
update.set("version", version);
return mongoTemplate.updateFirst(query, update, ProductChangedEntity.class);
然后每次成功尝试,数据库都会更新处理结果和更新的版本号。
【讨论】:
以上是关于spring cloud function 函数接口返回成功/失败处理的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Google Cloud Function 上的 Spring Cloud 函数中获取 Pub/Sub 事件的元数据
从 Spring Cloud Function 访问 AWS Lambda 上下文