RxJava响应式编程之初级了解
Posted 进击的小猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava响应式编程之初级了解相关的知识,希望对你有一定的参考价值。
据说现在流行的开发模式是 Retrofit+RxJava+MVP+ButterKnife
如果想要简单学习ButterKnife、MVP模式,可以参考我以前的例子
使用butterknife注解框架
Android—MVP设计模式高级(三)
今天我就简单来学习下RxJava的相关知识
以前我也只是听说过RxJava,RxJava这个到底是什么东西呢?
呵呵,它其实是一个库,所以我们使用里面的方法,得需要下载库,所以我们需要在AS中进行配置
1.RxJava 地址以及添加
github地址:
https://github.com/ReactiveX/RxJava
或者
https://github.com/ReactiveX/RxAndroid
依赖库添加:
compile ‘io.reactivex:rxjava:1.1.6’
或者
compile ‘io.reactivex:rxandroid:1.2.1’
2.RxJava是什么类型的库?它的原理是什么?
RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。
Rx:函数响应式编程,也许这个词对你我都很只可意会,不可言传,先抛开Rx不说,我们接触到的类似的这样的思路,大概有接口回调、Handler通讯、广播通讯、还有一个开源的EventBus、以及ContentPorivider里面的观察者模式、AsyncTask 我们朦胧中也许就大概了解RxJava是怎么个东西了。
RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。
至于观察者我就拿ContentProvider来说吧,比如我们在一个ContentProvider中有一个insert方法,插入完毕后,去通知某个监听该URI变化的界面
getContext().getContentResolver().notifyChange(URI, null);
比如在MainActivity中去registerContentObserver注册一个内容观察者
private static final Uri URI = Uri
.parse("content://com.example.contentprovider.MyContentProvider/student");
contentResolver.registerContentObserver(uri, true, observer);
private ContentObserver observer = new ContentObserver(null) {
public void onChange(boolean selfChange) {
// 说明数据有改变,重新查询一直所有记录
Uri uri = Uri.parse("content://com.example.contentprovider.MyContentProvider/student");
Cursor cursor = contentResolver.query(uri, null, null, null, null);
Log.e("TAG", "onChange() count=" + cursor.getCount());
};
};
那么对于RxJava 的观察者模式呢?
RxJava 有四个基本概念:Observable (被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。
对比ContentProvider,RxJava里面的被观察者就是某个Uri(也就是某个数据库),观察者就是某个界面(比如MainActivity),订阅就是registerContentObserver,事件就是insert方法
Rxjava本质主要就是异步任务 外层构建了一个观察者的设计模式 它更简洁 我们调用api方法 几乎看不到 方法里面的 分线程数据操作 它只是利用了 一种观察者的设计模式 来进行 主分线程的通讯 来进行响应操作
我们来看看RxJava的察者模式流程交互图:
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 insert)之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
我们来看下Observable是如何定义的?
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
我们看看create源码里面做了什么?
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用一次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
然后在看Observer中做了什么操作?
subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
接下来我们将全部的代码贴出来,看看Log日志打印:
同步方式
package com.example.administrator.myapplication;
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
public class MainActivity extends Activity {
private String TAG = "MainActivity";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
}
}
call: threadId:1
onNext: threadId:1
onNext: s = Hello World!
onCompleted: threadId:1
从上可以看出,事件的处理和结果的接收都是在同一个线程里面处理的。但是,Rxjava的意义何在,异步呢?别急,看以下代码的处理,你就会发现了,异步原来是这么的简单。
异步方式
我们将上面的代码稍微改下,增加2行代码
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.d(TAG, "call: threadId:" + Thread.currentThread().getId());
subscriber.onStart();
subscriber.onNext("Hello World!");
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: threadId:" + Thread.currentThread().getId());
Log.i(TAG, "onNext: s = " + s);
}
});
我们添上如下2行代码
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
我们看下Log输出
03-08 07:20:18.101 22734-22734/? I/MainActivity: testFunction: threadId:1
03-08 07:20:18.123 22734-22755/? D/MainActivity: call: threadId:180
03-08 07:20:18.142 22734-22734/? D/MainActivity: onNext: threadId:1
03-08 07:20:18.142 22734-22734/? I/MainActivity: onNext: s = Hello World!
03-08 07:20:18.143 22734-22734/? D/MainActivity: onCompleted: threadId:1
看见了没,第二行log日志threadId与其它的threadId很明显的不一样啊,说明我们在处理事件的时候,发生在了一个新的线程里面,而结果的接收,还是在主线程里面操作的。怎么样,只要添加两句话,异步立马就实现了,异步处理耗时操作,就是这么easy。
我们简单看下源码
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
什么意思呢?
Scheduler scheduler参数就是执行订阅操作,返回一个源观察到的修改,使其订阅发生在指定的线程
observeOn(AndroidSchedulers.mainThread())
句话说,observeOn() 指定的是它之后的操作所在的线程。
打印字符串数组 from和just方式
以上是RxJava的很基础很简单的一个用法,那么我们接着往下看,比如我们有一组需求把一个String数组的字符串,单个打印出来,我们用Rxjava怎么实现呢?看代码:
Log.i(TAG, "testFunction: threadId:" + Thread.currentThread().getId());
Observable.from(new String[]{"one","two","three","four"})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
});
看Log输出日志如下:
testFunction: threadId:1
onNext: s = one
onNext: s = two
onNext: s = three
onNext: s = four
onCompleted: threadId:1
From操作符用来将某个对象转化为Observable对象,并且依次将其内容发射出去。这个类似于just,但是just会将这个对象整个发射出去。比如说一个含有3个字符串的数组,使用from就会发射4次,每次发射一个数字,而使用just会发射一次来将整个的数组发射出去。
Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
Observable.just("one", "two", "three", "four")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d(TAG, "onCompleted: threadId:" + Thread.currentThread().getId());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: threadId:" + Thread.currentThread().getId());
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: s = " + s);
}
});
03-08 08:09:25.743 32155-32155/? I/MainActivity: testFunction: threadId:1
03-08 08:09:25.784 32155-32155/? I/MainActivity: onNext: s = one
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = two
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = three
03-08 08:09:25.785 32155-32155/? I/MainActivity: onNext: s = four
03-08 08:09:25.785 32155-32155/? D/MainActivity: onCompleted: threadId:1
一对一转换
Log.i(TAG, "testFunction: threadId:"+Thread.currentThread().getId());
Observable.just("1", "2", "3", "4")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
Log.i(TAG, "call: s = "+s);
return Integer.parseInt(s);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.i(TAG, "call: integer = "+integer);
}
});
testFunction: threadId:1
call: s = 1
call: integer = 1
call: s = 2
call: integer = 2
call: s = 3
call: integer = 3
call: s = 4
call: integer = 4
简单说一下Func1,其中的T表示传入的参数类型,R表示方法返回的参数类型。源码如下:
public interface Func1<T, R> extends Function {
R call(T t);
}
上例中还有一个叫做 Action1的类。也是 RxJava 的一个接口,用于包装含有无参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。
可以看到,map() 方法将参数中的 String 对象转换成一个 Integer对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Integer。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。
封装Observable一对多转换
map转换,是一对一的转换,像示例当中,我们把string转成int,但是当我们需要一对多的转换,该怎么做呢?比如说,定义一个学生类:
package com.example.administrator.myapplication;
import java.util.List;
public class Student {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
//////////////////////////
private List<String> courses;
public List<String> getCourses() {
return courses;
}
public void setCourses(List<String> courses) {
this.courses = courses;
}
}
Student student1 = new Student();
student1.setName("safly");
List<String> courses = new ArrayList<>();
courses.add("语文");
courses.add("数学");
courses.add("英语");
student1.setCourses(courses);
Student student2 = new Student();
student2.setName("wyf");
List<String> courses2 = new ArrayList<>();
courses2.add("化学");
courses2.add("地理");
courses2.add("政治");
student2.setCourses(courses2);
Observable.just(student1,student2)
.subscribe(new Action1<Student>() {
@Override
public void call(Student student) {
Log.i(TAG, "call: name = "+student.getName());
List<String> course = student.getCourses();
for(String str:course){
Log.i(TAG, "call: str = "+str);
}
}
});
这里我们没有进行转换,直接just发送过来,没有用到转换,然后在call中进行直接输出了
call: name = safly
call: str = 语文
call: str = 数学
call: str = 英语
call: name = wyf
call: str = 化学
call: str = 地理
call: str = 政治
我们用转换来试试看?
这里我们用到了flatmap这一函数,按通俗的一点理解:我们首先把Student转成了Observable,然后呢,又把student.getCourses()转成string挨个打印出来,结果如下:
Observable.just(student1,student2)
.flatMap(new Func1<Student, Observable<String>>() {
@Override
public Observable<String> call(Student student) {
Log.i(TAG, "Observable " );
return Observable.from(student.getCourses());
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.i(TAG, "call: s = "+s);
}
});
输出
Observable
call: s = 语文
call: s = 数学
call: s = 英语
Observable
call: s = 化学
call: s = 地理
call: s = 政治
我们还记得Observable.from嘛?
From操作符用来将某个对象转化为Observable对象
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(new OnSubscribeFromIterable<T>(iterable));
}
以上是关于RxJava响应式编程之初级了解的主要内容,如果未能解决你的问题,请参考以下文章