Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3

Posted 云中一盏灯,路边形影重

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3相关的知识,希望对你有一定的参考价值。

Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially

问题描述

开发Java Spring Cloud应用,需要发送消息到Azure Event Hub中。使用 Spring Cloud Stream Event Hubs Binder 依赖,应用执行一会就会遇见报错:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.

 

问题解答

从错误来看,这明显是多线程并发处理时,多个线程同时触发了onSubscribe 或 onNext 或 onError 或 onComplete 事件,而这些事件在与 Subscriber处理时只能一个一个串行处理。

因为SpringCloud的 Controller 并发请求时,会分配多个线程同时调用many.emitNext(),这时如果之前请求线程处理还未结束,新请求的线程会直接这样的报错。

异常产生的代码为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitFailureHandler.FAIL_FAST);  

Sinks 对 FAIL_FAST 和 FAIL_NON_SERIALIZED的枚举值说明参考:reactor-core/Sinks.java at main · reactor/reactor-core · GitHub

  • FAIL_FAST:表示对失败不会进行任何重试,会马上触发异常处理机制,这里就是抛出EmissionException异常。( A pre-made handler that will not instruct to retry any failure and trigger the failure handling immediately.)
  • FAIL_NON_SERIALIZED:表示会持续重试,直至成功。

如果发送到Event Hub的消息允许丢失,可以通过Try Catch捕获异常后记录日志即可。

但是,如果发送的消息不能丢失,必须成功传递到Event Hub中,就可以使用 FAIL_NON_SERIALIZED 模式。

修改为:

private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);  

 

 

参考资料

ReactorDispatcher: Making sure spec 1.3 is not violated and under race, signals are not lost upon concurrent ClosedChannelException : https://github.com/Azure/azure-sdk-for-java/issues/27320

FAIL NON SERIALIZED : 

https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java#L89

https://github.com/reactor/reactor-core/blob/main/reactor-core/src/main/java/reactor/core/publisher/Sinks.java#L118

/**
* Has successfully emitted the signal
*/
OK,
/**
* Has failed to emit the signal because the sink was previously terminated successfully or with an error
*/
FAIL_TERMINATED,
/**
* Has failed to emit the signal because the sink does not have buffering capacity left
*/
FAIL_OVERFLOW,
/**
* Has failed to emit the signal because the sink was previously interrupted by its consumer
*/
FAIL_CANCELLED,
/**
* Has failed to emit the signal because the access was not serialized
*/
FAIL_NON_SERIALIZED,
/**
* Has failed to emit the signal because the sink has never been subscribed to has no capacity
* to buffer the signal.
*/
FAIL_ZERO_SUBSCRIBER;

 

 

具有输入绑定的 Azure 函数的 Azure 事件中心存储容器配置

【中文标题】具有输入绑定的 Azure 函数的 Azure 事件中心存储容器配置【英文标题】:Azure Event Hub Storage Container Configuration for Azure Function with Input Binding 【发布时间】:2020-07-19 02:03:18 【问题描述】:

在处理带有输入绑定 Azure 函数的事件中心事件时,是否可以更改为事件中心分区检查点配置的存储帐户?

是否可以单独使用高级存储帐户(即与设置期间为 Azure 功能选择的帐户不同的存储帐户)执行此操作?

EventProcessorHost 似乎可以做到这一点,但函数似乎没有公开 EventProcessorHost 配置。

【问题讨论】:

【参考方案1】:

我为其他一些提议做了类似的修改。

首先,停止你的 Azure 函数

然后,创建新的高级存储帐户并复制现有的 blob 和容器(所有 eventthub 检查点文件具有相同的文件夹结构)。

然后将 Azure 函数存储帐户连接字符串更改为新的高级存储帐户。

然后启动你的 Azure 函数。

【讨论】:

我无法让它最终发挥作用。我无法将源 blob 容器的内容复制到高级 blob 容器中 - 既不是通过 azcopy 也不是通过存储资源管理器。如果我只更新天蓝色函数存储帐户连接字符串,该函数将永远不会再次启动,因为它抱怨不兼容。这就是我希望将事件中心客户端存储帐户与 azure 函数存储帐户分开配置的原因。 我在容器下手动创建了文件夹并复制了blob

以上是关于Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3的主要内容,如果未能解决你的问题,请参考以下文章

spring cloud

spring cloud

远程控制/监控 - Azure IoT中心

Spring Cloud Azure 4.0 GA – 实现 Spring 框架与 Azure 服务的无缝集成

Spring Cloud Azure 参考文档

SpringCloud 第七篇: 高可用的分布式配置中心(Spring Cloud Config)