Spring Cloud Stream GCP如何重新排队失败的消息
Posted
技术标签:
【中文标题】Spring Cloud Stream GCP如何重新排队失败的消息【英文标题】:Spring Cloud Stream GCP How to do Re-queue Failed Messages 【发布时间】:2019-07-25 12:40:50 【问题描述】:根据文档here,当前支持的绑定器(Rabbit 和 Kafka)依赖于 RetryTemplate。对于 GCP?
我的项目详情
Spring Boot 版本 2.1.3.RELEASE
依赖pom.xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
<version>1.1.0.RELEASE</version>
</dependency>
application.properties
spring.cloud.stream.bindings.input.destination=inputtopic
spring.cloud.stream.bindings.output.destination=outputtopic
spring.cloud.gcp.project-id=testinggcp
spring.cloud.gcp.credentials.location=file:C:/Users/my_gcp_credentials.json
休息控制器
@EnableBinding(Source.class,Sink.class)
@RestController
public class SourceExample
@Autowired
private Source source;
@GetMapping("/newMessage")
public UserMessage sendMessage(@RequestParam("messageBody") String messageBody,
@RequestParam("username") String username)
UserMessage userMessage = new UserMessage(messageBody, username, LocalDateTime.now());
this.source.output().send(new GenericMessage<>(userMessage));
return userMessage;
@StreamListener(target = Sink.INPUT)
public void handle(UserMessage userMessage) throws IOException
System.out.println(userMessage);
【问题讨论】:
【参考方案1】:不,Spring Cloud GCP Pub/Sub Binder 不提供任何重试挂钩。
在@ServiceActivator
上使用RequestHandlerRetryAdvice
代替@StreamListener
很容易。因此,您在 POJO 方法中的所有失败都将根据您的配置进行重试。 RequestHandlerRetryAdvice
有一个 RecoveryCallback
选项,这可能只是简单的 ErrorMessageSendingRecoverer
,您可以在其中配置一些错误处理并将错误消息发送到 GCP Pub/Sub 上的某些死信主题。
在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#retry-advice
还有一点关于Advising Endpoints Using Annotations
.
【讨论】:
我真的很困惑 ServiceActivator 不起作用。如果抛出异常,AFAIK GCP 具有 pubSubAcknowledgementExecutor,它会自动调用句柄函数(Sink.INPUT)。意思是说,Re-queue Failed Messages 的实现就没有必要了? 好吧,在那种情况下,我是这么认为的。如果消息已发送到侦听器,则 ack 是自动的。 以及当生产者在this.source.output()这一行抛出异常时如何重试消息以上是关于Spring Cloud Stream GCP如何重新排队失败的消息的主要内容,如果未能解决你的问题,请参考以下文章
使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置
spring-cloud-gcp-starter-bigquery 从属性文件中忽略 spring.cloud.gcp.credentials.location
spring-cloud 和 spring-cloud-gcp 是不是有通用 BOM 文件?
Spring boot 和 GCP - 使用 spring-cloud-gcp-starter-sql-postgresql 连接 Cloud SQL 实例尝试 SSL 并且延迟启动