Rx学习
Posted 西北野狼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rx学习相关的知识,希望对你有一定的参考价值。
RXjava学习资料:
https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details
如下只是学习笔记而已,后面添加实战案例,现在只是理论总结:
Rxjava语言特点:
1,易于并发从而更好的利用服务器的能力;
2,易于有条件的异步执行;
3,一种更好的方式来避免回调地狱;
4,一种响应式方法。
RXjava源于观察者模式:
添加了如下三个缺少的功能:
1,生产者在没有更多数据可用时能够发出信号通知:oncompleted()事件。
2,生产者在发生错误的时候能够发出信号通知:onError()事件。
3,RxJavaObservables能够组合而不是嵌套,从而避免开发者陷入回调地狱。
RXjava中的四个角色:
1,Observable:观察得到的,看的见得
2,Observer:观察者
3,Subscriber:订阅者
4,Subjects:服从。
Observables和Subjects是两个“生产”实体
Observers和Subscribers是两个“消费”实体。
Rxjava之Observable
onNext(T)检索数据
onError(Throwable)发现错误
onCompleted()完成
RXjava之热Observables和冷Observables
热Observables:只要创建完就发射数据,订阅他的观察者从序列中某位置接受数据。
冷Observables:一直等待,知道观察者订阅他才发射数据,可确保能收到数据序列。
Observale创建的两种方式:
1:create
Observable.create(new Observable.OnSubscribe<Object>(){ @Override public void call(Subscriber<? super Object> subscriber) { } });
2:from:从列表,数组来创建Observable,并一个个发射数据
List<Integer> items = new ArrayList<Integer>(); items.add(1); Observable<Integer> observableString = Observable.from(items); Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh,no! Something wrong happened!"); } @Override public void onNext(Integer item) { System.out.println("Item is " + item); } });
3,just
just()方法可以传入一到九个参数,它们会按照传入的参数的顺序来发射它
们。
无理由不发射数据并结束操作:
Observable.empty(),Observable.never(),和
Observable.throw()
RxJava提供四种不同的Subject:
PublishSubject
BehaviorSubject:向订阅者发送截止订阅前最新的一个数据对象,然后发送订阅后数据流;
ReplaySubject:缓存所订阅的所有数据,向任意一个订阅他的观察者重发数据流;
AsyncSubject:当Observable完成AnsySubject只会发布最后一个数据给已定订阅的每一个观察者。
as中开发使用RXjava,RXandroid:
gradle依赖:
compile ‘io.reactivex:rxandroid:1.1.0‘
compile ‘io.reactivex:rxjava:1.1.0‘
其他推荐的as插件:
1,lombok:getsettostringequal等注解代码
2,butterknife:findviewbyidonclick代码等的注解代码
3,retrolambda:java8lambda相关函数
具体作用情百度或者谷歌进行搜索
just主要是得到原始的observable版本,在一个新的响应式架构的基础上迁移已存在的代码,这个方法可
能是一个有用的开始点。
private void loadApps(AppInfo appOne,AppInfo appTwo,AppInfo appThree) mRecyclerView.setVisibility(View.VISIBLE); Observable.just(appOne,appTwo,appThree) .subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), "Here is the list!" } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1,appInfo); } }); }
repeat()重复发数据:
repeat(3):重复发送三次数据;
private void loadApps(AppInfo appOne,AppInfo appTwo,AppInfo appThree) mRecyclerView.setVisibility(View.VISIBLE); Observable.just(appOne,appTwo,appThree) .repeat(3) .subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), "Here is the list!" } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1,appInfo); } }); }
defer()声明一个Observable但是你又想推迟这个Observable的创建直到观察者订阅时
1 private Observable<Integer> getInt(){ return Observable.create(subscriber -> { if(subscriber.isUnsubscribed()){ return; } App.L.debug("GETINT"); subscriber.onNext(42); subscriber.onCompleted(); }); } 2 Observable<Integer> deferred = Observable.defer(this::getInt); 3 deferred.subscribe(number -> { App.L.debug(String.valueOf(number)); });
range()
你需要从一个指定的数字X开始发射N个数字吗?你可以用 range,下面是从10开始,发送3个数字:
Observable.range(10,3) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show(); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show(); } @Override public void onNext(Integer number) { Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show(); } });
interval()
interval() 函数在你需要创建一个轮询程序时非常好用。
如下每隔3秒toast下数据:
Subscription stopMePlease = Observable.interval(3,TimeUnit.SECONDS) .subscribe(new Observer<Integer>() { @Override public void onCompleted() { Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show(); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show(); } @Override public void onNext(Integer number) { Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show(); } });
timer()
如果你需要一个一段时间之后才发射的Observable,你可以像下面的例子使用timer():
如下3秒后发送数据:
Observable.timer(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(Long number) { Log.d("RXJAVA", "I say " + number); } });
如何过滤数据 Observables?
RxJava让我们使用 filter() 方法来过滤我们观测序列中不想要的值,filter((appInfo) ->appInfo.getName().startsWith("C"))是只想展示c开头的数据。
1,过滤序列:
private void loadList(List<AppInfo> apps) { mRecyclerView.setVisibility(View.VISIBLE); Observable.from(apps) .filter((appInfo) -> appInfo.getName().startsWith("C")) .subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1, appInfo); } }); }
过滤空数据:
.filter(new Func1<AppInfo, Boolean>() { @Override public Boolean call(AppInfo appInfo) { return appInfo != null; } })
获取开头或者结尾的几个数据:take() 或 takeLast()。
take前几个数据;
takelast后几个数据。
如下是获取前三个数据:
private void loadList(List<AppInfo> apps) { mRecyclerView.setVisibility(View.VISIBLE); Observable.from(apps) .take(3) .subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1, appInfo); } }); }
获取后三个数据:
Observable.from(apps) .takeLast(3) .subscribe(...);
数据去重Distinct:
制造重复数据:
Observable<AppInfo> fullOfDuplicates = Observable.from(apps) .take(3) .repeat(3);
去重:
fullOfDuplicates.distinct() .subscribe(new Observer<AppInfo>() { @Override public void onCompleted() { mSwipeRefreshLayout.setRefreshing(false); } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), "Something went wrong!" mSwipeRefreshLayout.setRefreshing(false); } @Override public void onNext(AppInfo appInfo) { mAddedApps.add(appInfo); mAdapter.addApplication(mAddedApps.size() - 1, appInfo); } }); }
以上是关于Rx学习的主要内容,如果未能解决你的问题,请参考以下文章