IllegalStateException:无法订阅。处理器已终止
Posted
技术标签:
【中文标题】IllegalStateException:无法订阅。处理器已终止【英文标题】:IllegalStateException: Cannot Subscrie.Processor is already terminated 【发布时间】:2021-09-18 16:07:25 【问题描述】:我创建了一个新的 eventthub 并尝试将消息发布到 eventHubA。当我尝试向 eventthub 发送消息时,出现以下错误:
java.lang.IllegalStateException: 命名空间[xxxxx] entityPath[xxxxx]: 无法订阅。处理器已终止于 com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:217)
下面是我正在使用的代码sn-p:
public void send(Response response)
String responseInString = JsonHandlingUtil.objectToJsonString(response);
EventData eventData = new EventData(responseInString);
// create a batch
EventDataBatch eventDataBatch = producer.createBatch();
// try to add the event from the array to the batch
if (!eventDataBatch.tryAdd(eventData))
// if the batch is full, send it and then create a new batch
producer.send(eventDataBatch);
eventDataBatch = producer.createBatch();
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData))
throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
+ eventDataBatch.getMaxSizeInBytes());
// send the last batch of remaining events
if (eventDataBatch.getCount() > 0)
producer.send(eventDataBatch);
producer.close();
我已将 eventthubProducerClient 定义为 Bean。
@Bean
public EventHubProducerClient eventHubProducerClient()
return new EventHubClientBuilder()
.transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
.connectionString(connectionString, eventHubName)
.buildProducerClient();
以下是我的 gradle 依赖项
> //eventhub
> implementation 'com.azure:azure-messaging-eventhubs:5.7.0'
> implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.6'
【问题讨论】:
您的生产者客户的生命周期是多久? bean 是代表应用程序范围内的单例实例还是瞬态的?我注意到您在send
方法中调用了 producer.close()
,如果 bean 将其作为单例进行管理,这将使您的生产者实例无效。
您用来发送的模式与您的场景看起来很奇怪;您正在创建一个批处理来保存单个事件,但管理该批处理就好像它处于产生多个事件的循环中一样。由于您发送的是单个事件,因此您可能需要考虑不使用批处理,而是只调用接受一组消息的重载。
@JesseSquire 可能是你能用你想说的例子或小代码 sn-p 解释一下吗?
我假设这是对我的第二条评论的回应。具体来说,我说的是这种过载:send(Iterable<EventData> events)
docs:docs.microsoft.com/en-us/java/api/…
【参考方案1】:
从follow-up 的问题来看,似乎根本原因已被确认为send
方法中的producer.close()
调用。
由于他的生产者似乎被应用程序作为单例管理,因此缓解措施是在不再发布事件时调用close
,例如在应用程序关闭时。
【讨论】:
以上是关于IllegalStateException:无法订阅。处理器已终止的主要内容,如果未能解决你的问题,请参考以下文章
java.lang.IllegalStateException:扫描引擎未启动。无法执行任务
无法捕捉 MediaPlayer 的 IllegalStateException
IllegalStateException:在 onSaveInstanceState 和 onActivityResult 之后无法执行此操作
出现异常“IllegalStateException:onSaveInstanceState 后无法执行此操作”
java.lang.IllegalStateException:无法在 localhost:49167 连接到 Ryuk
java.lang.IllegalStateException(onSaveInstanceState 后无法执行此操作)