对有延迟的 Rxjava 可观察对象进行单元测试

Posted

技术标签:

【中文标题】对有延迟的 Rxjava 可观察对象进行单元测试【英文标题】:Unit testing Rxjava observables that have a delay 【发布时间】:2018-04-28 01:31:12 【问题描述】:

我希望能够对具有延迟发射但实际上不等待延迟时间的Observable 进行单元测试。有没有办法做到这一点?

我目前正在使用 CountDownHatch 来延迟断言,效果很好,但会增加测试运行时间。

例子:

val myObservable: PublishSubject<Boolean> = PublishSubject.create<Boolean>()
fun myObservable(): Observable<Boolean> = myObservable.delay(3, TimeUnit.SECONDS)

@Test
fun testMyObservable() 
    val testObservable = myObservable().test()
    myObservable.onNext(true)

    // val lock = CountDownLatch(1)
    // lock.await(3100, TimeUnit.MILLISECONDS)
    testObservable.assertValue(true)

【问题讨论】:

【参考方案1】:

感谢@aaron-he 的回答,我能够想出一个完整的解决方案。

它使用TestScheduler 的组合来提前时间,还使用RxJavaPlugins 使用testScheduler 覆盖默认调度程序。这允许在不修改或需要传入Scheduler 的情况下测试myObservable() 函数。

@Before
fun setUp() = RxJavaPlugins.reset()

@After
fun tearDown() = RxJavaPlugins.reset()

@Test
fun testMyObservable() 
    val testScheduler = TestScheduler()
    RxJavaPlugins.setComputationSchedulerHandler  testScheduler 

    val testObservable = myObservable().test()
    myObservable.onNext(true)

    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    testObservable.assertEmpty()
    testScheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    testObservable.assertValue(true)

【讨论】:

【参考方案2】:

TestScheduler 非常适合这个。它有一个方便的方法advanceTimeBy(long, TimeUnit) 可以让你控制时间。而Observable.delay 有一个overload method 有一个Scheduler

所以只需在myObservable函数中使用默认的Scheduler.computation(),并使用TestScheduler进行单元测试。

【讨论】:

刚刚用谷歌搜索了这个问题,然后就到了这里。可能应该只是在 Slack 中问过它。大声笑【参考方案3】:

发现PublishSubject有一些奇怪的行为,订阅需要一些时间,例如测试失败:

private val scheduler = TestScheduler()
    @Test
    fun publishSubjectFailedTest() 
        val callback: DelayCallback = mock()

        val myPublishSubject: PublishSubject<Boolean> = PublishSubject.create()
        myPublishSubject
            .delay(10, TimeUnit.SECONDS, scheduler)
            .subscribeOn(scheduler)
            .observeOn(scheduler)
            .subscribe(
                Consumer<Boolean> 
                    callback.onCalldown()
                ,
                Consumer<Throwable> 

                ,
                Action 

                
            )
        myPublishSubject.onNext(true)
        scheduler.advanceTimeBy(20, TimeUnit.SECONDS)
        verify(callback, times(1)).onCalldown()
    

但是如果我们在调用onNext之前添加一个scheduler的时间,例如scheduler.advanceTimeBy(1, TimeUnit.NANOSECONDS) 那么测试就成功了:

 @Test
    fun publishSubjectSuccessTest() 
        val callback: DelayCallback = mock()

        val myPublishSubject: PublishSubject<Boolean> = PublishSubject.create()
        myPublishSubject
            .delay(10, TimeUnit.SECONDS, scheduler)
            .subscribeOn(scheduler)
            .observeOn(scheduler)
            .subscribe(
                Consumer<Boolean> 
                    callback.onCalldown()
                ,
                Consumer<Throwable> 

                ,
                Action 

                
            )
        scheduler.advanceTimeBy(1, TimeUnit.NANOSECONDS)//added time of scheduler
        myPublishSubject.onNext(true)
        scheduler.advanceTimeBy(20, TimeUnit.SECONDS)
        verify(callback, times(1)).onCalldown()
    

【讨论】:

以上是关于对有延迟的 Rxjava 可观察对象进行单元测试的主要内容,如果未能解决你的问题,请参考以下文章

RxJava:链接可观察对象

如何延迟创建可观察对象

测试可观察对象 Angular 2 业力

RxJava Observable.create 包装可观察订阅

如何对这个从管道可观察到的错误捕获的 Angular 打字稿 Http 错误拦截器进行单元测试?

使用 Completable.timer 对 RxJava 进行单元测试