How to return response to webflux endpoint after pushing message into google cloud pubsub?
Posted: 2020-08-16 18:07:02
Question: I am creating a simple Spring Boot application using Spring Integration. Below are the three main constructs of this application:
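- Inbound gateway: WebFluxInboundEndpoint, which accepts HTTP requests
- Outbound gateway: PubSubMessageHandler, which pushes messages to a Google Cloud Pub/Sub topic
- Message channel: FluxMessageChannel, which acts as the request channel
The Google Cloud PubSubMessageHandler provides failure and success callbacks. Because of this, the error/success response is not returned to the webflux endpoint and the request waits indefinitely.
Ask: How can I return a success/failure response after receiving the response from Pub/Sub?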
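A working copy of the application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
To run the application, put your Google Cloud service key in the serviceAccountKey.json file and then provide the environment variable GOOGLE_APPLICATION_CREDENTIALS=/PATH_TO/serviceAccountKey.json
Sample request: curl -d "name=piyush" http://localhost:8080/createPerson
Below is the sample file, which accepts the above request and, after converting it into a Spring message, pushes it to the Pub/Sub topic "person":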
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Bean to deserialize the request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which consumes messages from the message channel
     * and sends them to the Google Cloud Pub/Sub topic.
     */
    @Bean
    @ServiceActivator(inputChannel = "pubSubOutputChannel")
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setPublishCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("There was an error sending the message.");
            }

            @Override
            public void onSuccess(String result) {
                LOGGER.info("Message was sent successfully.");
            }
        });
        return handler;
    }

    /**
     * Webflux endpoint to consume the HTTP request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannel(pubSubOutputChannel());
        return endpoint;
    }
}
The build.gradle dependencies are:
plugins {
    id 'org.springframework.boot' version '2.2.6.RELEASE'
    id 'io.spring.dependency-management' version '1.0.9.RELEASE'
    id 'java'
}

group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
}

repositories {
    mavenCentral()
}

ext {
    set('springCloudVersion', "Hoxton.SR4")
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-integration'
    implementation 'org.springframework.integration:spring-integration-webflux'
    implementation 'org.springframework.cloud:spring-cloud-gcp-starter-pubsub'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}

test {
    useJUnitPlatform()
}
Below is the new application file after setting PubSubMessageHandler to sync and adding ExpressionEvaluatingRequestHandlerAdvice. However, it fails with the error "'beanFactory' must not be null" when MessagingGatewaySupport creates the Correlator.
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Bean to deserialize the request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel replyChannel() {
        return MessageChannels.flux().get();
    }

    @Bean
    public MessageChannel errorChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which consumes messages from the message channel
     * and sends them to the Google Cloud Pub/Sub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "expressionAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    }

    /**
     * Webflux endpoint to consume the HTTP request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannel(pubSubOutputChannel());
        endpoint.setReplyChannel(replyChannel());
        endpoint.setErrorChannel(errorChannel());
        return endpoint;
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannel(replyChannel());
        advice.setFailureChannel(errorChannel());
        return advice;
    }
}
Error stack trace after sending the HTTP request:
2020-05-04 16:23:47.371 ERROR 59089 --- [ctor-http-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [fd79ecbb-1] 500 Server Error for HTTP POST "/createPerson"
java.lang.IllegalArgumentException: 'beanFactory' must not be null
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ HTTP POST "/createPerson" [ExceptionHandlingWebHandler]
Stack trace:
at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.channel.ChannelUtils.getErrorHandler(ChannelUtils.java:52) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.endpoint.ReactiveStreamsConsumer.onInit(ReactiveStreamsConsumer.java:126) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.context.IntegrationObjectSupport.afterPropertiesSet(IntegrationObjectSupport.java:214) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.registerReplyMessageCorrelatorIfNecessary(MessagingGatewaySupport.java:799) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessageReactive(MessagingGatewaySupport.java:602) ~[spring-integration-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
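Answer 1: Thanks @Artem. I resolved this by providing a custom request handler advice which, in the success scenario, identifies the replyChannel from the message headers and sends the message payload as the response to the webflux endpoint.
For the error scenario, I am relying on the error handling mechanism of ReactiveStreamsConsumer, which internally uses errorChannel to send the error back to the webflux endpoint.
Please advise whether this implementation is correct.
Below is the code for PubSubRequestHandlerAdvice: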
package com.example;

import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.message.AdviceMessage;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

public class PubSubRequestHandlerAdvice extends AbstractRequestHandlerAdvice {

    private final MessagingTemplate messagingTemplate = new MessagingTemplate();

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
        // Invoke the advised handler; an exception thrown here skips the reply below.
        Object result = callback.execute();

        Object evalResult = message.getPayload();
        MessageChannel successChannel = null;

        // Use the replyChannel header of the request message as the success channel.
        Object replyChannelHeader = message.getHeaders().getReplyChannel();
        if (replyChannelHeader instanceof MessageChannel) {
            successChannel = (MessageChannel) replyChannelHeader;
        }

        // Send the request payload back as the reply.
        if (evalResult != null && successChannel != null) {
            AdviceMessage<?> resultMessage = new AdviceMessage<>(evalResult, message);
            this.messagingTemplate.send(successChannel, resultMessage);
        }
        return result;
    }
}
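The idea behind this advice can be sketched without Spring: the request carries a reply channel in its headers, and the reply is sent only after the wrapped handler returns normally. The following is a minimal plain-Java model, not the Spring Integration API. The names Msg and invokeWithReply, and the use of a CompletableFuture as the reply channel, are assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Plain-Java model of an advice that replies via a header on the request message. */
final class ReplyHeaderAdviceSketch {

    /** Header name under which the (hypothetical) reply channel is stored. */
    static final String REPLY_CHANNEL = "replyChannel";

    /** Minimal stand-in for a message: a payload plus headers. */
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /**
     * Invokes the handler and, only if it returns normally, sends the request
     * payload to the reply channel found in the request headers. An exception
     * from the handler propagates to the caller and no reply is sent here.
     */
    static void invokeWithReply(Consumer<Msg> handler, Msg request) {
        handler.accept(request);
        Object replyChannel = request.headers.get(REPLY_CHANNEL);
        if (replyChannel instanceof CompletableFuture) {
            @SuppressWarnings("unchecked")
            CompletableFuture<Object> reply = (CompletableFuture<Object>) replyChannel;
            reply.complete(request.payload);
        }
    }
}
```

In this model a failing handler leaves the reply untouched, which mirrors why the error scenario needs its own path back to the caller.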
Below is the final application file, which uses PubSubRequestHandlerAdvice for the PubSubMessageHandler:
package com.example;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.aopalliance.aop.Advice;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.outbound.PubSubMessageHandler;
import org.springframework.cloud.gcp.pubsub.support.converter.JacksonPubSubMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.inbound.RequestMapping;
import org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Entry point into the sample application.
 *
 * @author Piyush Garg
 */
@SpringBootApplication
public class PubSubWebFluxApplication {

    private static final Log LOGGER = LogFactory.getLog(PubSubWebFluxApplication.class);

    private static final String TOPIC_NAME = "person";

    public static void main(String[] args) {
        SpringApplication.run(PubSubWebFluxApplication.class, args);
    }

    /**
     * Bean to deserialize the request payload.
     */
    @Bean
    public JacksonPubSubMessageConverter jacksonPubSubMessageConverter(ObjectMapper objectMapper) {
        return new JacksonPubSubMessageConverter(objectMapper);
    }

    @Bean
    public MessageChannel pubSubOutputChannel() {
        return MessageChannels.flux().get();
    }

    /**
     * Message handler which consumes messages from the message channel
     * and sends them to the Google Cloud Pub/Sub topic.
     */
    @Bean
    @ServiceActivator(
            inputChannel = "pubSubOutputChannel",
            adviceChain = "pubSubAdvice"
    )
    public MessageHandler messageSender(PubSubTemplate pubSubTemplate) {
        PubSubMessageHandler handler = new PubSubMessageHandler(pubSubTemplate, TOPIC_NAME);
        handler.setSync(true);
        return handler;
    }

    /**
     * Webflux endpoint to consume the HTTP request.
     */
    @Bean
    public WebFluxInboundEndpoint webFluxInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setMethods(HttpMethod.POST);
        requestMapping.setConsumes(MediaType.APPLICATION_FORM_URLENCODED_VALUE);
        requestMapping.setPathPatterns("/createPerson");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannel(pubSubOutputChannel());
        return endpoint;
    }

    @Bean
    public Advice pubSubAdvice() {
        return new PubSubRequestHandlerAdvice();
    }
}
A working copy of the application is available here: https://github.com/piyushpcegarg/spring-gcp-pubsub-webflux-sample
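Answer 2: The PubSubMessageHandler is not designed for request/reply behavior. In most cases it is used as send-and-forget.
Since you are really concerned about a success/failure reply, I can only suggest something like PubSubMessageHandler.setSync(true):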
/**
* Set publish method to be synchronous or asynchronous.
*
* <p>Publish is asynchronous be default.
* @param sync true for synchronous, false for asynchronous
*/
public void setSync(boolean sync)
This way your PubSubMessageHandler will wait on pubsubFuture.get(); and, if it fails, a MessageHandlingException will be thrown.
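The difference between the two modes can be illustrated with a plain-Java model that uses CompletableFuture in place of the real publish future. This is a sketch under stated assumptions, not the PubSubMessageHandler implementation: publishAsync, publishSync, the brokerUp flag, and PublishFailedException are hypothetical names, with PublishFailedException standing in for the exception a synchronous handler would raise.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Plain-Java model (no Spring or GCP classes) of asynchronous versus synchronous publishing. */
final class SyncPublishSketch {

    /** Hypothetical stand-in for the exception raised when a synchronous publish fails. */
    static final class PublishFailedException extends RuntimeException {
        PublishFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Async mode: returns a future; the outcome only reaches whoever inspects that future. */
    static CompletableFuture<String> publishAsync(String payload, boolean brokerUp) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (brokerUp) {
            future.complete("id-" + payload);
        } else {
            future.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return future;
    }

    /** Sync mode: blocks on the future, so success or failure surfaces in the calling flow. */
    static String publishSync(String payload, boolean brokerUp) {
        try {
            return publishAsync(payload, brokerUp).get();
        } catch (ExecutionException ex) {
            throw new PublishFailedException("publish failed", ex.getCause());
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new PublishFailedException("interrupted while publishing", ex);
        }
    }
}
```

Because the synchronous variant throws in the caller's flow, anything wrapped around the call, such as an advice, can observe the failure directly.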
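To handle success or failure in this sync scenario, I suggest looking at ExpressionEvaluatingRequestHandlerAdvice with its successChannel and failureChannel.
For the onSuccessExpression, I think #root should point to the requestMessage.
The onFailureExpression can consult the #exception SpEL expression variable, but it should still propagate the requestMessage to the failureChannel. The reason I talk about the requestMessage is that it carries the important replyChannel header needed to respond to the WebFluxInboundEndpoint request.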
See more information in the docs: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain
The sub-flows on the successChannel and failureChannel should then reply properly, returning some value while leaving their outputChannel empty.
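At the same time, I totally agree that it would be much easier if PubSubMessageHandler were an AbstractReplyProducingMessageHandler returning some ListenableFuture, so that we could process the publishing result.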
Comments:
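Thanks @Artem for the suggestion. I have created two more FluxMessageChannel beans, replyChannel() and errorChannel(), and attached them to the WebFluxInboundEndpoint and the expressionAdvice. But when an HTTP request is sent, MessagingGatewaySupport creates the Correlator and fails because beanFactory is null. I have attached the new application file and the stack trace to the question for reference. Please advise whether I have configured expressionAdvice correctly.
No, you don't need endpoint.setReplyChannel(replyChannel()); and therefore you don't need a dedicated channel bean. You only need to rely on the automatically created replyChannel header, which is a TemporaryReplyChannel instance. Your ExpressionEvaluatingRequestHandlerAdvice should send to some other channel whose flow replies without an explicit output channel, that is, essentially to the replyChannel header mentioned above. You also cannot rely on the default Message::getPayload, because you need the headers from the request message, which contain that replyChannel.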
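As I said, this is going to be a tough task with the existing PubSubMessageHandler behavior. You may consider a PublishSubscribeChannel with the PubSubMessageHandler as the first subscriber and something else as the second subscriber, which handles the success of the Pub/Sub publishing.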
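The ordering idea in this suggestion can be modeled in plain Java: the publisher runs first, and the subscriber that produces the success reply runs only if the publisher did not throw. The sketch below assumes subscribers are invoked sequentially on the caller's thread and that a failure stops delivery. Whether a real PublishSubscribeChannel behaves this way depends on its configuration (for example an executor or an ignore-failures setting) and should be checked against the Spring Integration documentation. The dispatch method is a hypothetical helper, not a Spring API.

```java
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of delivering one message to ordered subscribers on the caller's thread. */
final class OrderedSubscribersSketch {

    /**
     * Calls each subscriber in list order. If one throws, the exception
     * propagates to the caller and the remaining subscribers are skipped.
     */
    static <T> void dispatch(T message, List<Consumer<T>> subscribers) {
        for (Consumer<T> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}
```

With the publisher registered first and the reply step second, a failed publish never reaches the reply step, so only the error path has to produce a response.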
For the error variant, I would really stay with your endpoint.setErrorChannel(errorChannel()), but you still need to handle that error and return something reasonable to the same replyChannel header.