Dispatcher 没有频道订阅者 - spring-cloud-stream-kafka
Posted
技术标签:
【中文标题】Dispatcher 没有频道订阅者 - spring-cloud-stream-kafka【英文标题】:Dispatcher has no subscribers for channel - spring-cloud-stream-kafka 【发布时间】:2018-09-17 06:32:52 【问题描述】:升级到 Spring Boot 2、Reactor 3.5、kafka-binder 2.0.0 RELEASE 和 kafka-client 1.0.1 后,其中一个模块不起作用。我花了 5 天时间完成它并阅读了可能所有相关的主题,但找不到这种行为的原因。
主类:
@Slf4j
@EnableI18N
@EnableSideBar
@ComponentScan
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class View
public static void main(String[] args)
new SpringApplicationBuilder(View.class)
.profiles("production")
.bannerMode(Banner.Mode.OFF)
.headless(true)
.application()
.run(args);
log.info("\nhttp://localhost:8083/\n");
配置标记:
@Configuration
@Profile("production")
@EnableBinding(OffersChannel.class)
class ProductionOffersConfiguration
频道界面:
public interface OffersChannel
String OFFERS_OBTAIN = "offersObtain";
String OFFERS_OBTAIN_REQUEST= "offersObtainRequest";
@Input(OFFERS_OBTAIN)
SubscribableChannel offersChannel();
@Output(OFFERS_OBTAIN_REQUEST)
MessageChannel offersRequestChannel();
还有 AdminUI 类,在更新依赖项之前重要的是,当这个类被初始化时,我能够看到一个登录控制台,上面说我订阅了频道,现在什么都没有发生:
@Slf4j
@Push
@Theme("$view.default-theme")
@SpringUI(path = WebsiteMapping.ADMIN)
@RequiredArgsConstructor
public class AdminUI extends UI
MessageChannel offersObtainRequest;
Grid<Ad> adGrid = createAdGrid();
private List<Ad> ads = new LinkedList<>();
ConnectionService connectionService;
@Override
@SneakyThrows
protected void init(VaadinRequest vaadinRequest)
setContent(splitPane());
adGrid.setItems(ads);
private VerticalSplitPanel splitPane()
VerticalSplitPanel verticalSplitPanel = new VerticalSplitPanel();
verticalSplitPanel.setSplitPosition(10, Unit.PERCENTAGE);
verticalSplitPanel.setFirstComponent(buttonsLayout());
verticalSplitPanel.setSecondComponent(adGrid);
return verticalSplitPanel;
private Layout buttonsLayout()
HorizontalLayout layout = new HorizontalLayout();
layout.setMargin(true);
layout.addComponent(requestMoreOffersButton());
ThemeSelectorComboBox themeSelectorComboBox = new ThemeSelectorComboBox();
layout.addComponent(themeSelectorComboBox);
layout.setComponentAlignment(themeSelectorComboBox, Alignment.MIDDLE_RIGHT);
return layout;
@PostConstruct
private void createGridProperties()
adGrid.setSizeFull();
adGrid.addColumn(Ad::getTitle).setCaption("Title");
adGrid.addColumn(Ad::getLocation).setCaption("Location");
adGrid.addColumn(Ad::getHref).setCaption("Href");
@StreamListener
public void fetchAdsFrom(@Input(OffersChannel.OFFERS_OBTAIN) Flux<Ad> fluxAd)
fluxAd.subscribe(this::displayOfferInGrid);
private void displayOfferInGrid(Ad ad)
ads.add(ad);
adGrid.setItems(ads);
private Button requestMoreOffersButton()
return new Button("Request 10 more offers", this::requestMoreOffers);
private Button startServiceButton(String caption, String url, String message)
return new Button(caption, buttonClickedEvent -> startService(url, message));
private void startService(String url, String message)
ConnectionRequest startProviderRequest = ConnectionRequest
.builder()
.url(url)
.build();
connectionService
.getForhtml(startProviderRequest)
.thenAccept(serviceStarted -> Notification.show(message));
private void requestMoreOffers(Event event)
offersObtainRequest.send(new GenericMessage(new AdBroadcastRequest(ads.size(),10)));
private Grid<Ad> createAdGrid()
return new Grid<>();
Application.yml:
spring:
cloud:
stream:
bindings:
offersObtainRequest:
destination: adsBroadcastRequestes
binder: kafka
group: adsBroadcastRequestsProducer
offersObtain:
destination: adsBroadcast
binder: kafka
group: adsConsumer
堆栈跟踪:
2018-04-07 11:27:12.010 DEBUG o.s.retry.support.RetryTemplate : Retry: count=0
2018-04-07 11:27:12.010 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers=kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984]
2018-04-07 11:27:12.010 DEBUG o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate : Retry: count=1
2018-04-07 11:27:13.011 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers=kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984]
2018-04-07 11:27:13.011 DEBUG o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 2000
2018-04-07 11:27:13.909 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Sending Heartbeat request to coordinator Kacper-PC:9092 (id: 2147483647 rack: null)
2018-04-07 11:27:14.138 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Received successful Heartbeat response
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Retry: count=2
2018-04-07 11:27:15.011 DEBUG o.s.integration.channel.DirectChannel : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers=kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984]
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
2018-04-07 11:27:15.012 DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[227], headers=kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.offersObtain'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[227], headers=kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) [spring-retry-1.2.2.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
... 23 common frames omitted
【问题讨论】:
【参考方案1】:嗯,那里发生了很多事情。我的意思是所有非 Spring-Cloud-Stream 的东西。 . .让人难以跟上。
无论如何,您的 @StreamListener
没有在任何 Spring 托管配置类中定义,因此不会被拾取。您可以将其移动到 ProductionOffersConfiguration
或 View
或任何其他 Spring 托管的配置类。
另外,考虑阅读这个快速教程https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start(5 分钟),以更好地了解 spring-cloud-stream 的机制
【讨论】:
哦,它有效!所以Vaadin一定有问题,非常感谢! :)【参考方案2】:在我的例子中,我用 @Input 通道声明了 spring-cloud-steam 接口
@Input("in")
SubscribableChannel inbound();
但是我没有使用任何@StreamListener 来处理这个目标,所以结果我得到了这样的错误
MessageDeliveryException: Dispatcher has no subscribers for channel
解决方案:我删除了@Input 声明,只留下@Output("out") 一个。
仅供参考:我的输入配置在其他单独的服务中。
【讨论】:
【参考方案3】:您可以使用@StreamListener
注释,但在注释中带有目标参数。
@StreamListener(target = OffersChannel.OFFERS_OBTAIN)
public void fetchAdsFrom(@Payload Flux<Ad> fluxAd, @Header final String someHeader)
fluxAd.subscribe(this::displayOfferInGrid);
确实为我工作,与您的设置类似(而不是 Flux,我收到 JSON - 字符串)。
可能是您必须为 Flux 消息编写自己的转换器,我不确定。
【讨论】:
【参考方案4】:我花了几个小时试图找出“调度程序没有订阅者”错误背后的真正问题。
在我的情况下,问题是 Kafka 存储了一些历史节点 ID (1001),而所有主题都使用 1003 作为领导者 ID,这就是无法发送消息的原因。必须手动擦除所有这些才能使其正常工作。
虽然作为快速修复它非常适合我的本地设置,但在生产环境中不太可能是正确的方法。对于永久解决方案(为您的代理设置静态 id),您可以尝试check out this answer。
【讨论】:
以上是关于Dispatcher 没有频道订阅者 - spring-cloud-stream-kafka的主要内容,如果未能解决你的问题,请参考以下文章