rxJava体系比较大,为了收缩范围还是从我实际开发用的着手。
private Observable request(Observable observable, int flag) {
if (flag == SINGLE) {
//Observable 映射return observable.map(this.functionSingle()).compose(schedulersTransformer());} else {
return observable.map(this.functionList()).compose(schedulersTransformer());}
}
这里对observable操作有两种,一种是map,一种compose。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));}
这个map方法传入的参数是一个泛型类。Function,拥有两个泛型参数一个是T,一个是R。这个T参数属于Observable 构造对象携带的参数,大家要注意研究这个rxjava的时候,心中一定随时关注这个Observable 对象,因为这个是RxJava的核心落脚点。看下Funtion 类型实际上是一个
public interface Function<T, R> {
/*** Apply some calculation to the input value and return some other value.* @param t the input value* @return the output value* @throws Exception on error*/R apply(@NonNull T t) throws Exception;
}
然后看下new ObservableMap<T, R>(this, mapper) 构造了一个ObservableMap。
有必要把整个ObservableMap 都贴出来。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;//构造方法传入参数。 public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {
//实际上是订阅了一个观察者。接下来关注这个观察者的onNext方法,最终可以遇见肯定是要调到function的apply方法。其实RxJava本质就是传入一堆对象,然后进行接口调用。source.subscribe(new MapObserver<T, U>(t, function));}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {
if (done) {
return;}if (sourceMode != NONE) {
downstream.onNext(null);return;}U v;try {
//这个地方就是调用之前传递到的function的方法,其实这个也是对结果进行处理属于增加观察者的一种方式。v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {
fail(ex);return;}downstream.onNext(v);}@Overridepublic int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Exception {
T t = qd.poll();return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}
}
现在再来分析 observalbe.compose(schedulersTransformer()); composer方法传入的是ObservableTransformer 。从名称来看,属于观察源的转换。这个接口实际上就只有一个apply方法。我们现在看下。我们的实际用法中,subscribeOn(Scheduler)和 observeOn(Scheduler scheduler)这两个方法对于安卓来说是非常重要并且明显的。
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));}
首先调用的apply方法。我们看下
private ObservableTransformer schedulersTransformer() {
return new ObservableTransformer() {
@Overridepublic ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());}};}
现在把结论说明一下,subscribeOn 代表观察源所跑的Scheduler,observeOn 代表观察者也就是结果所在的线程,由于安卓都要求在主线中刷新UI,所以这里面直接用了跑在主线程的Scheduler。这两个方法里面的逻辑还是有点多。我们分开来讲。
首先对subscribeOn 有个直观解释。我贴一个rx官网对这个api的介绍,相信都能看明白。
the SubscribeOn operator designates which thread the Observable will begin operating on。
指明Observable 应该被Schedulers.io()里的thread 运行。observeOn 是指在指定shcedule的thread来接收这个事件。这两个方法其实体现了rxjava的精髓,那就是可以做到线程切换,链式调用。事件产生
可以指定线程,并且可以对事件进行多次apply。传递到observe的时候也能做到线程指定,并且可以做到层层的拦截。
首先subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);this.scheduler = scheduler;}@Overridepublic void subscribeActual(final Observer<? super T> observer) {
//此处方法是构造了一个SubscribeOnObserver 的parent。final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);observer.onSubscribe(parent);parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;final Observer<? super T> downstream;final AtomicReference<Disposable> upstream;SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;this.upstream = new AtomicReference<Disposable>();}@Overridepublic void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);}@Overridepublic void onNext(T t) {
downstream.onNext(t);}@Overridepublic void onError(Throwable t) {
downstream.onError(t);}@Overridepublic void onComplete() {
downstream.onComplete();}@Overridepublic void dispose() {
DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {
return DisposableHelper.isDisposed(get());}void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);}}final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;}@Overridepublic void run() {
source.subscribe(parent);}}
}
现在要定位到Scheduler了,rxjava概念的另一个核心,(Obserable,Observe,Scheduler)。结合上面代码来分析下这个调度器。
IO = RxJavaPlugins.initIoScheduler(new IOTask());
这里最终使用的是IoScheduler.