RxJava 手动发出项目

Posted

技术标签:

【中文标题】RxJava 手动发出项目【英文标题】:RxJava emit items manually 【发布时间】:2017-03-04 20:06:07 【问题描述】:

我是 RxJava 世界的新手。我几天前就开始了,我正在尝试做一些非常具体的事情。

我想使用 RxJava 将一些事件从一个类传递给其他类。我的发射器类做了一些工作,完成后我想将该事件通知给每个订阅的类。

另外,我不想丢失任何事件,所以如果发射器完成一个动作并通知订阅者但没有人消费该事件,它应该保留它。

我一直在寻找Subjects,因为我读到了按需排放的好方法,但我找不到解决我的问题的方法,也许我可以按照他们在该链接中所说的方式尝试PublishSubject

请注意,PublishSubject 可能会在创建后立即开始发出项目(除非您已采取措施防止这种情况发生),因此在创建主题和观察者订阅之间存在一个或多个项目可能丢失的风险给它。如果您需要保证从源 Observable 交付所有项目,则需要使用 Create 形成该 Observable 以便您可以手动重新引入“冷” Observable 行为(在开始发射项目之前检查所有观察者是否已订阅),或者改用 ReplaySubject。

目前我有一个Observable,它会向每个新订阅发送完整的事件列表:

protected List<T> commands = new ArrayList<>();
Observable<T> observe = Observable.defer(() -> Observable.fromIterable(commands));

Disposable subscribe(Consumer<T> onNext, Consumer<Throwable> onError) 
    return observe.subscribe(onNext/*, onError*/);

但这与我想要的相差甚远。正如我所说的,我对 Rx 很陌生,如果我找不到方法,我最终会使用 ReplaySubject(每次订阅都会重复完整的命令列表,但至少我永远不会丢失一个事件)。

编辑:

最后我把我的问题分成两部分。对于手动发射项目,我使用PublishSubject

PublishSubject<T> actionSubject = PublishSubject.create();
actionSubject.subscribe(onceAction);
actionSubject.onNext(action);

【问题讨论】:

【参考方案1】:

您可能正在寻找一个BehaviorSubject,它会重播为每个新订阅者发出的最后一个值。

无论如何,您可以使用名为createWithSize(capacity) 的工厂方法创建ReplaySubject 来定义大小有界的缓冲区,该缓冲区在溢出时丢弃最旧的项目。还有一种方法可以创建具有类似行为的限时缓冲区。

【讨论】:

I would like to don't loose any event, so if emitter finish an action and notifies to subscribers but no one consumes that event it should keep it.【参考方案2】:

这不是你要找的吗?

    Observable<Integer> observable = Observable.create((ObservableEmitter<Integer> e) -> 
        for (int i = 0; i < 10; i++) 
            e.onNext(i);
        
        e.onComplete();
    );

    System.out.println("FIRST");
    observable
            .doOnComplete(() -> System.out.println("FIRST COMPLETE"))
            .subscribe(System.out::println);
    System.out.println("SECOND");
    observable
            .doOnComplete(() -> System.out.println("SECOND COMPLETE"))
            .subscribe(System.out::println);

输出:

FIRST
0
1
2
3
4
5
6
7
8
9
FIRST COMPLETE
SECOND
0
1
2
3
4
5
6
7
8
9
SECOND COMPLETE

【讨论】:

不是真的,因为换一种说法,我想要一个“活跃的”订阅。观察者将在监听,并且 observable 必须能够按需与订阅者进行通信,例如 Subjects。 我不想在create 上使用它,我在不可预知的时刻需要它。此外,不鼓励使用create 您在问如何解决特定问题,但没有提供有关该问题的足够信息。每个人都试图猜测你真正需要什么。 我尽量解释得最好,可能是我需要提高我的英语。我将寻找其他方式来解释一个例子,并尽快编辑我的帖子。

以上是关于RxJava 手动发出项目的主要内容,如果未能解决你的问题,请参考以下文章

Android 常用开源框架源码解析 系列 Rxjava 异步框架

防止快速单击按钮并使用 rxjava 发出请求

RxJava 的基本使用

Rxjava - 如何获取当前和上一个项目?

RxJava在项目中的特殊使用场景

android学习笔记--RxJava