如何在线程池队列上观察我的快速源

Posted

技术标签:

【中文标题】如何在线程池队列上观察我的快速源【英文标题】:How can observe my fast source on thread-pool queue 【发布时间】:2016-11-24 02:45:08 【问题描述】:

需要帮助在主线程上进行可观察的启动,然后转到线程池以允许源继续发出新项目(无论它们是否仍在线程池中处理)。

这是我的例子:

public static void main(String[] args) 
    Observable<Integer> source = Observable.range(1,10);

    source.map(i -> sleep(i, 10))
            .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
            .observeOn(Schedulers.computation())
            .map(i -> sleep(i * 10, 300))
            .subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));

    sleep(-1, 30000);


private static int sleep(int i, int time) 
    try 
        Thread.sleep(time);
     catch (InterruptedException e) 
        e.printStackTrace();
    
    return i;

总是打印:

Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationScheduler-1
Emitting 4 on thread main
Emitting 5 on thread main
Emitting 6 on thread main
Received 20 on thread RxComputationScheduler-1
Emitting 7 on thread main
Emitting 8 on thread main
Emitting 9 on thread main
Received 30 on thread RxComputationScheduler-1
Emitting 10 on thread main
Received 40 on thread RxComputationScheduler-1
Received 50 on thread RxComputationScheduler-1
Received 60 on thread RxComputationScheduler-1
Received 70 on thread RxComputationScheduler-1
Received 80 on thread RxComputationScheduler-1
Received 90 on thread RxComputationScheduler-1
Received 100 on thread RxComputationScheduler-1

虽然项目按预期在主线程上发出,但我希望它们之后继续移动到计算/IO 线程池。

应该是这样的:

【问题讨论】:

我认为在这个例子中一切都发生得如此之快,它似乎在 observeOn() 之前被阻止,即使它不是。让我看看我是否可以夸大睡眠时间以证明它是有效的。 【参考方案1】:

我认为您没有充分减缓源排放,而且它们的排放速度如此之快,以至于在observeOn() 有机会安排它们之前所有项目都已排放。

尝试睡眠到 500 毫秒而不是 10 毫秒。然后你会看到你所期望的交错。

public class JavaLauncher 
    public static void main(String[] args) 
        Observable<Integer> source = Observable.range(1,10);

        source.map(i -> sleep(i, 500))
                .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
                .observeOn(Schedulers.computation())
                .map(i -> sleep(i * 10, 250))
                .subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));

        sleep(-1, 30000);
    

    private static int sleep(int i, int time) 
        try 
            Thread.sleep(time);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        return i;
    

输出

Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationThreadPool-3
Emitting 4 on thread main
Received 20 on thread RxComputationThreadPool-3
Emitting 5 on thread main
Emitting 6 on thread main
Received 30 on thread RxComputationThreadPool-3
Emitting 7 on thread main
Emitting 8 on thread main
Received 40 on thread RxComputationThreadPool-3
Emitting 9 on thread main
Emitting 10 on thread main
Received 50 on thread RxComputationThreadPool-3
Received 60 on thread RxComputationThreadPool-3
Received 70 on thread RxComputationThreadPool-3
Received 80 on thread RxComputationThreadPool-3
Received 90 on thread RxComputationThreadPool-3
Received 100 on thread RxComputationThreadPool-3

更新 - 并行版本

public class JavaLauncher 
    public static void main(String[] args) 
        Observable<Integer> source = Observable.range(1,10);

        source.map(i -> sleep(i, 250))
                .doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
                .flatMap(i -> 
                    Observable.just(i)
                        .subscribeOn(Schedulers.computation())
                        .map(i2 -> sleep(i2 * 10, 500))
                )
                .subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));

        sleep(-1, 30000);
    

    private static int sleep(int i, int time) 
        try 
            Thread.sleep(time);
         catch (InterruptedException e) 
            e.printStackTrace();
        
        return i;
    

输出

Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationThreadPool-3
Emitting 4 on thread main
Received 20 on thread RxComputationThreadPool-4
Received 30 on thread RxComputationThreadPool-1
Emitting 5 on thread main
Received 40 on thread RxComputationThreadPool-2
Emitting 6 on thread main
Received 50 on thread RxComputationThreadPool-3
Emitting 7 on thread main
Received 60 on thread RxComputationThreadPool-4
Emitting 8 on thread main
Received 70 on thread RxComputationThreadPool-1
Emitting 9 on thread main
Received 80 on thread RxComputationThreadPool-2
Emitting 10 on thread main
Received 90 on thread RxComputationThreadPool-3
Received 100 on thread RxComputationThreadPool-4

【讨论】:

还要记住,observeOn() 不会处理“线程池”上的项目。它只是该池中的一个线程。如果你想利用池中的所有线程,你需要utilize some parallelization patterns 在输出中,您显示它仍然始终使用池中的 1 个线程。但就像你在评论中提到的那样,你期待着,但我没有。 是的,subscribeOn()observeOn() 只会将发射重定向到另一个 single 线程。如果您想真正并行化并使用调度程序中可用的所有线程,您需要做一些 FlatMap 功夫。我把代码的并行版本放在这里。 gist.github.com/thomasnield/81b783303564376b904d789480405e50 我也会仔细阅读有关并行化的文章,因为很多人假设并且对如何正确执行它感到困惑。 tomstechnicalblog.blogspot.com/2015/11/… 我害怕那个解决方案。通过创建一个新序列(Observable),您可以将其发送到一个新线程,但这样我不确定背压运算符将如何工作(有时源可能会快很多)。

以上是关于如何在线程池队列上观察我的快速源的主要内容,如果未能解决你的问题,请参考以下文章

Java线程池快速学习教程

一时技痒,撸了个动态线程池,源码放Github了

一时技痒,撸了个动态线程池,源码放Github了

Java线程池

使用线程池的 JMS 侦听器

阻塞队列和线程池原理