Spring Cloud Stream 动态通道

Posted

技术标签:

【中文标题】Spring Cloud Stream 动态通道【英文标题】:Spring Cloud Stream dynamic channels 【发布时间】:2017-05-31 05:41:50 【问题描述】:

我正在使用 Spring Cloud Stream 并希望以编程方式创建和绑定频道。我的用例是在应用程序启动期间,我收到要订阅的 Kafka 主题的动态列表。然后如何为每个主题创建一个频道?

【问题讨论】:

您可以在这里查看类似问题的答案:***.com/questions/40485421/… 这个答案是针对传出消息的。我需要传入的:( 你找到答案了吗?我有同样的问题。如果你能指出我正确的方向,那就太好了。谢谢 @CCC,不,我没有。我的要求已经改变,所以对我来说不再是问题。 【参考方案1】:

我最近遇到了类似的情况,下面是我动态创建订阅者频道的示例。

    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); 
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
    SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);

【讨论】:

你能分享完整的源代码吗? @sash,请告诉您在哪里找到此代码?对你有用吗? @YanKhonski 抱歉,但我已经没有实际的源代码了 :( 在调试并了解如何创建消费者之后,我确实写了上面的内容。我会在时间允许时尝试重新创建它。跨度> 当然,没问题,我解决了它并发布了我的解决方案。无论如何,如果你还记得,请分享。【参考方案2】:

我必须为Camel Spring Cloud Stream 组件做类似的事情。 也许绑定目的地的消费者代码“真的只是一个String 指示频道名称”对您有用吗?

在我的例子中,我只绑定一个目的地,但我不认为它在概念上对于多个目的地有很大不同。

以下是它的要点:

    @Override
    protected void doStart() throws Exception 
        SubscribableChannel bindingTarget = createInputBindingTarget();
        bindingTarget.subscribe(message -> 
            // have your way with the received incoming message
        );

        endpoint.getBindingService().bindConsumer(bindingTarget,
                endpoint.getDestination());

       // at this point the binding is done
    

    /**
     * Create a @link SubscribableChannel and register in the
     * @link org.springframework.context.ApplicationContext
     */
    private SubscribableChannel createInputBindingTarget() 
        SubscribableChannel channel = endpoint.getBindingTargetFactory()
                .createInputChannel(endpoint.getDestination());
        endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
        channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
                endpoint.getDestination());
        return channel;
    

请参阅here 了解完整来源以了解更多上下文。

【讨论】:

【参考方案3】:

我有一个任务,我事先不知道主题。我通过一个输入通道来解决这个问题,该通道可以监听我需要的所有主题。

https://docs.spring.io/spring-cloud-stream/docs/***lyn.RELEASE/reference/html/_configuration_options.html

目的地

绑定中间件上通道的目标目的地(例如,RabbitMQ 交换或 Kafka 主题)。如果通道绑定为消费者,它可以绑定到多个目的地,并且目的地名称可以指定为逗号分隔的字符串值。如果未设置,则使用频道名称。

所以我的配置

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

然后我定义了一个消费者来处理来自所有这些主题的消息。

@Component
@EnableBinding(Sink.class)
public class CommonConsumer 

    private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) 
        logger.info("Received a message: \nmessage:\n", message.getPayload());
        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    

请注意,在您的情况下,它可能不是解决方案。我需要将消息转发到 webhook,所以我可以进行配置映射。

我也想过其他的想法。 1) 你没有 Spring Cloud 的 kafka 客户端消费者。

2) 创建预定义数量的输入,例如 50 个。

input-1
intput-2
...
intput-50

然后对其中一些输入进行配置。

相关讨论

Spring cloud stream to support routing messages dynamically https://github.com/spring-cloud/spring-cloud-stream/issues/690 https://github.com/spring-cloud/spring-cloud-stream/issues/1089

我们使用 Spring Cloud 2.1.1 RELEASE

【讨论】:

【参考方案4】:
MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));

public MessageChannel createMessageChannel(String channelName) 
return (MessageChannel) applicationContext.getBean(channelName);

public Function<Object, Message<Object>> getMessageBuilder() 
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();

【讨论】:

请不要只发布代码作为答案,还要解释您的代码的作用以及它如何解决问题的问题。带有解释的答案通常更有帮助、质量更好,并且更有可能吸引投票。【参考方案5】:

对于传入的消息,您可以显式使用BinderAwareChannelResolver 来动态解析目的地。你可以检查这个example router sink 使用 binder 感知通道解析器。

【讨论】:

我不明白。我想订阅我只在运行时才知道名称的主题。我不想发送/路由消息。 好的,对不起;我误解了。 dynamic 目标支持仅用于绑定生产者。我相信这个功能还有待解决和跟踪:github.com/spring-cloud/spring-cloud-stream/issues/746 @IlayaperumalGopinathan,你知道这个问题是否得到解决?

以上是关于Spring Cloud Stream 动态通道的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑

Spring Cloud Stream消息JSON转换无效

Spring Cloud Stream的分区和分组

使用spring cloud stream kafka动态改变instanceindex

Spring Cloud Stream教程编程模型