RxJava2+Retrofit2+RxLifecycle2使用MVP模式构建项目

Posted Ruffian-痞子

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava2+Retrofit2+RxLifecycle2使用MVP模式构建项目相关的知识,希望对你有一定的参考价值。

前言

眼下Retrofit+RxJava搭配的网络请求框架很是流行,本着学习的态度,写了一个相关的demo。写着写着就想朝着搭建一个项目框架的方向走。于是使用了一下MVP模式。

RxJava 确实挺好用,个人特别喜欢这种“流式”的代码风格,逻辑很清晰,起码提供了一种相对的规范,开发者按照对应的流程写代码,后期的维护和拓展会简单很多。

MVP模式简单说就是为了解耦,各行各职,阅读代码,拓展功能代价不会那么大(或许有些人认为没必要用MVP,直接在activity/fragment中写代码就好了,那只能说你没遇到到过相对大一点的项目,或者没遇到“实习生”写代码,那酸爽,看代码会看得你怀疑人生)

MVP

在使用MVC开发android应用的时候,原理上

  • View:对应于布局文件xml
  • Model:业务逻辑和实体模型
  • Controllor:对应于Activity

但是写代码的时候,你就会发现,好多跟view相关的操作都在activity中实现完成了,导致activity既是Controllor又是View,网上有人称为是MV模式,也因此导致activity的代码量特别大,1000+的代码很常见
后来,Presenter的出现,将Actvity,xml 视为View层,Model不变,Presenter负责完成View层与Model层的交互。于是MVP是这样的:

  • View 对应于Activity,xml 负责View的绘制以及与用户交互
  • Model 依然是业务逻辑和实体模型
  • Presenter 负责完成View于Model间的交互

在网上看了一下MVP的使用demo,挺多人将一个页面需要完成的操作,以及需要用到的控件全部定义到相关的View接口中,示例:

public interface IUserLoginView

    String getUserName();

    String getPassword();

    void clearUserName();

    void clearPassword();

    void showLoading();

    void hideLoading();

    void toMainActivity(User user);

    void showFailedError();

个人觉得,这个有点蛋疼
1.这样view接口需要的方法太多了,有些实现(clearUserName())可以放在activity中操作,第一不需将控件通过接口传到Presenter中,第二我认为这种算是对View的操作,还是可以看作View相关的。
2.我们很难知道一个界面都要实现些什么方法(如果包括对某个控件内容清空等),但是我们不难知道一个activity需要实现哪些主要的功能,比如登录页面就一个登录功能,或者再加多一个第三方登录咯。

所以我觉得View接口中定义一些常用的方法,以及一些需要实现的方法就可以了,通过回调内容,把控件赋值,数据展示等还是放回在activity中操作,presenter只需要将对应的实体或者数据给activity就好了,activity怎么展示,不用管,不关我的事情,做到各行各职。

或许有人会说,咦~你这都不是MVP,网上的MVP这些操作要放在presenter中的,这时我上去就给你一巴掌,我们使用优秀的框架/架构是为了学习它优秀的模式或者编码风格,如果一味的按部就班照着用,那将毫无意义!同时,我们只有不断改进,不断推成出新才能使得技术不断进步,如果前人不对MCV提出质疑,就不会有今天的MVP。

所以我觉得View的接口应该这样
一个BaseView,定义常用的方法,其他页面View接口继承基类

public interface IBaseView 

    //显示loading
    void showLoading();

    //关闭loading
    void closeLoading();

    //显示吐司
    void showToast(String msg);


这个基类怎么写看项目需要,按照开发者各自需求编写。

现在写一个登录的LoginView,继承BaseView同时添加特定的接口

public interface ILoginView extends IBaseView 

    //显示结果
    void showResult(UserBean bean);

这里定义一个showResult(UserBean bean) 将User实体类传给activity,用于展示用户信息。

写到这里的时候的我忽然更加坚定我所理解的MVP是对的,解耦嘛
Presenter:负责获取或者构建UserBean
Activity:负责展示Presenter给过来的数据

之前看到过有人通过View接口将activity的控件几乎“拷贝”到了presenter中,虽然实现了逻辑处理在Presenter,但是如果Presenter逻辑改动还是会牵一发动全身,要改动很多
现在这种方式挺好的,负责构建数据,不参与展示,也方便单元测试。对,就是这样的。

Retrofit2+RxJava2+RxLifecycle2

Retrofit+RxJava确实是一种很不错的搭配,RxJava可以指定运行的线程,在网络请求时,开启线程耗时操作,响应结果时切换为主线程操作UI。非常漂亮,代码风格也赞,我个人称为流式操作,从上到下一步步代表操作的主要逻辑,比起传统的迷之嵌套,迷之缩进好多了。

我们知道RxJava使用订阅模式,如果没有及时取消订阅,会导致内存泄漏,这个是非常糟糕的行为,当然解决方式也很简单,在对应的生命周期取消订阅就好,不过我还是怀着好奇之心Github一下,果然已经有人对此作出了贡献RxLifecycle 通过绑定生命周期可以很方便的管理订阅与取消订阅。

Github: https://github.com/trello/RxLifecycle

Retrofit+RxJava的使用还是挺简单的,不过相对于大家已经用在项目的网络请求框架,它还是需要进行加工的。比如说错误处理,配合RxLifecycle使用,以及很多人会问的,在MVP中使用的时候怎么取消订阅。

先看下最简单的使用
接口代码

 public interface TestApi 
        @GET("v1/mobile/address/query")
        Observable<String> request(@QueryMap Map<String, Object> request);
    

使用代码

        Map<String, Object> request = new HashMap<>();
        Retrofit retrofit = new Retrofit.Builder()
                .client(new OkHttpClient())
                .baseUrl("http://apicloud.mob.com/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        retrofit.create(TestApi.class).request(request).subscribe(new Observer<String>() 
            @Override
            public void onSubscribe(@NonNull Disposable d) 

            

            @Override
            public void onNext(@NonNull String s) 

            

            @Override
            public void onError(@NonNull Throwable e) 

            

            @Override
            public void onComplete() 

            
        );

代码简介,清晰,构建请求参数,构建被观察者对象,以及传入一个观察者对象实现
订阅回调onSubscribe 也可以是开始的操作
成功回调onNext
失败回调onError
监听完成onComplete

或许开发者一眼就看出了onError 回调的对象是Throwable 这个不能忍啊,投入使用的框架肯定得封装,那就从这里开始

错误处理

在常见的网络请求框架中一般会有两个回调函数

    /**
     * 错误/异常回调
     */
    protected abstract void onError(ApiException e);

    /**
     * 成功回调
     */
    protected abstract void onSuccess(T response);

定义onError回调函数触发的场景是:1.异常2.错误
1.异常:请求异常,解析数据出错,网络异常等等
2.错误:某一次请求逻辑错误,(例如:登录错误)
将上述两种情况交给onError回调函数处理
在请求逻辑成功的时候触发一个onSuccess函数。这样监听者就只需要两个函数,一个失败,一个成功,失败提示给用户,成功负责展示数据,跳转页面等

定义一个异常处理类ExceptionEngine

public class ExceptionEngine 

    public static final int UN_KNOWN_ERROR = 1000;//未知错误
    public static final int ANALYTIC_SERVER_DATA_ERROR = 1001;//解析(服务器)数据错误
    public static final int ANALYTIC_CLIENT_DATA_ERROR = 1002;//解析(客户端)数据错误
    public static final int CONNECT_ERROR = 1003;//网络连接错误
    public static final int TIME_OUT_ERROR = 1004;//网络连接超时

    public static ApiException handleException(Throwable e) 
        ApiException ex;
        if (e instanceof HttpException)              //HTTP错误
            HttpException httpExc = (HttpException) e;
            ex = new ApiException(e, httpExc.code());
            ex.setMsg("网络错误");  //均视为网络错误
            return ex;
         else if (e instanceof ServerException)     //服务器返回的错误
            ServerException serverExc = (ServerException) e;
            ex = new ApiException(serverExc, serverExc.getCode());
            ex.setMsg(serverExc.getMsg());
            return ex;
         else if (e instanceof JsonParseException
                || e instanceof JSONException
                || e instanceof ParseException || e instanceof MalformedJsonException)   //解析数据错误
            ex = new ApiException(e, ANALYTIC_SERVER_DATA_ERROR);
            ex.setMsg("解析错误");
            return ex;
         else if (e instanceof ConnectException) //连接网络错误
            ex = new ApiException(e, CONNECT_ERROR);
            ex.setMsg("连接失败");
            return ex;
         else if (e instanceof SocketTimeoutException) //网络超时
            ex = new ApiException(e, TIME_OUT_ERROR);
            ex.setMsg("网络超时");
            return ex;
         else   //未知错误
            ex = new ApiException(e, UN_KNOWN_ERROR);
            ex.setMsg("未知错误");
            return ex;
        
    


异常处理类中,都是常见的错误类型,我们通过解析Throwable转换成统一的错误类ApiException

public class ApiException extends Exception 
    private int code;//错误码
    private String msg;//错误信息

    public ApiException(Throwable throwable, int code) 
        super(throwable);
        this.code = code;
    

    public ApiException(int code, String msg) 
        this.code = code;
        this.msg = msg;
    

    public int getCode() 
        return code;
    

    public void setCode(int code) 
        this.code = code;
    

    public String getMsg() 
        return msg;
    

    public void setMsg(String msg) 
        this.msg = msg;
    

这个类非常简单,一个状态码,一个错误信息,方便我们开发调试。

不过仔细看代码的同学会发现,ServerException这个是什么鬼?“服务器返回的错误”?Throwable怎么知道这个错误类型是ServerException

其实这个ServerException是我们自定义的错误类型,一般我们开发中都会跟服务器约定一种接口请求返回的数据。比如:

  • int code:表示接口请求状态,0表示成功,-101表示密码错误等等
  • String msg:表示接口请求返回的描述。success,”token过期”等等
  • Object result:成功是返回的数据

那么我们就可以在解析服务端返回数据的时候,当code!=0,就抛出ServerException

public class ServerException extends RuntimeException 
    private int code;
    private String msg;

    public ServerException(int code, String msg) 
        this.code = code;
        this.msg = msg;
    

    public int getCode() 
        return code;
    

    public String getMsg() 
        return msg;
    

OK,到这里我们常见的错误类型,异常等都处理完了,通过ExceptionEngine转化为统一的错误类型ApiException,在订阅者回调onError(ApiException e)就可以很方便知道错误的状态码以及对应的描述信息。

 Observable observable = apiObservable
                .map(new ServerResultFunction())
                .onErrorResumeNext(new HttpResultFunction<>())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

我们使用onErrorResumeNext(new HttpResultFunction<>())操作符对Retrofit网络请求抛出的Exception进行处理,我们定义HttpResultFunction处理Retrofit抛出的Exception,通过ExceptionEngine转化为统一的错误类型ApiException

public class HttpResultFunction<T> implements Function<Throwable, Observable<T>> 
    @Override
    public Observable<T> apply(@NonNull Throwable throwable) throws Exception 
        //打印具体错误
        LogUtils.e("HttpResultFunction:" + throwable);
        return Observable.error(ExceptionEngine.handleException(throwable));
    

这一步是对错误,异常等的处理,如果某一个http请求没有发生异常,或者网络错误,就会走onNext回调。

前面我们约定,将服务器返回的逻辑错误也归类到onError。所以我们在.map(new ServerResultFunction())操作符中处理服务器返回的结果是否正确(这里指逻辑正确,即code==0),如果code!=0,就抛出ServerException

public class ServerResultFunction implements Function<HttpResponse, Object> 
    @Override
    public Object apply(@NonNull HttpResponse response) throws Exception 
        //打印服务器回传结果
        LogUtils.e(response.toString());
        if (!response.isSuccess()) 
            throw new ServerException(response.getCode(), response.getMsg());
        
        return new Gson().toJson(response.getResult());
    

解析服务器返回结果 HttpResponse,这个类由开发者自行设置,根据实际情况来定义字段,这里我假设后台返回的字段有

String msg; int retCode; Object result;

/**
 * http响应参数实体类
 * 通过Gson解析属性名称需要与服务器返回字段对应,或者使用注解@SerializedName
 * 备注:这里与服务器约定返回格式
 *
 * @author ZhongDaFeng
 */
public class HttpResponse 

    /**
     * 描述信息
     */
    @SerializedName("msg")
    private String msg;

    /**
     * 状态码
     */
    @SerializedName("retCode")
    private int code;

    /**
     * 数据对象[成功返回对象,失败返回错误说明]
     */
    @SerializedName("result")
    private Object result;

    /**
     * 是否成功(这里约定200)
     *
     * @return
     */
    public boolean isSuccess() 
        return code == 200 ? true : false;
    

    public String toString() 
        String response = "[http response]" + "\\"code\\": " + code + ",\\"msg\\":" + msg + ",\\"result\\":" + new Gson().toJson(result) + "";
        return response;
    


    public String getMsg() 
        return msg;
    

    public void setMsg(String msg) 
        this.msg = msg;
    

    public int getCode() 
        return code;
    

    public void setCode(int code) 
        this.code = code;
    

    public Object getResult() 
        return result;
    

    public void setResult(Object result) 
        this.result = result;
    

这里Result我们使用Object因为接口时通用的,服务端返回的接口类型也是多样的,可能是列表,也可能是JSON对象,或者String字符串,所以这里我们使用Object,在数据解析的时候在转化成为具体的类型

嗯,错误处理搞定了,那就是简单的封装一下Retrofit 和RxJava以及使用RxLifecycle
RxJava使用订阅模式,那我们就需要封装一个被订阅者,一个订阅者,以及使用RxLifecycle自动管理订阅的生命周期

构建Api接口类

public interface UserApi 

    @GET("user/login")
    Observable<HttpResponse> login(@QueryMap Map<String, Object> request);

构建Retrofit工具类获取retrofit实例

public class RetrofitUtils 
    /**
     * 接口地址
     */
    public static final String BASE_API = "http://apicloud.mob.com/";
    public static final int CONNECT_TIME_OUT = 30;//连接超时时长x秒
    public static final int READ_TIME_OUT = 30;//读数据超时时长x秒
    public static final int WRITE_TIME_OUT = 30;//写数据接超时时长x秒
    private static RetrofitUtils mInstance = null;

    private RetrofitUtils() 
    

    public static RetrofitUtils get() 
        if (mInstance == null) 
            synchronized (RetrofitUtils.class) 
                if (mInstance == null) 
                    mInstance = new RetrofitUtils();
                
            
        
        return mInstance;
    

    /**
     * 设置okHttp
     *
     * @author ZhongDaFeng
     */
    private static OkHttpClient okHttpClient() 
        //开启Log
        HttpLoggingInterceptor logging = new HttpLoggingInterceptor(new HttpLoggingInterceptor.Logger() 
            @Override
            public void log(String message) 
                LogUtils.e("okHttp:" + message);
            
        );
        logging.setLevel(HttpLoggingInterceptor.Level.BASIC);
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(CONNECT_TIME_OUT, TimeUnit.SECONDS)
                .writeTimeout(WRITE_TIME_OUT, TimeUnit.SECONDS)
                .readTimeout(READ_TIME_OUT, TimeUnit.SECONDS)
                .addInterceptor(logging)
                .build();
        return client;
    

    /**
     * 获取Retrofit
     *
     * @author ZhongDaFeng
     */
    public Retrofit retrofit() 
        Retrofit retrofit = new Retrofit.Builder()
                .client(okHttpClient())
                .baseUrl(BASE_API)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();
        return retrofit;
    

构建网络请求(被订阅对象)

/**
 * 适用Retrofit网络请求Observable(被监听者)
 *
 * @author ZhongDaFeng
 */
public class HttpRxObservable 
    /**
     * 获取被监听者
     * 备注:网络请求Observable构建
     * data:网络请求参数
     * <h1>补充说明</h1>
     * 传入LifecycleProvider自动管理生命周期,避免内存溢出
     * 备注:需要继承RxActivity.../RxFragment...
     *
     * @author ZhongDaFeng
     */
    public static Observable getObservable(Observable<HttpResponse> apiObservable, LifecycleProvider lifecycle) 
        //showLog(request);
        Observable observable;
        //随生命周期自动管理.eg:onCreate(start)->onStop(end)
        observable =apiObservable
             .map(new ServerResultFunction())
             .compose(lifecycle.bindToLifecycle())//需要在这个位置添加
             .onErrorResumeNext(new HttpResultFunction<>())
             .subscribeOn(Schedulers.io())
             .observeOn(AndroidSchedulers.mainThread());
        return observable;
    

HttpRxObservable我们构建一个被订阅者Observable并且设置了错误处理,同时添加了生命周期管理。

处理订阅者Observer

/**
 * 适用Retrofit网络请求Observer(监听者)
 * 备注:
 * 1.重写onSubscribe,添加请求标识
 * 2.重写onError,封装错误/异常处理,移除请求
 * 3.重写onNext,移除请求
 * 4.重写cancel,取消请求
 *
 * @author ZhongDaFeng
 */
public abstract class HttpRxObserver<T> implements Observer<T>, HttpRequestListener 

    private String mTag;//请求标识

    public HttpRxObserver() 
    

    public HttpRxObserver(String tag) 
        this.mTag = tag;
    

    @Override
    public void onError(Throwable e) 
        RxActionManagerImpl.getInstance().remove(mTag);
        if (e instanceof ApiException) 
            onError((ApiException) e);
         else 
            onError(new ApiException(e, ExceptionEngine.UN_KNOWN_ERROR));
        
    

    @Override
    public void onComplete() 
    

    @Override
    public void onNext(@NonNull T t) 
        if (!TextUtils.isEmpty(mTag)) 
            RxActionManagerImpl.getInstance().remove(mTag);
        
        onSuccess(t);
    

    @Override
    public void onSubscribe(@NonNull Disposable d) 
        if (!TextUtils.isEmpty(mTag)) 
            RxActionManagerImpl.getInstance().add(mTag, d);
        
        onStart(d);
    

    @Override
    public void cancel() 
        if (!TextUtils.isEmpty(mTag)) 
            RxActionManagerImpl.getInstance().cancel(mTag);
        
    

    /**
     * 是否已经处理
     *
     * @author ZhongDaFeng
     */
    public boolean isDisposed() 
        if (TextUtils.isEmpty(mTag)) 
            return true;
        
        return RxActionManagerImpl.getInstance().isDisposed(mTag);
    

    protected abstract void onStart(Disposable d);

    /**
     * 错误/异常回调
     *
     * @author ZhongDaFeng
     */
    protected abstract void onError(ApiException e);

    /**
     * 成功回调
     *
     * @author ZhongDaFeng
     */
    protected abstract void onSuccess(T response);

使用网络请求的过程中我们肯定会遇到取消请求的场景,这里我们实现一个HttpRequestListener,为每一个请求添加唯一的TAG用来标识具体的每一个请求,开始请求时保存TAG,请求成功/失败移除标志,同时TAG也用做取消请求的标志。

编写一个测试类,示例如何使用Retrofit

/**
 * Retrofit使用demo/测试类
 *
 * @author ZhongDaFeng
 */
public class RetrofitTest 
    public final String TAG = RetrofitTest.class.getSimpleName();//每个网络请求唯一TAG,用于取消网络请求使用
    /**
     * 模拟在activity中调用
     *
     * @author ZhongDaFeng
     */
    public void test(RxActivity activity, String account, String psw) 
        //设置唯一TAG
        HttpRxObserver httpRxObserver = new HttpRxObserver(TAG + "login") 
            @Override
            protected void onStart(Disposable d) 
                /**
                 * 开启loading等
                 */
            
            @Override
            protected void onError(ApiException e) 
                /**
                 * 错误信息
                 */
                LogUtils.w("onError code:" + e.getCode() + " msg:" + e.getMsg());
            
            @Override
            protected void onSuccess(Object response) 
                /**
                 * 成功处理
                 */
                LogUtils.w("onSuccess response:" + response.toString());
            
        ;

        new RetrofitTest().login(activity, account, psw).subscribe(httpRxObserver);

        //取消请求
        /*if(!httpRxObserver.isDisposed())
            httpRxObserver.cancel();
        */

    

    /**
     * 登录demo
     *
     * @author ZhongDaFeng
     */
    public Observable login(RxActivity activity, String phone, String psw) 
        //构建请求数据
        Map<String, Object> request = HttpRequest.getRequest();
        request.put("phone", phone);
        request.put("psw", psw);
        /**
         * 获取请求Observable
         * 1.RxActivity,RxFragment...所在页面继承RxLifecycle支持的组件
         * 2.ActivityEvent指定监听函数解绑的生命周期(手动管理,未设置则自动管理)
         * 以上两点作用防止RxJava监听没解除导致内存泄漏,ActivityEvent若未指定则按照activity/fragment的生命周期
         */
        // return HttpRxObservable.getObservable(ApiUtils.getPhoneApi().phoneQuery(request), activity);
        return HttpRxObservable.getObservable(ApiUtils.getUserApi().login(request), activity, ActivityEvent.PAUSE);
    

使用RxLifecycle 管理订阅生命周期activity需要继承RxActivity。开篇也提到MVP的时候,接下来就说说在MVP中如何使用。
定义一个Presenter基类

public class BasePresenter<V, T> implements LifeCycleListener 

    protected Reference<V> mViewRef;
    protected V mView;
    protected Reference<T> mActivityRef;
    protected T mActivity;


    public BasePresenter(V view, T activity) 
        attachView(view);
        attachActivity(activity);
        setListener(activity);
    

    /**
     * 设置生命周期监听
     *
     * @author ZhongDaFeng
     */
    private void setListener(T activity) 
        if (getActivity() != null) 
            if (activity instanceof BaseActivity) 
                ((BaseActivity) getActivity()).setOnLifeCycleListener(this);
             else if (activity instanceof BaseFragmentActivity) 
                ((BaseFragmentActivity) getActivity()).setOnLifeCycleListener(this);
            
        
    

    /**
     * 关联
     *
     * @param view
     */
    private void attachView(V view) 
        mViewRef = new WeakReference<V>(view);
        mView = mViewRef.get();
    

    /**
     * 关联
     *
     * @param activity
     */
    private void attachActivity(T activity) 
        mActivityRef = new WeakReference<T>(activity);
        mActivity = mActivityRef.get();
    

    /**
     * 销毁
     */
    private void detachView() 
        if (isViewAttached()) 
            mViewRef.clear();
            mViewRef = null;
        
    

    /**
     * 销毁
     */
    private void detachActivity() 
        if (isActivityAttached()) 
            mActivityRef.clear();
            mActivityRef = null;
        
    

    /**
     * 获取
     *
     * @return
     */
    public V getView() 
        if (mViewRef == null) 
            return null;
        
        return mViewRef.get();
    

    /**
     * 获取
     *
     * @return
     */
    public T getActivity() 
        if (mActivityRef == null) 
            return null;
        
        return mActivityRef.get();
    

    /**
     * 是否已经关联
     *
     * @return
     */
    public boolean isViewAttached() 
        return mViewRef != null && mViewRef.get() != null;
    

    /**
     * 是否已经关联
     *
     * @return
     */
    public boolean isActivityAttached() 
        return mActivityRef != null && mActivityRef.get() != null;
    

    @Override
    public void onCreate(Bundle savedInstanceState) 

    

    @Override
    public void onStart() 

    

    @Override
    public void onRestart() 

    

    @Override
    public void onResume() 

    

    @Override
    public void onPause() 

    

    @Override
    public void onStop() 

    

    @Override
    public void onDestroy() 
        detachView();
        detachActivity();
    

JAVA弱引用,管理View的引用,以及activity的引用,避免强引用导致资源无法释放而造成的内存溢出,
写代码的时候想到了一个很巧妙的方式:Presenter中传如一个activity,同时实现Activity生命周期监听,在onDestroy中移除View和Activity的引用,个人觉得这个非常不错。看官可以客观评价一下优劣。

登录Presenter

public class LoginPresenter extends BasePresenter<ILoginView, LoginActivity> 

    private final String TAG = PhoneAddressPresenter.class.getSimpleName();

    public LoginPresenter(ILoginView view, LoginActivity activity) 
        super(view, activity);
    

    /**
     * 登录
     *
     * @author ZhongDaFeng
     */
    public void login(String userName, String password) 

        //构建请求数据
        Map<String, Object> request = HttpRequest.getRequest();
        request.put("username", userName);
        request.put("password", password);

        HttpRxObserver httpRxObserver = new HttpRxObserver(TAG + "getInfo") 

            @Override
            protected void onStart(Disposable d) 
                if (getView() != null)
                    getView().showLoading();
            

            @Override
            protected void onError(ApiException e) 
                LogUtils.w("onError code:" + e.getCode() + " msg:" + e.getMsg());
                if (getView() != null) 
                    getView().closeLoading();
                    getView().showToast(e.getMsg());
                
            

            @Override
            protected void onSuccess(Object response) 
                LogUtils.w("onSuccess response:" + response.toString());
                UserBean bean = new Gson().fromJson(response.toString(), UserBean.class);
                if (getView() != null) 
                    getView().closeLoading();
                    getView().showResult(bean);
                
            
        ;

        /**
         * 切入后台移除RxJava监听
         * ActivityEvent.PAUSE(FragmentEvent.PAUSE)
         * 手动管理移除RxJava监听,如果不设置此参数默认自动管理移除RxJava监听(onCrete创建,onDestroy移除)
         */
        HttpRxObservable.getObservable(ApiUtils.getUserApi().login(request), getActivity(), ActivityEvent.PAUSE).subscribe(httpRxObserver);


        /**
         * ******此处代码为了测试取消请求,不是规范代码*****
         */
        /*try 
            Thread.sleep(50);
            //取消请求
            if (!httpRxObserver.isDisposed()) 
                httpRxObserver.cancel();
            
         catch (InterruptedException e) 
            e.printStackTrace();
        */


    

LoginActivity

public class LoginActivity extends BaseActivity implements ILoginView 

    @BindView(R.id.et_user_name)
    EditText etUserName;
    @BindView(R.id.et_password)
    EditText etPassword;

    private LoginPresenter mLoginPresenter = new LoginPresenter(this, this);

    private RLoadingDialog mLoadingDialog;

    @Override
    protected int getContentViewId() 
        return R.layout.activity_login;
    

    @Override
    protected void init() 
        mLoadingDialog = new RLoadingDialog(this, true);
    

    @Override
    protected void initBundleData() 

    

    @OnClick(R.id.login)
    public void onClick(View v) 
        switch (v.getId()) 
            case R.id.login:
                String userName = etUserName.getText().toString();
                String password = etPassword.getText().toString();

                if (TextUtils.isEmpty(userName) || TextUtils.isEmpty(password)) 
                    return;
                

                mLoginPresenter.login(userName, password);
                break;
        
    


    @Override
    public void showResult(UserBean bean) 
        if (bean == null) 
            return;
        
        showToast(bean.getUid());
    

    @Override
    public void showLoading() 
        mLoadingDialog.show();
    

    @Override
    public void closeLoading() 
        mLoadingDialog.dismiss();
    

    @Override
    public void showToast(String msg) 
        ToastUtils.showToast(mContext, msg);
    

以上是我对MVP的一点理解,以及对RxJava2+Retrofit2+RxLifecycle2进行的封装,由于编写demo的时候是朝着构建一个项目框架走的,所以没有贴完的代码就请移步到我的Github克隆完整版本,尽情的撸吧。
个人认为这个搭配还是可行的,并且已经投入都项目中。好产品一定要自己先使用,认可。哈哈哈

Github地址

https://github.com/RuffianZhong/Rx-Mvp

以上是关于RxJava2+Retrofit2+RxLifecycle2使用MVP模式构建项目的主要内容,如果未能解决你的问题,请参考以下文章

Retrofit2+Rxjava2的用法

RxJava2+Retrofit2+RxLifecycle2使用MVP模式构建项目

Android RxJava2+Retrofit2单文件下载监听进度封装

使用 RxJava2 和 Retrofit2 时如何访问响应头?

一款基于RxJava2+Retrofit2实现简单易用的网络请求框架

retrofit2+MVP+rxjava2+rxlifecycle2 为啥无法解决内存泄露