Observable 发射器对 HystrixCommands 产生了太大的压力

Posted

技术标签:

【中文标题】Observable 发射器对 HystrixCommands 产生了太大的压力【英文标题】:Observable emitter generates too much pressure for HystrixCommands 【发布时间】:2018-07-27 05:38:29 【问题描述】:

我有一个发出文件行的 Observable(从 GCS 读取的许多 GB)。

return StringObservable.byLine(
    Observable.using(
        () -> storage.get(blobId).reader(),
            reader -> Observable.create(
                    new OnSubscribeReadChannel(reader, 64 * 1024)
                ),
            ReadChannel::close
    )
)

每一行都会导致多次(在某些情况下多次)调用各种数据库,所有这些调用都包含在 Hystrix 命令中。显然,这些线路最终压倒了 Hystrix 命令,电路开始打开,每个人都度过了糟糕的一天。

这大概就是我正在做的事情:

readLinesFromCloudStorageFile.readLines(blobInfo.getBlobId()))
            .map(this::deserializeLine)
            .flatMap(this::addDataToObjectFromSomeDb)
            .flatMap(this::writeObj)
            .map(Set::size)
            .reduce(0, (a, b) -> a + b)
            .toBlocking().single()

有没有办法可以施加一些背压,或者限制一次处理的行数或其他什么?

【问题讨论】:

也许maxConcurrent 有帮助,在this answer 中描述。 这很有趣。使用concatMap 而不是flatMap 会有效地序列化写入吗? 从输出的角度来看,concatMapflatMap(maxConcurrency=1) 看起来是一样的。从输入的角度来看,concatMap 会预先获取至少 1 个项目,即使有 1 个内部源正在处理,而 flatMap(1) 仅在最后一个完成时才要求下一个内部源。 你使用的是 RxJava 1.x 还是 2.x? RxJava 1.x - Hystrix 被锁定了。 【参考方案1】:

你需要使用Emitter.BackpressureMode.BUFFER

BUFFER
Buffers (unbounded) all onNext calls until the downstream can consume them.

http://reactivex.io/RxJava/1.x/javadoc/index.html?rx/Emitter.BackpressureMode.html

【讨论】:

这个问题是缓冲区可能会变得很多 GB,在某些时候必须限制阅读器。

以上是关于Observable 发射器对 HystrixCommands 产生了太大的压力的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.0中flatMap操作符用法和源码分析

如何让一个 Observable 序列在发射前等待另一个完成?

RxJava之错误处理

RxJava 错误处理

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

Android RxJava使用介绍 RxJava的操作符