retrofit 2 支持rxjava2了吗

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了retrofit 2 支持rxjava2了吗相关的知识,希望对你有一定的参考价值。

参考技术A ,一、 简介 Retrofit是Square公司开发的一款针对android网络请求的框架,Retrofit2底层基于OkHttp实现的,OkHttp现在已经得到Google官方认可,大量的app都采用OkHttp做网络请求,其源码详见OkHttp Github。 本文全部是在Retrofit2.0+版本基础上论... 参考技术B retrofit使用RxJava2的Adapter适配器已经出来了。
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
或者
compile 'com.squareup.retrofit2:adapter-rxjava2:latest.version'
参考技术C 貌似还没有支持

retrofit2+rxjava+mockserver使用和理解

特点概要

Square的retrofit作为目前android比较火的网络框架,具有以下特点:

1、rest api 风格
2、网络核心使用优秀开源框架okttp,其本身致力于组装request和便捷转换response。
3、代码简洁,拓展性高,开发包只有90k左右
4、支持RxJava、Guava、Java8等适配器。

简单使用

案例是一个简单的登陆请求案例。使用name和pwd参数登陆接口,返回获取response并直接转换成User对象,User有两个成员变量name,age。

使用步骤分解为:
1、创建retrofit

private void initRetrofit(){
         retrofit = new Retrofit.Builder()
                .baseUrl("http://192.168.56.1:12306/")
                .addConverterFactory(GsonConverterFactory.create())
                .build();
}

2、定义接口

public interface ApiServicce {

    @FormUrlEncoded
    @POST("/user/login/")
    Call<User> login(@Field("name")String name, @Field("pwd")String pwd);
}

3、创建Call代理对象

 ApiServicce apiService = retrofit.create(ApiServicce.class);
 Call<User> userCall = apiService.login("name","retrofit");

4、发起请求并获取回调内容

    @Override
    public void onResponse(Call<User> call, Response<User> response) {
              User user = response.body();
          }

            @Override
            public void onFailure(Call<User> call, Throwable t) {
                System.err.println(call.toString());
            }
   });

以上四个步骤就完成了一个完成的数据请求和回复的解析。高度解耦的特点只能用简洁优雅来形容。那么他怎么做到的呢我们稍后分析。这里涉及到一个的接口是使用mockserver来模拟完成的,有关于mockserver的工作可以从github mockserver了解。这里我使用的是简易的moco,只需要是简单使用其jar包和定义一个配置便可以轻松的完成一次模拟过程。详情可以参考这里

服务启动命令:
java -jar moco-runner-0.11.0-standalone.jar http -p 12306 -c foo.json

foo.json内容:

[
  {
    "request" :
    {
      "uri" :"/user/login/",
      "method" :"post"
    },
   "response" :
      {
        "json" :
        {
            "name" : "jerry",
            "age" :"24"
        }         
      }
  },
  {
    "request":
    {
        "uri" : "/"
    },
    "response" :
    {
        "text" : "hello"
    }
  }
]

可以看出调用
http://192.168.56.1:12306/返回hello的文本内容,调用http://192.168.56.1:12306/user/login 返回上述的json串再经由retrofit转换成对应的java对象。ok,大体了解了使用方法,开始剖析他的工作原理。

原理剖析

retrofit 工作流程
可以看出其工作流程主要分为下面四步

  • 1、首先build request参数
  • 2、因为不能在主线程请求HTTP,所以你得有个Executer或者线程
  • 3、enqueue后,通过线程去run你的请求
  • 4、得到服务器数据后,callback回调给你的上层。
  • 下面对其源码进行分析:
    结合我们使用过程 retrofit build

    retrofit build

    public Retrofit build() {
          if (baseUrl == null) {
            throw new IllegalStateException("Base URL required.");
          }
    
          okhttp3.Call.Factory callFactory = this.callFactory;
          if (callFactory == null) {
            callFactory = new OkHttpClient();
          }
    
          Executor callbackExecutor = this.callbackExecutor;
          if (callbackExecutor == null) {
            callbackExecutor = platform.defaultCallbackExecutor();
          }
    
          // Make a defensive copy of the adapters and add the default Call adapter.
          List<CallAdapter.Factory> adapterFactories = new ArrayList<>(this.adapterFactories);
          adapterFactories.add(platform.defaultCallAdapterFactory(callbackExecutor));
    
          // Make a defensive copy of the converters.
          List<Converter.Factory> converterFactories = new ArrayList<>(this.converterFactories);
    
          return new Retrofit(callFactory, baseUrl, converterFactories, adapterFactories,
              callbackExecutor, validateEagerly);
        }

    实例CallFactory,CallFactory决定调用的网络模块使用的是哪个,这里默认使用的是OkHttpClient。成员callbackExecutor为执行回调的任务或线程。成员adapterFactories存放CallAdapter的一个列表,并且add一个默认的AdapterFactory,AdapterFactory中get方法用来获取到指定的CallAdapter。先来看下默认的AdapterFactory是什么,位置ExecutorCallAdapterFactory类。

    CallAdapter

     @Override
      public CallAdapter<Call<?>> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
        if (getRawType(returnType) != Call.class) {
          return null;
        }
        final Type responseType = Utils.getCallResponseType(returnType);
        return new CallAdapter<Call<?>>() {
          @Override public Type responseType() {
            return responseType;
          }
    
          @Override public <R> Call<R> adapt(Call<R> call) {
            return new ExecutorCallbackCall<>(callbackExecutor, call);
          }
        };
     }

    return new ExecutorCallbackCall,那么看下这个ExecutorCallbackCall是什么

    static final class ExecutorCallbackCall<T> implements Call<T> {
        final Executor callbackExecutor;
        final Call<T> delegate;
    
        ExecutorCallbackCall(Executor callbackExecutor, Call<T> delegate) {
          this.callbackExecutor = callbackExecutor;
          this.delegate = delegate;
        }
    
        @Override public void enqueue(final Callback<T> callback) {
          if (callback == null) throw new NullPointerException("callback == null");
    
          delegate.enqueue(new Callback<T>() {
            @Override public void onResponse(Call<T> call, final Response<T> response) {
              callbackExecutor.execute(new Runnable() {
                @Override public void run() {
                  if (delegate.isCanceled()) {
                    // Emulate OkHttp's behavior of throwing/delivering an IOException on cancellation.
                    callback.onFailure(ExecutorCallbackCall.this, new IOException("Canceled"));
                  } else {
                    callback.onResponse(ExecutorCallbackCall.this, response);
                  }
                }
              });
            }
    
            @Override public void onFailure(Call<T> call, final Throwable t) {
              callbackExecutor.execute(new Runnable() {
                @Override public void run() {
                  callback.onFailure(ExecutorCallbackCall.this, t);
                }
              });
            }
          });
        }
    
        @Override public boolean isExecuted() {
          return delegate.isExecuted();
        }
    
        @Override public Response<T> execute() throws IOException {
          return delegate.execute();
        }
    
        @Override public void cancel() {
          delegate.cancel();
        }
    
        @Override public boolean isCanceled() {
          return delegate.isCanceled();
        }
    
        @SuppressWarnings("CloneDoesntCallSuperClone") // Performing deep clone.
        @Override public Call<T> clone() {
          return new ExecutorCallbackCall<>(callbackExecutor, delegate.clone());
        }
    
        @Override public Request request() {
          return delegate.request();
        }
      }

    ok,很简单。只是一个Call的代理类,不了解Call对象的可以去研究下okhttp。delegate.enqueue回复之后使用callbackExecutor将回复处理放回到一个子线程交由callback处理。做好这些准备工作之后。需要看下他是如果做到create代理对象的解耦和连接整个的工作流程。

    retrofit create

    retrofit.create(申明的接口class)

    public <T> T create(final Class<T> service) {
        Utils.validateServiceInterface(service);
        if (validateEagerly) {
          eagerlyValidateMethods(service);
        }
        return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
            new InvocationHandler() {
              private final Platform platform = Platform.get();
    
              @Override public Object invoke(Object proxy, Method method, Object... args)
                  throws Throwable {
                // If the method is a method from Object then defer to normal invocation.
                if (method.getDeclaringClass() == Object.class) {
                  return method.invoke(this, args);
                }
                if (platform.isDefaultMethod(method)) {
                  return platform.invokeDefaultMethod(method, service, proxy, args);
                }
                ServiceMethod serviceMethod = loadServiceMethod(method);
                OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
                return serviceMethod.callAdapter.adapt(okHttpCall);
              }
            });
      }

    可以看出这里使用了一个动态代理来代理service类,观察其InvocationHandler代理改变的部分,其Object的方法都自动被原样的执行,当调用service里面的请求方法的时候会被Proxy拦截,loadServiceMethod方法讲原方法包装成ServiceMethod,ServiceMethod类主要工作是解析注解、传参,将他们分装成request,然后再根据具体的返回值,将之前配置的工场Factory生成具体的CallAdapter和ResponseConvert。 OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args)。显式的实例了一个OkhttpCall对象,OkHttpCall封装了Call(okhttp中的对象),说明retrofit是使用了okttp最为网络的核心模块。最后一步serviceMethod.callAdapter.adapt(okHttpCall),callAdater是用来做什么的呢,其本质是将call转成一个T型的策略。下面先对CallAdapter如何工作的进行剖析,并且如何整合Rxjava来使用的。

    结合Rxjava

    如果我们想使用Rxjava 那么接口方法定义得是这样的

        @FormUrlEncoded
        @POST
        Observable<User> rxLogin(@Field("name")String name, @Field("pwd")String pwd);

    那么如何才能转换成我们想要的Observable类型呢,前面我们说过CallAdapter旨在转换Call成为我们想要的T类型。那么需要在CallAdapter上做文章了。重点看下RxJavaCallAdapterFactory类

    public final class RxJavaCallAdapterFactory implements CallAdapter.Factory

    CallAdapter.Factory是Retrofit这个库中的接口,用来给我们自定义去解析我们自己想要的类型用的。

    get方法获取CallAdapter的方法

    @Override
    public CallAdapter<?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
        Class<?> rawType = Utils.getRawType(returnType);
        boolean isSingle = "rx.Single".equals(rawType.getCanonicalName());
        if (rawType != Observable.class && !isSingle) {
          return null;
        }
        if (!(returnType instanceof ParameterizedType)) {
          String name = isSingle ? "Single" : "Observable";
          throw new IllegalStateException(name + " return type must be parameterized"
              + " as " + name + "<Foo> or " + name + "<? extends Foo>");
        }
    
        CallAdapter<Observable<?>> callAdapter = getCallAdapter(returnType);
        if (isSingle) {
          // Add Single-converter wrapper from a separate class. This defers classloading such that
          // regular Observable operation can be leveraged without relying on this unstable RxJava API.
          return SingleHelper.makeSingle(callAdapter);
        }
        return callAdapter;
    }

    如果不是Observable的类型,则不去处理.Observable类型则调用callAdapter()方法获取对应的CallAdapter。callAdapter()方法中return SimpleCallAdapter的实例,那么观察SimpleCallAdapter的实现

    static final class SimpleCallAdapter implements CallAdapter<Observable<?>> {
        private final Type responseType;
        private final Scheduler scheduler;
    
        SimpleCallAdapter(Type responseType, Scheduler scheduler) {
          this.responseType = responseType;
          this.scheduler = scheduler;
        }
    
        @Override public Type responseType() {
          return responseType;
        }
    
        @Override public <R> Observable<R> adapt(Call<R> call) {
          Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
              .lift(OperatorMapResponseToBodyOrError.<R>instance());
          if (scheduler != null) {
            return observable.subscribeOn(scheduler);
          }
          return observable;
        }
      }

    好了,最终我们看到了adapter返回类型就是Observeble了。再回顾下retrofit create的过程中我们代理proxy return的结果是serviceMethod.callAdapter.apater(call)那么我们取得的是不是就一个Observerble对象了!!!那么create拿到的代理对象就是一个observable对象。就可以轻松的使用Rxjava了!!!如果不了解rxjava的可以了解下github rxjava

    Request and Response

    retrofit的重点是是组装request和回复response类型的转换。serviceMethod类是用来根据注解组装请求参数的。serviceMethod的build方法

    public ServiceMethod build() {
          callAdapter = createCallAdapter();
          responseType = callAdapter.responseType();
          if (responseType == Response.class || responseType == okhttp3.Response.class) {
            throw methodError("'"
                + Utils.getRawType(responseType).getName()
                + "' is not a valid response body type. Did you mean ResponseBody?");
          }
          responseConverter = createResponseConverter();
    
          for (Annotation annotation : methodAnnotations) {
            parseMethodAnnotation(annotation);
          }
    
          if (httpMethod == null) {
            throw methodError("HTTP method annotation is required (e.g., @GET, @POST, etc.).");
          }
    
          if (!hasBody) {
            if (isMultipart) {
              throw methodError(
                  "Multipart can only be specified on HTTP methods with request body (e.g., @POST).");
            }
            if (isFormEncoded) {
              throw methodError("FormUrlEncoded can only be specified on HTTP methods with "
                  + "request body (e.g., @POST).");
            }
          }
    
          int parameterCount = parameterAnnotationsArray.length;
          parameterHandlers = new ParameterHandler<?>[parameterCount];
          for (int p = 0; p < parameterCount; p++) {
            Type parameterType = parameterTypes[p];
            if (Utils.hasUnresolvableType(parameterType)) {
              throw parameterError(p, "Parameter type must not include a type variable or wildcard: %s",
                  parameterType);
            }
    
            Annotation[] parameterAnnotations = parameterAnnotationsArray[p];
            if (parameterAnnotations == null) {
              throw parameterError(p, "No Retrofit annotation found.");
            }
    
            parameterHandlers[p] = parseParameter(p, parameterType, parameterAnnotations);
          }
    
          if (relativeUrl == null && !gotUrl) {
            throw methodError("Missing either @%s URL or @Url parameter.", httpMethod);
          }
          if (!isFormEncoded && !isMultipart && !hasBody && gotBody) {
            throw methodError("Non-body HTTP method cannot contain @Body.");
          }
          if (isFormEncoded && !gotField) {
            throw methodError("Form-encoded method must contain at least one @Field.");
          }
          if (isMultipart && !gotPart) {
            throw methodError("Multipart method must contain at least one @Part.");
          }
    
          return new ServiceMethod<>(this);
        }

    createCallAdapter创建对应的callAdapter,即从CallAdapterFactory中取出需要的适配器,responseConverter创建对应的回复处理类,从List

    @Override public void enqueue(final Callback<T> callback) {
        if (callback == null) throw new NullPointerException("callback == null");
    
        okhttp3.Call call;
        Throwable failure;
    
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already executed.");
          executed = true;
    
          call = rawCall;
          failure = creationFailure;
          if (call == null && failure == null) {
            try {
              call = rawCall = createRawCall();
            } catch (Throwable t) {
              failure = creationFailure = t;
            }
          }
        }
    
        if (failure != null) {
          callback.onFailure(this, failure);
          return;
        }
    
        if (canceled) {
          call.cancel();
        }
    
        call.enqueue(new okhttp3.Callback() {
          @Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse)
              throws IOException {
            Response<T> response;
            try {
              response = parseResponse(rawResponse);
            } catch (Throwable e) {
              callFailure(e);
              return;
            }
            callSuccess(response);
          }
    
          @Override public void onFailure(okhttp3.Call call, IOException e) {
            try {
              callback.onFailure(OkHttpCall.this, e);
            } catch (Throwable t) {
              t.printStackTrace();
            }
          }
    
          private void callFailure(Throwable e) {
            try {
              callback.onFailure(OkHttpCall.this, e);
            } catch (Throwable t) {
              t.printStackTrace();
            }
          }
    
          private void callSuccess(Response<T> response) {
            try {
              callback.onResponse(OkHttpCall.this, response);
            } catch (Throwable t) {
              t.printStackTrace();
            }
          }
        });
      }

    使用okttp的call对象拿到回复之后调用parseResponse获得到指定的response对象。最终是调用serviceMethod中的toResponse进行转换成对应的T类型。

     /** Builds a method return value from an HTTP response body. */
      T toResponse(ResponseBody body) throws IOException {
        return responseConverter.convert(body);
      }

    所以这里就是使用了我们指定的responseConverter进行转换。如GsonConverterFactory。转换的方式根据不同Converter来定,不做详述。由此可见,通过ServiceMethod的协调完成了request的组装和response回复的转换。

以上是关于retrofit 2 支持rxjava2了吗的主要内容,如果未能解决你的问题,请参考以下文章

网络框架封装(retrofit2+rxjava2+okhttp3)

java RxJava2和Retrofit 2.2.0兼容工厂,它包装{@link RxJava2CallAdapterFactory}并负责错误转换。

浅谈Retrofit2+Rxjava2

Retrofit2+Rxjava2的用法

Android实战——RxJava2+Retrofit+RxBinding解锁各种新姿势

使用 RxJava2 和 Retrofit2 时如何访问响应头?