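1. The main class is CallExecuteObservable. Its subscribeActual method executes the request synchronously and drives the observer's callbacks: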
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
    Call<T> call = originalCall.clone();
    CallDisposable disposable = new CallDisposable(call);
    observer.onSubscribe(disposable);

    boolean terminated = false;
    try {
        // Execute the network request (blocking).
        Response<T> response = call.execute();
        // Check whether the subscription has been disposed before emitting.
        if (!disposable.isDisposed()) {
            // The normal path: call our observer's onNext.
            observer.onNext(response);
        }
        if (!disposable.isDisposed()) {
            terminated = true;
            // Call our observer's onComplete.
            observer.onComplete();
        }
    } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (terminated) {
            // onComplete itself threw; the stream has already ended, so report globally.
            RxJavaPlugins.onError(t);
        } else if (!disposable.isDisposed()) {
            try {
                // Call our observer's onError.
                observer.onError(t);
            } catch (Throwable inner) {
                Exceptions.throwIfFatal(inner);
                RxJavaPlugins.onError(new CompositeException(t, inner));
            }
        }
    }
}
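The callback order in the source above can be reproduced with a small, dependency-free sketch. Everything in it is a hypothetical stand-in written for illustration: MiniObserver, MiniDisposable, and trace are not RxJava or Retrofit types, a Callable takes the place of call.execute(), and the fatal-error and RxJavaPlugins handling is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration only; these are NOT RxJava or Retrofit types.
final class CallbackOrderSketch {

    /** Simplified stand-in for io.reactivex.Observer. */
    interface MiniObserver<T> {
        void onSubscribe(MiniDisposable d);
        void onNext(T value);
        void onError(Throwable t);
        void onComplete();
    }

    /** Simplified stand-in for CallDisposable: it only tracks the disposed flag. */
    static final class MiniDisposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    /** Mirrors the control flow of subscribeActual, with a Callable in place of call.execute(). */
    static <T> void subscribeActual(Callable<T> request, MiniObserver<? super T> observer) {
        MiniDisposable disposable = new MiniDisposable();
        observer.onSubscribe(disposable);
        boolean terminated = false;
        try {
            T response = request.call();
            if (!disposable.isDisposed()) {
                observer.onNext(response);
            }
            if (!disposable.isDisposed()) {
                terminated = true;
                observer.onComplete();
            }
        } catch (Throwable t) {
            // The real code reports post-termination errors to RxJavaPlugins.onError;
            // this sketch simply drops them.
            if (!terminated && !disposable.isDisposed()) {
                observer.onError(t);
            }
        }
    }

    /** Runs one subscription and returns the callback names in the order they fired. */
    static List<String> trace(Callable<String> request, boolean disposeInOnSubscribe) {
        List<String> events = new ArrayList<>();
        subscribeActual(request, new MiniObserver<String>() {
            @Override public void onSubscribe(MiniDisposable d) {
                events.add("onSubscribe");
                if (disposeInOnSubscribe) d.dispose();
            }
            @Override public void onNext(String value) { events.add("onNext:" + value); }
            @Override public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(() -> "ok", false));   // [onSubscribe, onNext:ok, onComplete]
        System.out.println(trace(() -> { throw new IllegalStateException("boom"); }, false)); // [onSubscribe, onError:boom]
        System.out.println(trace(() -> "ok", true));    // [onSubscribe]
    }
}
```

The third case shows why the isDisposed() checks matter: once the observer disposes in onSubscribe, the request still runs in this version of the flow, but neither onNext nor onComplete is delivered.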
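2. A network call wrapped with Retrofit + RxJava + OkHttp
Below is a simple call. How each callback ends up being invoked should be clear after reading the source above.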
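OkhttpManager.getInstance().test1(map).subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable disposable) {
        // Called first; keep the Disposable if the request may need to be cancelled.
    }

    @Override
    public void onNext(Object o) {
        // Called with the response when the request succeeds and is not disposed.
    }

    @Override
    public void onError(Throwable throwable) {
        // Called when executing the request (or onNext) throws.
    }

    @Override
    public void onComplete() {
        // Called after onNext when the request finished normally.
    }
});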