IllegalStateException:无法订阅。处理器已终止

Posted

技术标签:

【中文标题】IllegalStateException:无法订阅。处理器已终止【英文标题】:IllegalStateException: Cannot Subscrie.Processor is already terminated 【发布时间】:2021-09-18 16:07:25 【问题描述】:

我创建了一个新的 eventthub 并尝试将消息发布到 eventHubA。当我尝试向 eventthub 发送消息时,出现以下错误:

java.lang.IllegalStateException: 命名空间[xxxxx] entityPath[xxxxx]: 无法订阅。处理器已终止于 com.azure.core.amqp.implementation.AmqpChannelProcessor.subscribe(AmqpChannelProcessor.java:217)

下面是我正在使用的代码sn-p:

public void send(Response response) 
        String responseInString = JsonHandlingUtil.objectToJsonString(response);

        EventData eventData = new EventData(responseInString);
        // create a batch
        EventDataBatch eventDataBatch = producer.createBatch();

        // try to add the event from the array to the batch
        if (!eventDataBatch.tryAdd(eventData)) 
            // if the batch is full, send it and then create a new batch
            producer.send(eventDataBatch);
            eventDataBatch = producer.createBatch();

            // Try to add that event that couldn't fit before.
            if (!eventDataBatch.tryAdd(eventData)) 
                throw new IllegalArgumentException("Event is too large for an empty batch. Max size: "
                        + eventDataBatch.getMaxSizeInBytes());
            
        
        // send the last batch of remaining events
        if (eventDataBatch.getCount() > 0) 
            producer.send(eventDataBatch);
        
        producer.close();
    

我已将 eventthubProducerClient 定义为 Bean。

 @Bean
    public EventHubProducerClient eventHubProducerClient() 
        return new EventHubClientBuilder()
                .transportType(AmqpTransportType.AMQP_WEB_SOCKETS)
                .connectionString(connectionString, eventHubName)
                .buildProducerClient();
    

以下是我的 gradle 依赖项

>    //eventhub
>         implementation 'com.azure:azure-messaging-eventhubs:5.7.0'
>         implementation group: 'io.projectreactor', name: 'reactor-core', version: '3.4.6'

【问题讨论】:

您的生产者客户的生命周期是多久? bean 是代表应用程序范围内的单例实例还是瞬态的?我注意到您在 send 方法中调用了 producer.close(),如果 bean 将其作为单例进行管理,这将使您的生产者实例无效。 您用来发送的模式与您的场景看起来很奇怪;您正在创建一个批处理来保存单个事件,但管理该批处理就好像它处于产生多个事件的循环中一样。由于您发送的是单个事件,因此您可能需要考虑不使用批处理,而是只调用接受一组消息的重载。 @JesseSquire 可能是你能用你想说的例子或小代码 sn-p 解释一下吗? 我假设这是对我的第二条评论的回应。具体来说,我说的是这种过载:send(Iterable<EventData> events) docs:docs.microsoft.com/en-us/java/api/… 【参考方案1】:

从follow-up 的问题来看,似乎根本原因已被确认为send 方法中的producer.close() 调用。

由于他的生产者似乎被应用程序作为单例管理,因此缓解措施是在不再发布事件时调用close,例如在应用程序关闭时。

【讨论】:

以上是关于IllegalStateException:无法订阅。处理器已终止的主要内容,如果未能解决你的问题,请参考以下文章

java.lang.IllegalStateException:扫描引擎未启动。无法执行任务

无法捕捉 MediaPlayer 的 IllegalStateException

IllegalStateException:在 onSaveInstanceState 和 onActivityResult 之后无法执行此操作

出现异常“IllegalStateException:onSaveInstanceState 后无法执行此操作”

java.lang.IllegalStateException:无法在 localhost:49167 连接到 Ryuk

java.lang.IllegalStateException(onSaveInstanceState 后无法执行此操作)