RxJava 观察调用/订阅线程

Posted

技术标签:

【中文标题】RxJava 观察调用/订阅线程【英文标题】:RxJava Observing on calling/subscribing thread 【发布时间】:2015-06-04 00:44:47 【问题描述】:

我很难理解 subscribeOn/observeOn 在 RxJava 中是如何工作的。我用 observable 创建了一个简单的应用程序,它发出太阳系行星名称,进行一些映射和过滤并打印结果。

据我了解,将工作调度到后台线程是通过 subscribeOn 运算符完成的(而且它似乎工作正常)。

在后台线程上观察也适用于observeOn 运算符。

但是我很难理解,如何观察调用线程(无论是主线程还是其他线程)。在 android 上使用AndroidSchedulers.mainThread() 运算符很容易完成,但我不知道如何在纯 java 中实现。

这是我的代码:

public class Main 

    public static void main(String[] args) throws InterruptedException 

        ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        System.out.println("Main thread: " + getCurrentThreadInfo());

        Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton"))
                .map(in -> 
                    System.out.println("map on: " + getCurrentThreadInfo());
                    return in.toUpperCase();
                )
                .filter(in -> 
                    System.out.println("filter on: " + getCurrentThreadInfo());
                    return in.contains("A");
                )
                .subscribeOn(Schedulers.from(executor));

        for (int i = 0; i < 5; i++) 
            Thread thread = new Thread("Thread-" + i) 
                @Override
                public void run() 
                    stringObservable
                            .buffer(5)
                            .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo()));
                
            ;
            thread.start();
        

    

    private static String getCurrentThreadInfo() 
        return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")";
    

在创建和工作中​​可观察到从执行程序的三个线程之一订阅。这按预期工作。但是如何在 for 循环中观察那些动态创建的线程的结果呢?有没有办法从当前线程创建调度程序?

另外,我发现运行这段代码后,它永远不会终止,我不知道为什么? :(

【问题讨论】:

Android 中的主线程不一定是调用线程。这是一个重要的区别。它更相当于 Java Swing 中的 EDT。 当然它并不总是调用线程,但通常它被用作应该传递结果的线程。我应该更精确。 【参考方案1】:

要回答你的问题,让我从头开始,这样可以让其他人了解你已经知道的。

调度器

Scheduler 的作用与 Java 的 Executors 相同。简而言之 - 他们决定执行哪些线程操作。

通常是 Observable 和操作符在当前线程中执行。有时您可以将 Scheduler 作为参数传递给 Observable 或运算符(例如 Observable.timer())。

另外 RxJava 提供了 2 个操作符来指定调度器:

subscribeOn - 指定 Observable 将在其上运行的调度程序 observeOn - 指定观察者观察这个 Observable 的调度器

为了快速理解它们,我使用了示例代码:

在所有示例中,我将使用 helper createObservable,它会发出 Observable 在其上运行的线程的名称:

 public static Observable<String> createObservable()
        return Observable.create((Subscriber<? super String> subscriber) -> 
                subscriber.onNext(Thread.currentThread().getName());
                subscriber.onCompleted();
            
        );
    

没有调度器:

createObservable().subscribe(message -> 
        System.out.println("Case 1 Observable thread " + message);
        System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
    );
    //will print:
    //Case 1 Observable thread main
    //Case 1 Observer thread main

订阅:

createObservable()
            .subscribeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 2 Observable thread " + message);
                System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 2 Observable thread RxNewThreadScheduler-1
            //Case 2 Observer thread RxNewThreadScheduler-1

订阅和观察:

reateObservable()
            .subscribeOn(Schedulers.newThread())
            .observeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 3 Observable thread " + message);
                System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 3 Observable thread RxNewThreadScheduler-2
            //Case 3 Observer thread RxNewThreadScheduler-1

观察:

createObservable()
            .observeOn(Schedulers.newThread())
            .subscribe(message -> 
                System.out.println("Case 4 Observable thread " + message);
                System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
            );
            //will print:
            //Case 4 Observable thread main
            //Case 4 Observer thread RxNewThreadScheduler-1

答案:

AndroidSchedulers.mainThread() 返回一个调度器,它将工作委托给与主线程关联的 MessageQueue。 为此,它使用 android.os.Looper.getMainLooper() 和 android.os.Handler。

换句话说,如果你想指定特定的线程,你必须提供在线程上调度和执行任务的方法。

在它下面可以使用任何类型的 MQ 来存储任务和循环队列并执行任务的逻辑。

在 java 中,我们有为此类任务指定的 Executor。 RxJava 可以从这样的 Executor 轻松创建 Scheduler。

下面的例子展示了如何在主线程上观察(不是特别有用,但显示了所有必需的部分)。

public class RunCurrentThread implements Executor 

    private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException 
        RunCurrentThread sample = new RunCurrentThread();
        sample.observerOnMain();
        sample.runLoop();
    

    private void observerOnMain() 
        createObservable()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.from(this))
                .subscribe(message -> 
                    System.out.println("Observable thread " + message);
                    System.out.println("Observer thread " + Thread.currentThread().getName());
                );
        ;
    

    public Observable<String> createObservable() 
        return Observable.create((Subscriber<? super String> subscriber) -> 
                    subscriber.onNext(Thread.currentThread().getName());
                    subscriber.onCompleted();
                
        );
    

    private void runLoop() throws InterruptedException 
        while(!Thread.interrupted())
            tasks.take().run();
        
    

    @Override
    public void execute(Runnable command) 
        tasks.add(command);
    

最后一个问题,为什么你的代码没有终止:

ThreadPoolExecutor 默认使用非守护线程,因此您的程序在它们存在之前不会结束。 您应该使用shutdown 方法来关闭线程。

【讨论】:

在“SubscribeOn 和 ObserveOn:”示例中,您可能无法获得结果,可能是因为在 'subscribeOn' 中使用了 newThread。有什么方便的方法来屏蔽它吗?使用“toBlocking”方法不允许使用“订阅”方法。 可以通过多种方式解决问题,最简单的就是使用Thread.sleep(1000) 你的例子不是倒过来的吗?订阅者收到的“消息”应该是 Observable 线程的名称,而“Thread.currentThread().getName()”是 Observer 线程的名称。 我同意罗德里戈的观点。否则我无法理解这些消息。尽管如此,这是一个很好的简单示例,说明 subscribeOn 和 observeOn 是如何工作的。 作为给那些阅读@RodrigoQuesada 评论的人的信息,这个小错误现在已经得到纠正。【参考方案2】:

这是一个针对 RxJava 2 更新的简化示例。它与 Marek 的回答的概念相同:将可运行对象添加到调用者线程上正在使用的 BlockingQueue 的 Executor。

public class ThreadTest 

    @Test
    public void test() throws InterruptedException 

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

        System.out.println("Caller thread: " + Thread.currentThread().getName());

        Observable.fromCallable(new Callable<Integer>() 
            @Override
            public Integer call() throws Exception 
                System.out.println("Observable thread: " + Thread.currentThread().getName());
                return 1;
            
        )
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.from(new Executor() 
                @Override
                public void execute(@NonNull Runnable runnable) 
                    tasks.add(runnable);
                
            ))
            .subscribe(new Consumer<Integer>() 
                @Override
                public void accept(@NonNull Integer integer) throws Exception 
                    System.out.println("Observer thread: " + Thread.currentThread().getName());
                
            );
        tasks.take().run();
    



// Output: 
// Caller thread main
// Observable thread RxCachedThreadScheduler-1
// Observer thread main

【讨论】:

嘿,在你的回答中,.observeOn(Schedulers.from(new Executor() ... 会在主线程上执行吗?比如subscribeOn跑后台,然后observeOn又跑在主线程上?我来自 RxJava 2,我可以简单地输入 AndroidSchedulers.mainThread(),我正在寻找 RxJava 1 中的等价物。 @Tom 来自调用线程。如果那是主线程,那么在主线程上。但是这个例子看起来并不打算在主线程上运行,因为由于 BlockingQueue,整个执行过程都会阻塞调用线程。 这个例子只调用了一次tasks.take(),所以在它处理完一个事件之后就完成了。这与大多数生产 BlockingQueue 代码不同,在这些代码中,您将有某种无限循环来继续处理项目。

以上是关于RxJava 观察调用/订阅线程的主要内容,如果未能解决你的问题,请参考以下文章

从正确的线程调用 RxJava2 可取消/可处置

RxJava入门

浅谈RxJava源码解析(观察者),创建(createfromjust),变换(MapflatMap)线程调度

为啥 RxJava 的“订阅”方法会被多次调用?

Android异步框架RxJava 1.x系列 - 线程调度器Scheduler

RxJava Observable.create 包装可观察订阅