@MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?

Posted

技术标签:

【中文标题】@MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?【英文标题】:How can @MessagingGateway be configured with Spring Cloud Stream MessageChannels? 【发布时间】:2018-05-27 18:49:03 【问题描述】:

我已经开发了异步 Spring Cloud Stream 服务,我正在尝试开发一个边缘服务,它使用 @MessagingGateway 来提供对本质上是异步的服务的同步访问。

我目前正在获取以下堆栈跟踪:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

我的@MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService 
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);

如果我通过@StreamListener 使用回复通道上的消息,它就可以正常工作:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = @HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE"), ignoreExceptions = ClientException.class)
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) 
    try 
      if (log.isInfoEnabled()) 
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      
     catch (JsonProcessingException e) 
      log.error(e.getMessage(), e);
    
  

在生产者方面,我正在配置requiredGroups,以确保多个消费者可以处理消息,并且相应地,消费者具有匹配的group配置。

消费者:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created

制作人:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

生产者端处理请求并发送响应的代码:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

我可以调试,看到请求被接收和处理,但是当响应发送到回复通道时,就发生了错误。

要使@MessagingGateway 正常工作,我缺少哪些配置和/或代码?我知道我正在结合 Spring Integration 和 Spring Cloud Gateway,所以我不确定将它们一起使用是否会导致问题。

【问题讨论】:

【参考方案1】:

这是个好问题,也是个好主意。但这不会那么容易。

首先我们必须自己确定gateway 表示request/reply,因此correlation。这在@MessagingGateway 中通过replyChannel 标头在TemporaryReplyChannel 实例面前​​可用。即使您有明确的replyChannel = AccountChannels.ACCOUNT_CREATED,关联也只能通过提到的标头及其值完成。事实上,这个TemporaryReplyChannel 是不可序列化的,也不能通过网络传输给另一端的消费者。

幸运的是 Spring Integration 为我们提供了一些解决方案。它是HeaderEnricher 及其headerChannelsToString 选项的一部分,位于HeaderChannelRegistry 后面:

从 Spring Integration 3.0 开始,一个新的子元素 <int:header-channels-to-string/> 可用;它没有属性。这会将现有的 replyChannel 和 errorChannel 标头(当它们是 MessageChannel 时)转换为 String 并将通道存储在注册表中,以便以后在发送回复或处理错误时进行解析。这对于标头可能丢失的情况很有用;例如,将消息序列化到消息存储中或通过 JMS 传输消息时。如果标头不存在,或者它不是 MessageChannel,则不会进行任何更改。

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher

但在这种情况下,您必须引入从网关到HeaderEnricher 的内部通道,并且只有最后一个通道会将消息发送到AccountChannels.CREATE_ACCOUNT_REQUEST。因此,replyChannel 标头将被转换为字符串表示形式并能够通过网络传输。在消费者方面,当您发送回复时,您应该确保您也传输该 replyChannel 标头,原样。因此,当消息到达生产者端的AccountChannels.ACCOUNT_CREATED(我们有@MessagingGateway)时,相关机制能够将通道标识符转换为正确的TemporaryReplyChannel,并将回复与等待网关调用相关联.

这里唯一的问题是您的生产者应用程序必须是 AccountChannels.ACCOUNT_CREATED 组中的单个消费者 - 我们必须确保云中一次只有一个实例在运行。仅仅因为只有一个实例的内存中有TemporaryReplyChannel

更多网关信息:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

更新

一些帮助代码:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService 
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);


@Bean
public IntegrationFlow headerEnricherFlow() 
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();


更新

一些简单的应用程序来演示 PoC:

@EnableBinding( Processor.class, CloudStreamGatewayApplication.GatewayChannels.class )
@SpringBootApplication
public class CloudStreamGatewayApplication 

    interface GatewayChannels 

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway 

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    

    @Bean
    public IntegrationFlow headerEnricherFlow() 
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) 
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    


    public static void main(String[] args) 
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    


application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

我使用spring-cloud-starter-stream-rabbit

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()

将请求标头复制到回复消息的技巧。因此,网关能够在回复端将标头中的通道标识符转换为适当的TemporaryReplyChannel,以便将回复正确地传达给网关的调用者。

SCSt 关于此事的问题:https://github.com/spring-cloud/spring-cloud-stream/issues/815

【讨论】:

感谢您的快速回复,Artem。我想我理解你所说的要点,但我想确保我理解。您是说我需要在 @MessagingGateway 旁边创建一个 HeaderEnricher bean?如果是这样,鉴于当前 requestChannel 是 AccountChannels.CREATE_ACCOUNT_REQUEST,我将如何配置其 inputChannel 和 outputChannel 属性? 另外,我不清楚 TemporaryReplyChannel 如何参与 HeaderEncricher bean 的创建。 网关创建TemporaryReplyChannel 并将其填充到它发送到requestChannel 的消息中。 requestChannel 必须作为 HeaderEncricher 的输入,这是内部的,而不是绑定目标。 HeaderEncricheroutputChannel 已经是 AccountChannels.CREATE_ACCOUNT_REQUEST 那么,如果我的 bean 有@Transformer(inputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST_HEADERS, outputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST),我应该指定哪个通道作为headerChannelRegistry.channelToChannelName(channel) 的参数?抱歉,如果我在这里遗漏了一些明显的东西。【参考方案2】:

在 Artem 的帮助下,我找到了我正在寻找的解决方案。我已将 Artem 发布的代码拆分为两个服务,一个网关服务和一个 CloudStream 服务。我还添加了一个@RestController 用于测试目的。这基本上模仿了我想要对持久队列做的事情。感谢 Artem 的协助!我真的很感谢你的时间!我希望这可以帮助其他想做同样事情的人。

网关代码

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding(GatewayApplication.GatewayChannels.class)
@SpringBootApplication
public class GatewayApplication 

  interface GatewayChannels 

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  

  @MessagingGateway
  public interface StreamGateway 
    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String process(String payload);
  

  private static final String ENRICH = "enrich";

  public static void main(String[] args) 
    SpringApplication.run(GatewayApplication.class, args);
  

  @Bean
  public IntegrationFlow headerEnricherFlow() 
    return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  

  @RestController
  public class UppercaseController 
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/string",
        produces = MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE)
    public ResponseEntity<String> getUser(@PathVariable("string") String string) 
      return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
    
  


网关配置 (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
server:
  port: 8080

CloudStream 代码

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(CloudStreamApplication.CloudStreamChannels.class)
@SpringBootApplication
public class CloudStreamApplication 

  interface CloudStreamChannels 

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  

  public static void main(String[] args) 
    SpringApplication.run(CloudStreamApplication.class, args);
  

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> process(Message<String> request) 
    return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        .copyHeaders(request.getHeaders()).build();
  


CloudStream 配置 (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            required-groups: gateway-to-uppercase-reply
server:
  port: 8081

【讨论】:

它工作正常。但是如何将有效载荷转换为对象?【参考方案3】:

嗯,我有点困惑以及你想要完成的事情,但让我们看看我们是否能解决这个问题。 混合 SI 和 SCSt 绝对是自然的,因为一个建立在另一个之上,所以一切都应该工作: 这是一个示例代码 sn-p,我刚刚从一个旧示例中挖掘出来,该示例公开 REST 端点但委托(通过网关)到 Source 的输出通道。看看是否有帮助:

@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication 
    . . . 

    @Autowired
    private Source channels;

    @Autowired
    private CompletionService completionService;

    @RequestMapping("/complete")
    public String completeRequest(@RequestParam int id) 
        this.completionService.complete("foo");
        return "OK";
    

    @MessagingGateway
    interface CompletionService 
        @Gateway(requestChannel = Source.OUTPUT)
        void complete(String message);
    

【讨论】:

也请看我的回答。

以上是关于@MessagingGateway 如何配置 Spring Cloud Stream MessageChannels?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 SP 密钥库 spring saml 中的证书

VS2008 SP1如何解决单步问题

如何提高SQL Server的安全性控制

将消息发送到套接字端口并使用Spring Integration接收响应

SP_配置方案库存价

配置 saml-sample (SP) 以使用 Okta (IdP)