rxjava-几类变换2

Posted 左手指月的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rxjava-几类变换2相关的知识,希望对你有一定的参考价值。

1.2Rxjava之变换操作符

1.1 RxJava系列之组合操作符

=========

1.2Rxjava之变换操作符

 

Rxjava常见的变化操作符如下:

 

1.map()变换符

//转换操作符之map()
//通过map()操作符对被观察者发送的每一个事件都通过指定的Function对象的apply()方法进行转换处理
//将之前的事件类型转换成为另外的一种事件类型
//即通过map()操作符将我们的被观察者发送的事件转换成为任意的类型的事件
//应用场景:数据类型的转换
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
//通过apply将事件的类型有integer转换成为了String
return String.valueOf("通过map()变换操作符将int类型的数据转换成为了String字符串类型的数据 : " + integer.intValue());
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: thread = " + Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
//最终在onNext中调用得到的是String类型的事件
Log.d(TAG, "onNext: " + s + " thread = " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: thread = " + Thread.currentThread().getName());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});
打印结果:

 

2.flatMap()变换操作符

//转换操作符之flatMap()
//flatMap()操作符的作用是将被观察者发送的事件序列进行拆分 & 单独转换 在合并成为一个新的事件序列最后在进行发送
//原理:将被观察者发送的事件序列进行拆分成一个个事件 在将每个事件都生成创建一个新的Observable对象
//每个原始事件都会生成一个新的Observable对象
//每个拆分的新的事件生成的新的Observable对象最终会汇总到一个新建总的Observable对象中
//新建的总的Observable对象最终将新合并的事件序列发送给观察者Observer
//应用场景:无序的将整个被观察者发送的事件序列进行变换

//注意:新生成的事件序列的顺序是无序的 和旧的事件序列的顺序无关
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
ArrayList<String> transforDatas = new ArrayList<>();

for (int j = 0; j < 3; j++) {
Log.d(TAG, "apply: 被观察者的原始事件 " + integer + " 分解成的子事件 " + j + " thread : " + Thread.currentThread().getName());
}

return Observable.fromIterable(transforDatas);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: thread = " + Thread.currentThread().getName());
}

@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s + " thread = " + Thread.currentThread().getName());
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: thread = " + Thread.currentThread().getName());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: thread = " + Thread.currentThread().getName());
}
});
打印结果:    

 

3.concatMap()变换操作符

//转换操作符之concatMap()
final String[] courseName = {"语文","数学","英语"};
ArrayList<Student> students = new ArrayList<>();
for (int j = 0; j < 5; j++) {
List<Course> courses = new ArrayList<>();
for (int k = 0; k < 3; k++) {
courses.add(new Course(courseName[k] + k, (int) (Math.random() * 31 + 70)));
}
Student childStudent = new Student("何乐" + j, 22 + j, courses);
students.add(childStudent);
}

Observable.fromIterable(students).subscribe(new Consumer<Student>() {
@Override
public void accept(@NonNull Student student) throws Exception {
List<Course> courses = student.getCourses();
for (int j = 0; j < courses.size(); j++) {
Log.d(TAG, "accept: 学生姓名: " + student.getName() + " 学生年龄 : " + student.getAge() + " 学生学习课程名: " + courses.get(j).getCourseName() + " 学生该门成绩: " + courses.get(j).getScore());
}
}
});

Log.d(TAG, "------------------------------------------------------------------------------------------------");

//使用flatMap() / concatMap()直接获取Course对象 不用进行频繁的转换
//将被观察者发送的原始事件进行拆分 生成一个新的事件序列
//concatMap()和flatMap()之间的不同点
//concatMap() 拆分 & 重新合并生成的事件序列的顺序和 旧的的事件序列的顺序是一致的
//即新生成的事件序列的顺序和严格按照旧的事件序列的顺序来发射
Observable.fromIterable(students).concatMap(new Function<Student, ObservableSource<Course>>() {
@Override
public ObservableSource<Course> apply(@NonNull Student student) throws Exception {
return Observable.fromIterable(student.getCourses());
}
}).subscribe(new Consumer<Course>() {
@Override
public void accept(@NonNull Course course) throws Exception {
Log.d(TAG, "accept: 课程名 " + course.getCourseName() + " 课程成绩 " + course.getScore());
}
});
打印结果:

 

4. switchMap()转换操作符

//转换操作符之switchMap()
//switchMap()操作符和flatMap()操作符的功能很像,除了一点
//对于switchMap()每当源Observable发射一个新的数据项,它就取消订阅并停止监视之前的那个数据项
//变换产生的Observable,并开始监视当前最新发射的这个数据项

//使用测试调度器
final TestScheduler testScheduler = new TestScheduler();


Observable.just(10,20,30)
.switchMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(@NonNull Integer integer) throws Exception {
int delay = 0;
// if(integer == 10){
// delay = 100;
// }
//
// if(integer == 20){
// delay = 50;
// }
//
// if(integer == 30){
// delay = 30;
// }

//生成一个随机产生的延迟值,来延迟变换三个上游发射的三个数据项
delay = new Random().nextInt(101);

return Observable.fromArray(new Integer[]{integer,integer / 2}).delay(delay,TimeUnit.MILLISECONDS,testScheduler);
}
}).observeOn(androidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer + " " + Thread.currentThread().getName());
}
});

//使用测试调度器直接将时间向前推进到1分钟的时间
testScheduler.advanceTimeBy(1,TimeUnit.MINUTES);
打印项:

 

 以上SwitchMap代码不管运行多少次打印的结果始终是30 15 不管10 20 30 这三个数据项在switchMap()中变换时生成时间的先后 说明对于switchMap()只要上游源Observable发射了新的数据就会停止订阅和监视之前的旧的数据项的变换产生的Observable,同时会直接订阅监视最新的这个数据项

5. flatMapIterable()转换操作符

//变换操作符之flatMapIterable()
//flatMapIterable()和flatMap()几乎是一样的,不同的是flatMapIterable()它转化的多个Observable是使用Iterable作为源数据的。
ArrayList<Integer> list1 = new ArrayList<>();
ArrayList<Integer> list2 = new ArrayList<>();
ArrayList<Integer> list3 = new ArrayList<>();

for(int i = 0;i < 10;i++){
list1.add(i);
list2.add(i);
list3.add(i);
}

Observable.just(list1,list2,list3)
.flatMapIterable(new Function<ArrayList<Integer>, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(@NonNull ArrayList<Integer> integers) throws Exception {
return integers;
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
    打印结果:

            

concatMapEager()变换操作符:

//变换操作符之concatMapEager()
//和flatMap()不同concatMap() concatMapEager()操作符能够保证最终变换生成的数据是有序的
Observable.just(1,2,3,4,5,6)
.concatMapEager(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
return Observable.just(String.valueOf(integer));
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
6. buffer()转换操作符:

//转换操作符之buffer()缓冲
//buffer()变换操作符可以Observable发射的事件序列进行缓冲转换成一个新的Observable对象
//而且新转换的Observable发射事件时是发射一组列表值而不是一个个发射
//buffer(count) 每次新生成的Observable事件序列中的事件数量为count个
Observable.just(1,2,3,4,5).buffer(3).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
Log.d(TAG, "缓冲区数据");
for (int j = 0; j < integers.size(); j++) {
Log.d(TAG, "accept: " + integers.get(j));
}
}
});

Log.d(TAG, "--------------------------------------");
//buffer(count,skip) 每次新转换的Observable事件序列中的事件数量为count
//并且每次需要跳skip个事件数量的位置重新生成count个事件的新的Observable事件序列
Observable.just(1,2,3,4,5).buffer(3,2).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(@NonNull List<Integer> integers) throws Exception {
Log.d(TAG, "缓冲区数据");
for (int j = 0; j < integers.size(); j++) {
Log.d(TAG, "accept: " + integers.get(j));
}
}
});
打印结果:

  

7. scan()操作符:

//Scan()变换操作符
//Scan操作符对原始Observable发射的第一项数据应用一个函数,
// 然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。
// 它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做accumulator 累加器
Observable.just(1,2,3,4,5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
Log.d(TAG, "integer : " + integer + " integer2 : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
Log.d(TAG, "-----------------------------------------------------");

//scan操作符的变体,你可以传递一个种子值给累加器函数的第一次调用(Observable发射的第一项数据)
// 如果你使用这个版本,scan将发射种子值作为自己的第一项数据。 下游会比源Observable发射的数据项多接收响应一次数据多的就时初始种子值
Observable.just(1,2,3)
.scan(10, new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
Log.d(TAG, "intgera : " + integer + " integerb : " + integer2);
return integer + integer2;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
打印结果:

       

8. groupBy()操作符:

//变换操作符groupBy()
//GroupBy操作符将原始Observable分拆为一些Observables集合,
// 它们中的每一个发射原始Observable数据序列的一个子序列。
// 哪个数据项由哪一个Observable发射是由一个函数判定的,
// 这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。
//RxJava实现了groupBy操作符。它返回Observable的一个特殊子类GroupedObservable,
//实现了GroupedObservable接口的对象有一个额外的方法getKey,这个Key用于将数据分组到指定的Observable

//注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,
//每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,
//这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。
//你应该使用像take(0)这样会丢弃自己的缓存的操作符。
//如果你取消订阅一个GroupedObservable,那个Observable将会终止
//如果之后原始的Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个Key创建一个新的GroupedObservable。
//groupBy默认不在任何特定的调度器上执行。
Observable.range(1,10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return (integer >= 5) ? "大于5" : "小于5";
}
})/*.elementAt(1)*/ //只获取索引为1的那个位置的元素(默认索引从0开始)
.take(1) //只获取事件序列中的前一个事件和element()是不同的意思
.subscribe(new Consumer<GroupedObservable<String, Integer>>() {
@Override
public void accept(@NonNull GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
String key = stringIntegerGroupedObservable.getKey();
Log.d(TAG, "accept: key : " + key + " groupObservable : " + stringIntegerGroupedObservable);

stringIntegerGroupedObservable
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.d(TAG, "accept: " + throwable.getMessage());
}
});

//传递一个变换函数,这样它可以在发射结果GroupedObservable之前改变数据项。
Observable.range(1,10)
.groupBy(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return integer % 2 == 0 ? "偶数" : "奇数";
}
}, new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer + " x");
}
}).subscribe(new Consumer<GroupedObservable<String, String>>() {
@Override
public void accept(@NonNull GroupedObservable<String, String> stringStringGroupedObservable) throws Exception {
Log.d(TAG, "accept: " + stringStringGroupedObservable.getKey());
if(stringStringGroupedObservable.getKey().equals("奇数")){
stringStringGroupedObservable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
}
});
打印结果:

 

9. window操作符:

//变换操作符window()
//window操作符非常类似于buffer操作符,区别在于buffer操作符产生的结果是一个List缓存,
// 而window操作符产生的结果是一个Observable,订阅者可以对这个结果Observable重新进行订阅处理
//https://mcxiaoke.gitbooks.io/rxdocs/operators/Window.html
Observable.range(1,10)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: thread : " + Thread.currentThread().getName());
}

@Override
public void onNext(@NonNull Observable<Integer> integerObservable) {
integerObservable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe: inner thread : " + Thread.currentThread().getName());
}

@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext: inner : " + integer + "thread : " + Thread.currentThread().getName());
}

@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: inner thread : " + Thread.currentThread().getName());
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: inner thread : " + Thread.currentThread().getName());
}
});
}

@Override
public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError: ");
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
}); 

 

1.1 RxJava系列之组合操作符

 这类operators可以同时处理多个Observable来创建我们所需要的Observable。

组合操作符主要包含: Merge,StartWith,Concat,Zip,CombineLatest,SwitchOnNext,Join等等。

  • Merge

merge(Observable, Observable)将两个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。你可以简单的将它理解为两个Obsrvable合并成了一个Observable,合并后的数据是无序的。

技术图片

我们看下面的例子,一共有两个Observable:一个用来发送字母,另一个用来发送数字;现在我们需要两连个Observable发射的数据合并。

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.merge(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString()+" ");
            }
        });   

程序输出:

A 0 B C 1 D E 2 F 3 G H 4 

merge(Observable[])将多个Observable发射的事件序列组合并成一个事件序列,就像是一个Observable发射的一样。

技术图片

 

  • StartWith

startWith(T)用于在源Observable发射的数据前插入数据。使用startWith(Iterable<T>)我们还可以在源Observable发射的数据前插入Iterable。官方示意图:

技术图片

startWith(Observable<T>)用于在源Observable发射的数据前插入另一个Observable发射的数据(这些数据会被插入到 源Observable发射数据的前面)。官方示意图:

技术图片

 

  • Concat

concat(Observable<? extends T>, Observable<? extends T>) concat(Observable<? extends Observable<T>>)用于将多个obserbavle发射的的数据进行合并发射,concat严格按照顺序发射数据,前一个Observable没发射玩是不会发射后一个Observable的数据的。它和merge、startWitch和相似,不同之处在于:

  1. merge:合并后发射的数据是无序的;
  2. startWitch:只能在源Observable发射的数据前插入数据。

技术图片

这里我们将前面Merge操作符的例子拿过来,并将操作符换成Concat,然后我们看看执行结果:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(300, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(500, TimeUnit.MILLISECONDS).take(5);

Observable.concat(letterSequence, numberSequence)
        .subscribe(new Observer<Serializable>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(Serializable serializable) {
                System.out.print(serializable.toString() + " ");
            }
        });

程序输出:

A B C D E F G H 0 1 2 3 4 
  • Zip

zip(Observable, Observable, Func2)用来合并两个Observable发射的数据项,根据Func2函数生成一个新的值并发射出去。当其中一个Observable发送数据结束或者出现异常后,另一个Observable也将停在发射数据。

技术图片

和前面的例子一样,我们将操作符换成了zip:

String[] letters = new String[]{"A", "B", "C", "D", "E", "F", "G", "H"};
Observable<String> letterSequence = Observable.interval(120, TimeUnit.MILLISECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return letters[position.intValue()];
            }
        }).take(letters.length);

Observable<Long> numberSequence = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);

Observable.zip(letterSequence, numberSequence, new Func2<String, Long, String>() {
    @Override
    public String call(String letter, Long number) {
        return letter + number;
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:" + e.getMessage());
    }

    @Override
    public void onNext(String result) {
        System.out.print(result + " ");
    }
});

程序输出:

A0 B1 C2 D3 E4
  • CombineLatest

combineLatest(Observable, Observable, Func2)用于将两个Observale最近发射的数据已经Func2函数的规则进展组合。下面是官方提供的原理图:

技术图片

下面这张图应该更容易理解:

技术图片

 

List<String> communityNames = DataSimulator.getCommunityNames();
List<Location> locations = DataSimulator.getLocations();
Observable<String> communityNameSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, String>() {
            @Override
            public String call(Long position) {
                return communityNames.get(position.intValue());
            }
        }).take(communityNames.size());
Observable<Location> locationSequence = Observable.interval(1, TimeUnit.SECONDS)
        .map(new Func1<Long, Location>() {
            @Override
            public Location call(Long position) {
                return locations.get(position.intValue());
            }
        }).take(locations.size());

Observable.combineLatest(
        communityNameSequence,
        locationSequence,
        new Func2<String, Location, String>() {
            @Override
            public String call(String communityName, Location location) {
                return "小区名:" + communityName + ", 经纬度:" + location.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.exit(0);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("Error:" + e.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

程序输出:

小区名:竹园新村, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(21.827, 23.323)
小区名:康桥半岛, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(11.923, 16.309)
小区名:中粮·海景壹号, 经纬度:(22.273, 53.623)
小区名:浦江名苑, 经纬度:(22.273, 53.623)
小区名:南辉小区, 经纬度:(22.273, 53.623)
  • SwitchOnNext

switchOnNext(Observable<? extends Observable<? extends T>>用来将一个发射多个小Observable的源Observable转化为一个Observable,然后发射这多个小Observable所发射的数据。如果一个小的Observable正在发射数据的时候,源Observable又发射出一个新的小Observable,则前一个Observable发射的数据会被抛弃,直接发射新 的小Observable所发射的数据。

结合下面的原理图大家应该很容易理解,我们可以看到下图中的黄色圆圈就被丢弃了。

技术图片

  • Join

join(Observable, Func1, Func1, Func2)我们先介绍下join操作符的4个参数:

  • Observable:源Observable需要组合的Observable,这里我们姑且称之为目标Observable;
  • Func1:接收从源Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了源Obsrvable发射出来的数据的有效期;
  • Func1:接收目标Observable发射来的数据,并返回一个Observable,这个Observable的声明周期决定了目标Obsrvable发射出来的数据的有效期;
  • Func2:接收从源Observable和目标Observable发射出来的数据,并将这两个数据组合后返回。

所以Join操作符的语法结构大致是这样的:onservableA.join(observableB, 控制observableA发射数据有效期的函数, 控制observableB发射数据有效期的函数,两个observable发射数据的合并规则)

join操作符的效果类似于排列组合,把第一个数据源A作为基座窗口,他根据自己的节奏不断发射数据元素,第二个数据源B,每发射一个数据,我们都把它和第一个数据源A中已经发射的数据进行一对一匹配;举例来说,如果某一时刻B发射了一个数据“B”,此时A已经发射了0,1,2,3共四个数据,那么我们的合并操作就会把“B”依次与0,1,2,3配对,得到四组数据: [0, B][1, B] [2, B] [3, B]

再看看下面的图是不是好理解了呢?!

技术图片

读懂了上面的文字,我们再来写段代码加深理解。

final List<House> houses = DataSimulator.getHouses();//模拟的房源数据,用于测试

//用来每秒从houses总取出一套房源并发射出去
Observable<House> houseSequence =
        Observable.interval(1, TimeUnit.SECONDS)
                .map(new Func1<Long, House>() {
                    @Override
                    public House call(Long position) {
                        return houses.get(position.intValue());
                    }
                }).take(houses.size());//这里的take是为了防止houses.get(position.intValue())数组越界

//用来实现每秒发送一个新的Long型数据
Observable<Long> tictoc = Observable.interval(1, TimeUnit.SECONDS);

houseSequence.join(tictoc,
        new Func1<House, Observable<Long>>() {
            @Override
            public Observable<Long> call(House house) {
                return Observable.timer(2, TimeUnit.SECONDS);
            }
        },
        new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(0, TimeUnit.SECONDS);
            }
        },
        new Func2<House, Long, String>() {
            @Override
            public String call(House house, Long aLong) {
                return aLong + "-->" + house.getDesc();
            }
        }
).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.exit(0);
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error:"+e.getMessage());
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
});

程序输出:

0-->中粮海景壹号新出大平层!总价4500W起
1-->中粮海景壹号新出大平层!总价4500W起
1-->满五唯一,黄金地段
2-->中粮海景壹号新出大平层!总价4500W起
2-->满五唯一,黄金地段
2-->一楼自带小花园
3-->一楼自带小花园
3-->毗邻汤臣一品
4-->毗邻汤臣一品
4-->顶级住宅,给您总统般尊贵体验
5-->顶级住宅,给您总统般尊贵体验
5-->顶层户型,两室一厅
6-->顶层户型,两室一厅
6-->南北通透,豪华五房
7-->南北通透,豪华五房

通过转换操作符过滤操作符组合操作符三个篇幅将RxJava主要的操作符也介绍的七七八八了。更多操作符的介绍建议大家去查阅官方文档,并自己动手实践一下。 

Demo源码地址:GitHub - BaronZ88/HelloRxJava: RxJavaDemo

以上是关于rxjava-几类变换2的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava2 Observable的数据变换详解及实例

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

如何取消订阅RxKotlin / RxJava中的Flowable?

RxJava 教程-1 简介 原理 线程控制 变换