将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?

Posted

技术标签:

【中文标题】将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?【英文标题】:How to return response to webflux endpoint after pushing message into google cloud pubsub? 【发布时间】:2020-08-16 18:07:02 【问题描述】:

我正在使用 Spring 集成创建一个简单的 Spring Boot 应用程序。下面是这个应用程序的三个主要结构:

    入站网关:接受 http 请求的 WebFluxInboundEndpoint 出站网关:PubSubMessageHandler 将消息推送到谷歌云 pubsub 主题 消息通道:FluxMessageChannel 充当请求通道

谷歌云 PubSubMessageHandler 提供失败和成功回调,因为错误/成功响应没有返回到 webflux 端点并且请求无限期等待。

问:收到pubsub的响应后如何返回成功/失败响应?

应用程序的工作副本可在此处获得:https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

要运行应用程序,请将您的谷歌云服务密钥放入 serviceAccountKey.json 文件中并 然后提供环境变量 GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json

示例请求:curl -d "name=piyush" http://localhost:8080/createPerson

以下是接受上述请求并转换为spring消息后的示例文件,它推送到pubsub主题“人”

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication 

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) 
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) 
        return new JacksonPubSubMessageConverter(objectMapper);
    

    @Bean
    public MessageChannel pubSubOutputChannel() 
        return MessageChannels.flux().get();
    

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) 
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setPublishCallback(new ListenableFutureCallback<>() 
            @Override
            public void onFailure(Throwable ex) 
                LOGGER.info("There was an error sending the message.");
            

            @Override
            public void onSuccess(String result) 
                LOGGER.info("Message was sent successfully.");
            
        );

        return handler;
    

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() 

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    

build.gradle 依赖项是:

plugins 
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'


group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations 
    compileOnly 
        extendsFrom annotationProcessor
    


repositories 
    mavenCentral()


ext 
    set('springCloudVersion', "Hoxton.SR4")


dependencies 
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-webflux'
    implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
    testImplementation('org.springframework.boot:spring-boot-starter-test') 
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    


dependencyManagement 
    imports 
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:$springCloudVersion"
    


test 
    useJUnitPlatform()

将 PubSubMessageHandler 设置为同步并添加 ExpressionEvaluatingRequestHandlerAdvice 后的新应用程序文件,但这会在 MessagingGatewaySupport 创建 Correlator 时给出错误“'beanFactory' must not be null”。

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication 

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) 
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) 
        return new JacksonPubSubMessageConverter(objectMapper);
    

    @Bean
    public MessageChannel pubSubOutputChannel() 
        return MessageChannels.flux().get();
    

    @Bean
    public MessageChannel replyChannel() 
        return MessageChannels.flux().get();
    

    @Bean
    public MessageChannel errorChannel() 
        return MessageChannels.flux().get();
    

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "expressionAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) 
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() 

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());
        endpoint.setReplyChannel(replyChannel());
        endpoint.setErrorChannel(errorChannel());

        return endpoint;
    

    @Bean
    public Advice expressionAdvice() 
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannel(replyChannel());
        advice.setFailureChannel(errorChannel());
        return advice;
    

发送 http 请求后出现的错误堆栈跟踪:

2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1]  500 Server Error for HTTP POST "/createPerson"

java.lang.IllegalArgumentException: 'beanFactory' must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
        at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
        at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]

【问题讨论】:

【参考方案1】:

感谢@Artem。我通过提供自定义请求处理程序建议解决了这个问题,该建议在成功场景中从消息头中识别 replyChannel 并发送消息有效负载以响应 webflux 端点。

对于错误场景,我依赖 ReactiveStreamsConsumer 的错误处理机制,它在内部使用 errorChannel 将错误发送回 webflux 端点。

请告知这个实现是否正确。

下面是 PubSubRequestHandlerAdvice 的代码:

package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice 

  private final MessagingTemplate messagingTemplate = new MessagingTemplate();

  @Override
  protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) 

    Object result = callback.execute();

    Object evalResult = message.getPayload();
    MessageChannel successChannel = null;
    Object replyChannelHeader = message.getHeaders().getReplyChannel();
    if (replyChannelHeader instanceof MessageChannel) 
      successChannel = (MessageChannel) replyChannelHeader;
    

    if (evalResult != null && successChannel != null) 
      AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
      this.messagingTemplate.send(successChannel, resultMessage);
    
    return result;
  

将 PubSubRequestHandlerAdvice 用于 PubSubMessageHandler 的最终应用程序文件。

package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication 

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) 
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    

    /**
     * bean to deserialize request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) 
        return new JacksonPubSubMessageConverter(objectMapper);
    

    @Bean
    public MessageChannel pubSubOutputChannel() 
        return MessageChannels.flux().get();
    

    /**
     * Message handler which will consume messages from message channel.
     * Then it will send google cloud pubsub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "pubSubAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) 
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    

    /**
     * Webflux endpoint to consume http request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() 

        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();

        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);

        endpoint.setRequestChannel(pubSubOutputChannel());

        return endpoint;
    

    @Bean
    public Advice pubSubAdvice() 
        return new PubSubRequestHandlerAdvice();
    


应用程序的工作副本可在此处获得:https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample

【讨论】:

【参考方案2】:

PubSubMessageHandler 不是为请求/回复行为而设计的。 在大多数情况下,它被用作 send-n-forget。

由于您真的担心成功/失败回复,我只能建议如下:

    PubSubMessageHandler.setSync(true):

    /**
     * Set publish method to be synchronous or asynchronous.
     *
     * <p>Publish is asynchronous be default.
     * @param sync true for synchronous, false for asynchronous
     */
    public void setSync(boolean sync) 
    

这样,您的PubSubMessageHandler 将等待pubsubFuture.get();,如果失败,将抛出MessageHandlingException

    要处理此sync 场景的成功或失败,我建议查看ExpressionEvaluatingRequestHandlerAdvice 及其successChannelfailureChannel。 在onSuccessExpression 上我认为应该#root 指向requestMessageonFailureExpression 可以查询#exception SpEL 表达式变量,但仍将requestMessage 传播到failureChannel。我之所以谈论requestMessage,是因为它具有重要的replyChannel 来响应WebFluxInboundEndpoint 请求。 在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

    那些successChannelfailureChannel 以及失败的子流应该正确回复,一些返回将它们的outputChannel 留空。

但同时我完全同意将PubSubMessageHandler 设置为AbstractReplyProducingMessageHandler 返回一些ListenableFuture 让我们处理发布结果会容易得多。

【讨论】:

感谢@Artem 的建议。我已经创建了另外 2 个 FluxMessageChannel 作为 replyChannel() 和 errorChannel()。然后将这些通道附加到 WebFluxEndPoint 和 expressionAdvice。但是在发送 http 请求时,MessagingGatewaySupport 会创建 Correlator,但由于 beanFactory 为空而失败。在问题中附加了新的应用程序文件和堆栈跟踪以供参考。请指教,我是否正确配置了 expressionAdvice。 不,您不需要endpoint.setReplyChannel(replyChannel());,因此不需要专用通道bean。您只需要依赖自动创建的 replyChannel 标头,它是一个 TemporaryReplyChannel 实例。您的 ExpressionEvaluatingRequestHandlerAdvice 应该发送到其他不会回复任何内容的频道 - 基本上是提到的 replyChannel 标头。您也不能依赖默认的Message::getPayload,因为您需要来自请求消息的标头,这些标头将包含提到的replyChannel 正如我所说:现有的PubSubMessageHandler 行为将是一项艰巨的任务。您可以考虑将PublishSubscribeChannelPubSubMessageHandler 作为第一个订阅者,将其他内容作为success pub/sub 发布的第二个订阅者。 对于错误变体,我真的会留在您的endpoint.setErrorChannel(errorChannel()),但您仍然需要处理该错误并将合理的内容返回到相同的replyChannel 标头中。

以上是关于将消息推送到谷歌云发布订阅后如何返回对 webflux 端点的响应?的主要内容,如果未能解决你的问题,请参考以下文章

谷歌云发布订阅延迟消息

关于 GCM 中的规范 ID(谷歌云消息传递)

GCP - 谷歌云平台:有没有任何方法可以在没有任何版本控制系统的情况下将代码从 eclipse 推送到云端?

谷歌云消息“未注册”失败并取消订阅最佳做法?

博客搬家 - 记第四次搬家(hugo建站推送到谷歌云存储)

谷歌云构建不使用环境变量替换 Firebase 令牌