Spring Cloud Stream 动态通道
Posted
技术标签:
【中文标题】Spring Cloud Stream 动态通道【英文标题】:Spring Cloud Stream dynamic channels 【发布时间】:2017-05-31 05:41:50 【问题描述】:我正在使用 Spring Cloud Stream 并希望以编程方式创建和绑定频道。我的用例是在应用程序启动期间,我收到要订阅的 Kafka 主题的动态列表。然后如何为每个主题创建一个频道?
【问题讨论】:
您可以在这里查看类似问题的答案:***.com/questions/40485421/… 这个答案是针对传出消息的。我需要传入的:( 你找到答案了吗?我有同样的问题。如果你能指出我正确的方向,那就太好了。谢谢 @CCC,不,我没有。我的要求已经改变,所以对我来说不再是问题。 【参考方案1】:我最近遇到了类似的情况,下面是我动态创建订阅者频道的示例。
ConsumerProperties consumerProperties = new ConsumerProperties();
consumerProperties.setMaxAttempts(1);
BindingProperties bindingProperties = new BindingProperties();
bindingProperties.setConsumer(consumerProperties);
bindingProperties.setDestination(retryTopic);
bindingProperties.setGroup(consumerGroup);
bindingServiceProperties.getBindings().put(consumerName, bindingProperties);
SubscribableChannel channel = (SubscribableChannel)bindingTargetFactory.createInput(consumerName);
beanFactory.registerSingleton(consumerName, channel);
channel = (SubscribableChannel)beanFactory.initializeBean(channel, consumerName);
bindingService.bindConsumer(channel, consumerName);
channel.subscribe(consumerMessageHandler);
【讨论】:
你能分享完整的源代码吗? @sash,请告诉您在哪里找到此代码?对你有用吗? @YanKhonski 抱歉,但我已经没有实际的源代码了 :( 在调试并了解如何创建消费者之后,我确实写了上面的内容。我会在时间允许时尝试重新创建它。跨度> 当然,没问题,我解决了它并发布了我的解决方案。无论如何,如果你还记得,请分享。【参考方案2】:我必须为Camel Spring Cloud Stream 组件做类似的事情。
也许绑定目的地的消费者代码“真的只是一个String
指示频道名称”对您有用吗?
在我的例子中,我只绑定一个目的地,但我不认为它在概念上对于多个目的地有很大不同。
以下是它的要点:
@Override
protected void doStart() throws Exception
SubscribableChannel bindingTarget = createInputBindingTarget();
bindingTarget.subscribe(message ->
// have your way with the received incoming message
);
endpoint.getBindingService().bindConsumer(bindingTarget,
endpoint.getDestination());
// at this point the binding is done
/**
* Create a @link SubscribableChannel and register in the
* @link org.springframework.context.ApplicationContext
*/
private SubscribableChannel createInputBindingTarget()
SubscribableChannel channel = endpoint.getBindingTargetFactory()
.createInputChannel(endpoint.getDestination());
endpoint.getBeanFactory().registerSingleton(endpoint.getDestination(), channel);
channel = (SubscribableChannel) endpoint.getBeanFactory().initializeBean(channel,
endpoint.getDestination());
return channel;
请参阅here 了解完整来源以了解更多上下文。
【讨论】:
【参考方案3】:我有一个任务,我事先不知道主题。我通过一个输入通道来解决这个问题,该通道可以监听我需要的所有主题。
https://docs.spring.io/spring-cloud-stream/docs/***lyn.RELEASE/reference/html/_configuration_options.html
目的地
绑定中间件上通道的目标目的地(例如,RabbitMQ 交换或 Kafka 主题)。如果通道绑定为消费者,它可以绑定到多个目的地,并且目的地名称可以指定为逗号分隔的字符串值。如果未设置,则使用频道名称。
所以我的配置
spring:
cloud:
stream:
default:
consumer:
concurrency: 2
partitioned: true
bindings:
# inputs
input:
group: application_name_group
destination: topic-1,topic-2
content-type: application/json;charset=UTF-8
然后我定义了一个消费者来处理来自所有这些主题的消息。
@Component
@EnableBinding(Sink.class)
public class CommonConsumer
private final static Logger logger = LoggerFactory.getLogger(CommonConsumer.class);
@StreamListener(target = Sink.INPUT)
public void consumeMessage(final Message<Object> message)
logger.info("Received a message: \nmessage:\n", message.getPayload());
// Here I define logic which handles messages depending on message headers and topic.
// In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
请注意,在您的情况下,它可能不是解决方案。我需要将消息转发到 webhook,所以我可以进行配置映射。
我也想过其他的想法。 1) 你没有 Spring Cloud 的 kafka 客户端消费者。
2) 创建预定义数量的输入,例如 50 个。
input-1
intput-2
...
intput-50
然后对其中一些输入进行配置。
相关讨论
Spring cloud stream to support routing messages dynamically https://github.com/spring-cloud/spring-cloud-stream/issues/690 https://github.com/spring-cloud/spring-cloud-stream/issues/1089我们使用 Spring Cloud 2.1.1 RELEASE
【讨论】:
【参考方案4】:MessageChannel messageChannel = createMessageChannel(channelName);
messageChannel.send(getMessageBuilder().apply(data));
public MessageChannel createMessageChannel(String channelName)
return (MessageChannel) applicationContext.getBean(channelName);
public Function<Object, Message<Object>> getMessageBuilder()
return payload -> MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build();
【讨论】:
请不要只发布代码作为答案,还要解释您的代码的作用以及它如何解决问题的问题。带有解释的答案通常更有帮助、质量更好,并且更有可能吸引投票。【参考方案5】:对于传入的消息,您可以显式使用BinderAwareChannelResolver
来动态解析目的地。你可以检查这个example router
sink 使用 binder 感知通道解析器。
【讨论】:
我不明白。我想订阅我只在运行时才知道名称的主题。我不想发送/路由消息。 好的,对不起;我误解了。dynamic
目标支持仅用于绑定生产者。我相信这个功能还有待解决和跟踪:github.com/spring-cloud/spring-cloud-stream/issues/746
@IlayaperumalGopinathan,你知道这个问题是否得到解决?以上是关于Spring Cloud Stream 动态通道的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑
Spring Cloud Stream在同一通道根据消息内容分发不同的消费逻辑