Spring Cloud Stream GCP如何重新排队失败的消息

Posted

技术标签:

【中文标题】Spring Cloud Stream GCP如何重新排队失败的消息【英文标题】:Spring Cloud Stream GCP How to do Re-queue Failed Messages 【发布时间】:2019-07-25 12:40:50 【问题描述】:

根据文档here,当前支持的绑定器(Rabbit 和 Kafka)依赖于 RetryTemplate。对于 GCP

我的项目详情

Spring Boot 版本 2.1.3.RELEASE

依赖pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

application.properties

spring.cloud.stream.bindings.input.destination=inputtopic
spring.cloud.stream.bindings.output.destination=outputtopic

spring.cloud.gcp.project-id=testinggcp
spring.cloud.gcp.credentials.location=file:C:/Users/my_gcp_credentials.json

休息控制器

@EnableBinding(Source.class,Sink.class)
@RestController
public class SourceExample 

    @Autowired
    private Source source;


    @GetMapping("/newMessage")
    public UserMessage sendMessage(@RequestParam("messageBody") String messageBody,
                                   @RequestParam("username") String username) 
        UserMessage userMessage = new UserMessage(messageBody, username, LocalDateTime.now());

        this.source.output().send(new GenericMessage<>(userMessage));
        return userMessage;
    


    @StreamListener(target = Sink.INPUT)
    public void handle(UserMessage userMessage) throws IOException 
        System.out.println(userMessage);
    



【问题讨论】:

【参考方案1】:

不,Spring Cloud GCP Pub/Sub Binder 不提供任何重试挂钩。

@ServiceActivator 上使用RequestHandlerRetryAdvice 代替@StreamListener 很容易。因此,您在 POJO 方法中的所有失败都将根据您的配置进行重试。 RequestHandlerRetryAdvice 有一个 RecoveryCallback 选项,这可能只是简单的 ErrorMessageSendingRecoverer,您可以在其中配置一些错误处理并将错误消息发送到 GCP Pub/Sub 上的某些死信主题。

在参考手册中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#retry-advice

还有一点关于Advising Endpoints Using Annotations.

【讨论】:

我真的很困惑 ServiceActivator 不起作用。如果抛出异常,AFAIK GCP 具有 pubSubAcknowledgementExecutor,它会自动调用句柄函数(Sink.INPUT)。意思是说,Re-queue Failed Messages 的实现就没有必要了? 好吧,在那种情况下,我是这么认为的。如果消息已发送到侦听器,则 ack 是自动的。 以及当生产者在this.source.output()这一行抛出异常时如何重试消息

以上是关于Spring Cloud Stream GCP如何重新排队失败的消息的主要内容,如果未能解决你的问题,请参考以下文章

使用 GCP pubsub 的 Spring Cloud Stream 消费者的并发设置

spring-cloud-gcp-starter-bigquery 从属性文件中忽略 spring.cloud.gcp.credentials.location

spring-cloud 和 spring-cloud-gcp 是不是有通用 BOM 文件?

Spring boot 和 GCP - 使用 spring-cloud-gcp-starter-sql-postgresql 连接 Cloud SQL 实例尝试 SSL 并且延迟启动

无法使用 spring cloud gcp starter 将 Spring Boot 与 Cloud SQL 连接

从现有的 GCP pubsub 订阅中消费