Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3
Posted 云中一盏灯,路边形影重
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3相关的知识,希望对你有一定的参考价值。
问题描述
开发Java Spring Cloud应用,需要发送消息到Azure Event Hub中。使用 Spring Cloud Stream Event Hubs Binder 依赖,应用执行一会就会遇见报错:reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
问题解答
从错误来看,这明显是多线程并发处理时,多个线程同时触发了onSubscribe 或 onNext 或 onError 或 onComplete 事件,而这些事件在与 Subscriber处理时只能一个一个串行处理。
因为SpringCloud的 Controller 并发请求时,会分配多个线程同时调用many.emitNext(),这时如果之前请求线程处理还未结束,新请求的线程会直接这样的报错。
异常产生的代码为:
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitFailureHandler.FAIL_FAST);
Sinks 对 FAIL_FAST 和 FAIL_NON_SERIALIZED的枚举值说明参考:reactor-core/Sinks.java at main · reactor/reactor-core · GitHub
- FAIL_FAST:表示对失败不会进行任何重试,会马上触发异常处理机制,这里就是抛出EmissionException异常。( A pre-made handler that will not instruct to retry any failure and trigger the failure handling immediately.)
- FAIL_NON_SERIALIZED:表示会持续重试,直至成功。
如果发送到Event Hub的消息允许丢失,可以通过Try Catch捕获异常后记录日志即可。
但是,如果发送的消息不能丢失,必须成功传递到Event Hub中,就可以使用 FAIL_NON_SERIALIZED 模式。
修改为:
private final Sinks.EmitFailureHandler emitFailureHandler = (signalType, emitResult) -> emitResult.equals(Sinks.EmitResult.FAIL_NON_SERIALIZED);
参考资料
ReactorDispatcher: Making sure spec 1.3 is not violated and under race, signals are not lost upon concurrent ClosedChannelException : https://github.com/Azure/azure-sdk-for-java/issues/27320
FAIL NON SERIALIZED :
/**
* Has successfully emitted the signal
*/
OK,
/**
* Has failed to emit the signal because the sink was previously terminated successfully or with an error
*/
FAIL_TERMINATED,
/**
* Has failed to emit the signal because the sink does not have buffering capacity left
*/
FAIL_OVERFLOW,
/**
* Has failed to emit the signal because the sink was previously interrupted by its consumer
*/
FAIL_CANCELLED,
/**
* Has failed to emit the signal because the access was not serialized
*/
FAIL_NON_SERIALIZED,
/**
* Has failed to emit the signal because the sink has never been subscribed to has no capacity
* to buffer the signal.
*/
FAIL_ZERO_SUBSCRIBER;
当在复杂的环境中面临问题,格物之道需:浊而静之徐清,安以动之徐生。 云中,恰是如此!
具有输入绑定的 Azure 函数的 Azure 事件中心存储容器配置
【中文标题】具有输入绑定的 Azure 函数的 Azure 事件中心存储容器配置【英文标题】:Azure Event Hub Storage Container Configuration for Azure Function with Input Binding 【发布时间】:2020-07-19 02:03:18 【问题描述】:在处理带有输入绑定 Azure 函数的事件中心事件时,是否可以更改为事件中心分区检查点配置的存储帐户?
是否可以单独使用高级存储帐户(即与设置期间为 Azure 功能选择的帐户不同的存储帐户)执行此操作?
EventProcessorHost 似乎可以做到这一点,但函数似乎没有公开 EventProcessorHost 配置。
【问题讨论】:
【参考方案1】:我为其他一些提议做了类似的修改。
首先,停止你的 Azure 函数
然后,创建新的高级存储帐户并复制现有的 blob 和容器(所有 eventthub 检查点文件具有相同的文件夹结构)。
然后将 Azure 函数存储帐户连接字符串更改为新的高级存储帐户。
然后启动你的 Azure 函数。
【讨论】:
我无法让它最终发挥作用。我无法将源 blob 容器的内容复制到高级 blob 容器中 - 既不是通过 azcopy 也不是通过存储资源管理器。如果我只更新天蓝色函数存储帐户连接字符串,该函数将永远不会再次启动,因为它抱怨不兼容。这就是我希望将事件中心客户端存储帐户与 azure 函数存储帐户分开配置的原因。 我在容器下手动创建了文件夹并复制了blob以上是关于Azure 事件中心Spring Cloud Stream Event Hubs Binder 发送Event Hub消息遇见 Spec. Rule 1.3的主要内容,如果未能解决你的问题,请参考以下文章