Rx学习

Posted 西北野狼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rx学习相关的知识,希望对你有一定的参考价值。

  RXjava学习资料:
  
  https://www.gitbook.com/book/yuxingxin/rxjava-essentials-cn/details
  
  如下只是学习笔记而已,后面添加实战案例,现在只是理论总结:
  
  Rxjava语言特点:
  
  1,易于并发从而更好的利用服务器的能力;
  
  2,易于有条件的异步执行;
  
  3,一种更好的方式来避免回调地狱;
  
  4,一种响应式方法。
  
  RXjava源于观察者模式:
  
  添加了如下三个缺少的功能:
  
  1,生产者在没有更多数据可用时能够发出信号通知:oncompleted()事件。
  
  2,生产者在发生错误的时候能够发出信号通知:onError()事件。
  
  3,RxJavaObservables能够组合而不是嵌套,从而避免开发者陷入回调地狱。
  
  RXjava中的四个角色:
  
  1,Observable:观察得到的,看的见得
  
  2,Observer:观察者
  
  3,Subscriber:订阅者
  
  4,Subjects:服从。
  
  Observables和Subjects是两个“生产”实体
  
  Observers和Subscribers是两个“消费”实体。
  
  Rxjava之Observable
  
  onNext(T)检索数据
  
  onError(Throwable)发现错误
  
  onCompleted()完成
  
  RXjava之热Observables和冷Observables
  
  热Observables:只要创建完就发射数据,订阅他的观察者从序列中某位置接受数据。
  
  冷Observables:一直等待,知道观察者订阅他才发射数据,可确保能收到数据序列。

 

Observale创建的两种方式:

1:create

Observable.create(new Observable.OnSubscribe<Object>(){
@Override
public void call(Subscriber<? super Object> subscriber) {

}
});

2:from:从列表,数组来创建Observable,并一个个发射数据

List<Integer> items = new ArrayList<Integer>();
items.add(1);
Observable<Integer> observableString = Observable.from(items);
Subscription subscriptionPrint = observableString.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no! Something wrong happened!");
}
@Override
public void onNext(Integer item) {
System.out.println("Item is " + item);
}
});

 

  3,just

  just()方法可以传入一到九个参数,它们会按照传入的参数的顺序来发射它

  们。

  无理由不发射数据并结束操作:

  Observable.empty(),Observable.never(),和

  Observable.throw()

  RxJava提供四种不同的Subject:

  PublishSubject

  BehaviorSubject:向订阅者发送截止订阅前最新的一个数据对象,然后发送订阅后数据流;

  ReplaySubject:缓存所订阅的所有数据,向任意一个订阅他的观察者重发数据流;

  AsyncSubject:当Observable完成AnsySubject只会发布最后一个数据给已定订阅的每一个观察者。

  as中开发使用RXjava,RXandroid:

  gradle依赖:

 

compile ‘io.reactivex:rxandroid:1.1.0‘
compile ‘io.reactivex:rxjava:1.1.0‘

  其他推荐的as插件:

  1,lombok:getsettostringequal等注解代码

  2,butterknife:findviewbyidonclick代码等的注解代码

  3,retrolambda:java8lambda相关函数

  具体作用情百度或者谷歌进行搜索

  just主要是得到原始的observable版本,在一个新的响应式架构的基础上迁移已存在的代码,这个方法可

  能是一个有用的开始点。

private void loadApps(AppInfo appOne,AppInfo appTwo,AppInfo appThree)
mRecyclerView.setVisibility(View.VISIBLE);
Observable.just(appOne,appTwo,appThree)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!"
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!"
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1,appInfo);
}
});
}

  repeat()重复发数据:

  repeat(3):重复发送三次数据;

private void loadApps(AppInfo appOne,AppInfo appTwo,AppInfo appThree)
mRecyclerView.setVisibility(View.VISIBLE);
Observable.just(appOne,appTwo,appThree)
.repeat(3)
.subscribe(new Observer<AppInfo>() {
@Override
public void onCompleted() {
mSwipeRefreshLayout.setRefreshing(false);
Toast.makeText(getActivity(), "Here is the list!"
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!"
mSwipeRefreshLayout.setRefreshing(false);
}
@Override
public void onNext(AppInfo appInfo) {
mAddedApps.add(appInfo);
mAdapter.addApplication(mAddedApps.size() - 1,appInfo);
}
});
}

defer()声明一个Observable但是你又想推迟这个Observable的创建直到观察者订阅

1
private Observable<Integer> getInt(){
return Observable.create(subscriber -> {
if(subscriber.isUnsubscribed()){
return;
}
App.L.debug("GETINT");
subscriber.onNext(42);
subscriber.onCompleted();
});
}

2
Observable<Integer> deferred = Observable.defer(this::getInt);

3
deferred.subscribe(number -> {
App.L.debug(String.valueOf(number));
});

range()
你需要从一个指定的数字X开始发射N个数字吗?你可以用 range,下面是从10开始,发送3个数字:

Observable.range(10,3)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Integer number) {
Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show();
}
});

interval()
interval() 函数在你需要创建一个轮询程序时非常好用。

如下每隔3秒toast下数据:

Subscription stopMePlease = Observable.interval(3,TimeUnit.SECONDS)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Toast.makeText(getActivity(), "Yeaaah!", Toast.LENGTH_LONG).show();
}
@Override
public void onError(Throwable e) {
Toast.makeText(getActivity(), "Something went wrong!", Toast.LENGTH_SHORT).show();
}
@Override
public void onNext(Integer number) {
Toast.makeText(getActivity(), "I say " + number, Toast.LENGTH_SHORT).show();
}
});

  timer()

  如果你需要一个一段时间之后才发射的Observable,你可以像下面的例子使用timer():

  如下3秒后发送数据:

Observable.timer(3, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(Long number) {
        Log.d("RXJAVA", "I say " + number);
    }
});

 

  如何过滤数据 Observables?
  
  RxJava让我们使用 filter() 方法来过滤我们观测序列中不想要的值,filter((appInfo) ->appInfo.getName().startsWith("C"))是只想展示c开头的数据。

1,过滤序列:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps)
    .filter((appInfo) ->
            appInfo.getName().startsWith("C"))
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!"
                           mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

  过滤空数据:

.filter(new Func1<AppInfo, Boolean>() {
    @Override
    public Boolean call(AppInfo appInfo) {
        return appInfo != null;
    }
})

  获取开头或者结尾的几个数据:take() 或 takeLast()。
  
  take前几个数据;
  
  takelast后几个数据。
  
  如下是获取前三个数据:

private void loadList(List<AppInfo> apps) {
    mRecyclerView.setVisibility(View.VISIBLE);
    Observable.from(apps)
    .take(3)
    .subscribe(new Observer<AppInfo>() {
        @Override
        public void onCompleted() {
            mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onError(Throwable e) {
            Toast.makeText(getActivity(), "Something went wrong!"
                           mSwipeRefreshLayout.setRefreshing(false);
        }
        @Override
        public void onNext(AppInfo appInfo) {
            mAddedApps.add(appInfo);
            mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
        }
    });
}

  获取后三个数据:

Observable.from(apps)
.takeLast(3)
.subscribe(...);

  数据去重Distinct:

  制造重复数据:

Observable<AppInfo> fullOfDuplicates = Observable.from(apps)
                                       .take(3)
                                       .repeat(3);

  去重:

  

fullOfDuplicates.distinct()
.subscribe(new Observer<AppInfo>() {
    @Override
    public void onCompleted() {
        mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onError(Throwable e) {
        Toast.makeText(getActivity(), "Something went wrong!"
                       mSwipeRefreshLayout.setRefreshing(false);
    }
    @Override
    public void onNext(AppInfo appInfo) {
        mAddedApps.add(appInfo);
        mAdapter.addApplication(mAddedApps.size() - 1, appInfo);
    }
});
}

 

以上是关于Rx学习的主要内容,如果未能解决你的问题,请参考以下文章

反应式框架(RX)和异步处理事件

RX学习笔记:JavaScript数组操作

IOS开发-OC学习-常用功能代码片段整理

java SpringRetry学习的代码片段

python 机器学习有用的代码片段

使用 Rx 中预定义的 Subject