Observable 发射器对 HystrixCommands 产生了太大的压力
Posted
技术标签:
【中文标题】Observable 发射器对 HystrixCommands 产生了太大的压力【英文标题】:Observable emitter generates too much pressure for HystrixCommands 【发布时间】:2018-07-27 05:38:29 【问题描述】:我有一个发出文件行的 Observable(从 GCS 读取的许多 GB)。
return StringObservable.byLine(
Observable.using(
() -> storage.get(blobId).reader(),
reader -> Observable.create(
new OnSubscribeReadChannel(reader, 64 * 1024)
),
ReadChannel::close
)
)
每一行都会导致多次(在某些情况下多次)调用各种数据库,所有这些调用都包含在 Hystrix 命令中。显然,这些线路最终压倒了 Hystrix 命令,电路开始打开,每个人都度过了糟糕的一天。
这大概就是我正在做的事情:
readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
.map(this::deserializeLine)
.flatMap(this::addDataToObjectFromSomeDb)
.flatMap(this::writeObj)
.map(Set::size)
.reduce(0, (a, b) -> a + b)
.toBlocking().single()
有没有办法可以施加一些背压,或者限制一次处理的行数或其他什么?
【问题讨论】:
也许maxConcurrent
有帮助,在this answer 中描述。
这很有趣。使用concatMap
而不是flatMap
会有效地序列化写入吗?
从输出的角度来看,concatMap
和 flatMap(maxConcurrency=1)
看起来是一样的。从输入的角度来看,concatMap
会预先获取至少 1 个项目,即使有 1 个内部源正在处理,而 flatMap(1)
仅在最后一个完成时才要求下一个内部源。
你使用的是 RxJava 1.x 还是 2.x?
RxJava 1.x - Hystrix 被锁定了。
【参考方案1】:
你需要使用Emitter.BackpressureMode.BUFFER
BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.
http://reactivex.io/RxJava/1.x/javadoc/index.html?rx/Emitter.BackpressureMode.html
【讨论】:
这个问题是缓冲区可能会变得很多 GB,在某些时候必须限制阅读器。以上是关于Observable 发射器对 HystrixCommands 产生了太大的压力的主要内容,如果未能解决你的问题,请参考以下文章
如何让一个 Observable 序列在发射前等待另一个完成?