如何在线程池队列上观察我的快速源
Posted
技术标签:
【中文标题】如何在线程池队列上观察我的快速源【英文标题】:How can observe my fast source on thread-pool queue 【发布时间】:2016-11-24 02:45:08 【问题描述】:需要帮助在主线程上进行可观察的启动,然后转到线程池以允许源继续发出新项目(无论它们是否仍在线程池中处理)。
这是我的例子:
public static void main(String[] args)
Observable<Integer> source = Observable.range(1,10);
source.map(i -> sleep(i, 10))
.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(i -> sleep(i * 10, 300))
.subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
sleep(-1, 30000);
private static int sleep(int i, int time)
try
Thread.sleep(time);
catch (InterruptedException e)
e.printStackTrace();
return i;
总是打印:
Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationScheduler-1
Emitting 4 on thread main
Emitting 5 on thread main
Emitting 6 on thread main
Received 20 on thread RxComputationScheduler-1
Emitting 7 on thread main
Emitting 8 on thread main
Emitting 9 on thread main
Received 30 on thread RxComputationScheduler-1
Emitting 10 on thread main
Received 40 on thread RxComputationScheduler-1
Received 50 on thread RxComputationScheduler-1
Received 60 on thread RxComputationScheduler-1
Received 70 on thread RxComputationScheduler-1
Received 80 on thread RxComputationScheduler-1
Received 90 on thread RxComputationScheduler-1
Received 100 on thread RxComputationScheduler-1
虽然项目按预期在主线程上发出,但我希望它们之后继续移动到计算/IO 线程池。
应该是这样的:
【问题讨论】:
我认为在这个例子中一切都发生得如此之快,它似乎在observeOn()
之前被阻止,即使它不是。让我看看我是否可以夸大睡眠时间以证明它是有效的。
【参考方案1】:
我认为您没有充分减缓源排放,而且它们的排放速度如此之快,以至于在observeOn()
有机会安排它们之前所有项目都已排放。
尝试睡眠到 500 毫秒而不是 10 毫秒。然后你会看到你所期望的交错。
public class JavaLauncher
public static void main(String[] args)
Observable<Integer> source = Observable.range(1,10);
source.map(i -> sleep(i, 500))
.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.observeOn(Schedulers.computation())
.map(i -> sleep(i * 10, 250))
.subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
sleep(-1, 30000);
private static int sleep(int i, int time)
try
Thread.sleep(time);
catch (InterruptedException e)
e.printStackTrace();
return i;
输出
Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationThreadPool-3
Emitting 4 on thread main
Received 20 on thread RxComputationThreadPool-3
Emitting 5 on thread main
Emitting 6 on thread main
Received 30 on thread RxComputationThreadPool-3
Emitting 7 on thread main
Emitting 8 on thread main
Received 40 on thread RxComputationThreadPool-3
Emitting 9 on thread main
Emitting 10 on thread main
Received 50 on thread RxComputationThreadPool-3
Received 60 on thread RxComputationThreadPool-3
Received 70 on thread RxComputationThreadPool-3
Received 80 on thread RxComputationThreadPool-3
Received 90 on thread RxComputationThreadPool-3
Received 100 on thread RxComputationThreadPool-3
更新 - 并行版本
public class JavaLauncher
public static void main(String[] args)
Observable<Integer> source = Observable.range(1,10);
source.map(i -> sleep(i, 250))
.doOnNext(i -> System.out.println("Emitting " + i + " on thread " + Thread.currentThread().getName()))
.flatMap(i ->
Observable.just(i)
.subscribeOn(Schedulers.computation())
.map(i2 -> sleep(i2 * 10, 500))
)
.subscribe( i -> System.out.println("Received " + i + " on thread " + Thread.currentThread().getName()));
sleep(-1, 30000);
private static int sleep(int i, int time)
try
Thread.sleep(time);
catch (InterruptedException e)
e.printStackTrace();
return i;
输出
Emitting 1 on thread main
Emitting 2 on thread main
Emitting 3 on thread main
Received 10 on thread RxComputationThreadPool-3
Emitting 4 on thread main
Received 20 on thread RxComputationThreadPool-4
Received 30 on thread RxComputationThreadPool-1
Emitting 5 on thread main
Received 40 on thread RxComputationThreadPool-2
Emitting 6 on thread main
Received 50 on thread RxComputationThreadPool-3
Emitting 7 on thread main
Received 60 on thread RxComputationThreadPool-4
Emitting 8 on thread main
Received 70 on thread RxComputationThreadPool-1
Emitting 9 on thread main
Received 80 on thread RxComputationThreadPool-2
Emitting 10 on thread main
Received 90 on thread RxComputationThreadPool-3
Received 100 on thread RxComputationThreadPool-4
【讨论】:
还要记住,observeOn()
不会处理“线程池”上的项目。它只是该池中的一个线程。如果你想利用池中的所有线程,你需要utilize some parallelization patterns
在输出中,您显示它仍然始终使用池中的 1 个线程。但就像你在评论中提到的那样,你期待着,但我没有。
是的,subscribeOn()
和 observeOn()
只会将发射重定向到另一个 single 线程。如果您想真正并行化并使用调度程序中可用的所有线程,您需要做一些 FlatMap 功夫。我把代码的并行版本放在这里。 gist.github.com/thomasnield/81b783303564376b904d789480405e50
我也会仔细阅读有关并行化的文章,因为很多人假设并且对如何正确执行它感到困惑。 tomstechnicalblog.blogspot.com/2015/11/…
我害怕那个解决方案。通过创建一个新序列(Observable),您可以将其发送到一个新线程,但这样我不确定背压运算符将如何工作(有时源可能会快很多)。以上是关于如何在线程池队列上观察我的快速源的主要内容,如果未能解决你的问题,请参考以下文章