当前位置: 代码迷 >> 综合 >> rxjava3 源码解析
  详细解决方案

rxjava3 源码解析

热度:65   发布时间:2023-12-06 10:54:22.0

Rxjava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

RxJava

– JVM的反应式扩展

– 一个用于使用Java虚拟机的可观察序列编写异步和基于事件的程序的库。

?
使用的主要设计模式包括:
观察者模式装饰器模式,适配器模式(函数传参不同有不同实现)

?
RxJava源码分析最主要的点在于

  • RxJava是如何从事件流的发送方发送到事件流的接收方的
  • RxJava是如何对操作符进行封装和操作的
  • RxJava是如何随意切换线程的

?
RxJava的分析三步骤

  • 创建:被观察者创建的过程
  • 订阅:被观察者订阅观察者的过程
  • 发射:发射器发射的过程

?

1 简单使用

在 IO 线程中执行业务逻辑,在主线程中对执行的结果进行后续的处理。

Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
    @Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {
    // 在订阅线程执行业务逻辑emitter.onNext(new Object());emitter.onComplete();}
}).subscribeOn(Schedulers.io())//订阅线程.observeOn(AndroidSchedulers.mainThread())//观察线程.subscribe(new Observer<String>() {
    // 在观察线程中进行后续的处理@Overridepublic void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe: " + d);}@Overridepublic void onNext(String value) {
    Log.d(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {
    Log.d(TAG, "onError: " + e);}@Overridepublic void onComplete() {
    Log.d(TAG, "onComplete: ");}});
disposable.dispose();
  1. create()
  2. 线程调度
    • subscribeOn(Schedulers.io())
    • observeOn(AndroidSchedulers.mainThread())
  3. subscribe 订阅
  4. dispose 取消订阅

?

2 主要原理

2.1 主要类

  • Observable(被观察者):被观察的对象

  • Observer(观察者):观察的对象

  • ObservableOnSubscribe(被观察者被订阅时):被订阅时的回调,同时创建出发射器

  • Emitter(发射器):发射数据的对象

  • Disposable(释放者):释放RxJava的对象

1. Observable(被观察者)

最外层的调用接口,可以通过Observable.xxx(create、subscribe) 使用Rxjava

public abstract class Observable<@NonNull T> implements ObservableSource<T> {
    public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}public final void subscribe(@NonNull Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);subscribeActual(observer);//...}protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
}

2. Observer(观察者)

public interface Observer<@NonNull T> {
    void onSubscribe(@NonNull Disposable d);void onNext(@NonNull T t);void onError(@NonNull Throwable e);void onComplete();
}

Observer是一个接口,定义的这个四个方法就是我们在订阅时,观察者需要重写的四个方法,注意与上面的Emitter接口及其三个方法进行区分。 看这行observer.onSubscribe(parent);,由上面我们知道observer.onSubscribe()是接受Disposable类型,而这里的parent是CreateEmitter类型,这里的CreateEmitter也是一个适配器,前面的ObservableCreate对被观察者进行了适配,CreateEmitter则对观察者进行了适配,将observer类型转化为Disposable类型

3. ObservableOnSubscribe(被观察者被订阅时)

ObservableOnSubscribe是一个接口,是create方法传入的参数,需要重写subscribe方法

Observable.create(new ObservableOnSubscribe() { })

@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

4. Emitter(发射器)

Emitter接口定义了三个方法,onNext、onError、onComplete

所以实例化的ObservableEmitter对象(继承Emitter接口)可以调用onNext()、onError()、onComplete()这三个方法了

public interface Emitter<@NonNull T> {
    void onNext(@NonNull T value);void onError(@NonNull Throwable error);void onComplete();
}

2.2 RxJava原理图解

  • 第一排表示各个对象的创建关系,A->B->C->D
  • 第二排表示各个对象的订阅关系,D->C->B->A
  • 第三排表示各个对象的发射关系,A->B->C->D

images

?

3 源码解析

3.1 create

//Observable.javapublic static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
    Objects.requireNonNull(source, "source is null");//直接返回了 ObservableCreate return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}

ObservableOnSubscribe是一个接口,是create方法传入的参数,需要重写subscribe方法

Observable.create(new ObservableOnSubscribe() { })

@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

RxJavaPlugins.onAssembly(ObservableCreate)

//RxJavaPlugins.java@SuppressWarnings({
     "rawtypes", "unchecked" })@NonNullpublic static <@NonNull T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {
    return apply(f, source);}return source;}

传入的ObservableCreate是Observable的子类,适配到上方法。只是直接返回传入的ObservableCreate对象。

3.2 subscribe()

//Observable.javapublic final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {
    //将三种类型的观察者回调统一包装到 LambdaObserver 方法中LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());subscribe(ls);return ls;}public final void subscribe(@NonNull Observer<? super T> observer) {
    try {
    //这里也是没有特别的逻辑,可以理解为直接返回 observer observer = RxJavaPlugins.onSubscribe(this, observer); //subscribeActualsubscribeActual(observer);} catch (NullPointerException e) {
     // NOPMDthrow e;} catch (Throwable e) {
    Exceptions.throwIfFatal(e);RxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}//subscribeActual是抽象方法,此处是由ObservableCreate实现的subscribeActual方法protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

ObservableCreate是Observable的子类,重点在于实现的subscribeActual方法

//ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {
    //1. 对传入的观察者进行包装CreateEmitter<T> parent = new CreateEmitter<>(observer);//2. 订阅回调observer.onSubscribe(parent);try {
    //3. 真正执行订阅,执行传入的ObservableOnSubscribe的subscribe方法//即执行传入的重写subscribe方法source.subscribe(parent);} catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);parent.onError(ex);}}
//...
}

1 包装观察者 CreateEmitter

CreateEmitter 订阅者包装类 继承ObservableEmitter接口,实现以下方法

//ObservableEmitter.java
public interface ObservableEmitter<@NonNull T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();ObservableEmitter<T> serialize();boolean tryOnError(@NonNull Throwable t);
}
//CreateEmitter.java 订阅者包装类static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {
    this.observer = observer;}@Overridepublic void onNext(T t) {
    if (t == null) {
    onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));return;}if (!isDisposed()) {
    //调用 onNextobserver.onNext(t);}}@Overridepublic void onError(Throwable t) {
    if (!tryOnError(t)) {
    RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {
    if (t == null) {
    t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");}if (!isDisposed()) {
    try {
    observer.onError(t);} finally {
    dispose();}return true;}return false;}@Overridepublic void onComplete() {
    if (!isDisposed()) {
    try {
    observer.onComplete();} finally {
    dispose();}}}@Overridepublic void dispose() {
    //取消订阅DisposableHelper.dispose(this);}//...}

2 订阅回调(observer.onSubscribe(parent))

Observer接口

public interface Observer<@NonNull T> {
    void onSubscribe(@NonNull Disposable d);void onNext(@NonNull T t);void onError(@NonNull Throwable e);void onComplete();
}

Observer是一个接口,定义的这个四个方法就是我们在订阅时,观察者需要重写的四个方法。

这里在观察线程中回调执行重写的onSubscribe方法

observer.onSubscribe()是接受Disposable类型,而这里的parent是CreateEmitter类型(继承Disposable),这里的CreateEmitter也是一个适配器,前面的ObservableCreate对被观察者进行了适配,CreateEmitter则对观察者进行了适配

?

3.3 dispose()

        @Overridepublic void dispose() {
    //this 指当前的 CreateEmitter (订阅者包装类)DisposableHelper.dispose(this);}

DisposableHelper

//DisposableHelper.javapublic static boolean dispose(AtomicReference<Disposable> field) {
    Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {
    current = field.getAndSet(d);if (current != d) {
    if (current != null) {
    current.dispose();}return true;}}return false;}

AtomicReference是一个原子类型的引用。这里正式通过对该原子类型引用的赋值来完成取消订阅的——通过一个原子操作将其设置为 DISPOSED.

?

3.2 线程调度

  • Schedulers:调度器的管理者。管理着多种不同种类的Scheduler

  • Scheduler:调度器。负责线程Worker的创建createWorker(),调度Worker的执行schedule()

  • Worker:抽象的工作线程。被线程调度器管理,负责线程的创建和执行

3.2.1 subscribeOn()

//Observable.javapublic final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
    Objects.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));}

返回值Observable情理之中,return返回RxJavaPlugins.onAssembly()也是一样,两点不同:

  • 装饰类是ObservableSubscribeOn
  • 传入参数还有Scheduler

装饰类ObservableSubscribeOn:

//ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);this.scheduler = scheduler;}@Overridepublic void subscribeActual(final Observer<? super T> observer) {
    //1. 对传入的观察者进行包装final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);//2. 订阅回调observer.onSubscribe(parent);//3. 真正在传入线程进行逻辑操作parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}//...
}

create() 方法类似,都是传入了一个 source 之后对其进行包装,然后在 subscribeActual() 方法中,得到一个 经过装饰的parent,然后调用 onSubscribe() 继而进行后续处理,都使用了装饰者设计模式

不同在于最后还调用了parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

SubscribeOnObserver装饰类:SubscribeOnObserver是ObservableSubscribeOn的内部类

//ObservableSubscribeOn.javastatic final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    private static final long serialVersionUID = 8094547886072529208L;final Observer<? super T> downstream;final AtomicReference<Disposable> upstream;SubscribeOnObserver(Observer<? super T> downstream) {
    this.downstream = downstream;this.upstream = new AtomicReference<>();}@Overridepublic void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this.upstream, d);}@Overridepublic void onNext(T t) {
    downstream.onNext(t);}@Overridepublic void dispose() {
    DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {
    return DisposableHelper.isDisposed(get());}void setDisposable(Disposable d) {
    DisposableHelper.setOnce(this, d);}}

在Scheduler中真正处理线程调用逻辑的是Worker类,这里setDisposable()的作用就是将你传入的Scheduler返回的worker加入管理。

回到subscribeActual()中,调用观察者的onSubscribe()之后,马上调用了parent.setDisposable()。

传入的参数是scheduler.scheduleDirect(new SubscribeTask(parent))。 先看SubscribeTask这个类

//ObservableSubscribeOn.javafinal class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {
    this.parent = parent;}@Overridepublic void run() {
    //这里的订阅操作发生在了Scheduler的线程中source.subscribe(parent);}}
//Scheduler.javapublic Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);}public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);DisposeTask task = new DisposeTask(decoratedRun, w);w.schedule(task, delay, unit);return task;}

传入的子线程被包装配置之后,开始在Worker也就是Scheduler线程中执行 我们继续看DisposeTask这个类,具体的订阅子线程的启动就在这里

//Scheduler.javastatic final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    @NonNullfinal Runnable decoratedRun;@NonNullfinal Worker w;@NullableThread runner;DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
    this.decoratedRun = decoratedRun;this.w = w;}@Overridepublic void run() {
    runner = Thread.currentThread();try {
    try {
    decoratedRun.run();} catch (Throwable ex) {
    RxJavaPlugins.onError(ex);throw ex;}} finally {
    dispose();runner = null;}}@Overridepublic void dispose() {
    if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
    ((NewThreadWorker)w).shutdown();} else {
    w.dispose();}}@Overridepublic boolean isDisposed() {
    return w.isDisposed();}@Overridepublic Runnable getWrappedRunnable() {
    return this.decoratedRun;}}

可以看到run()中调用了decoratedRun.run();来启动线程,注意这里是使用的run()而不是start(),而且整个rxjava流程走完后会自己调用dispose();关闭线程。

到这里,你应该明白了subscribeOn()线程调度的过程,正如它的效果描述一样:将观察者的操作运行在传入的Scheduler.io()线程中
–> subscribeOn(Schedulers.io())
–> 返回一个ObservableSubscribeOn的包装类
–> 当上游的被观察者被订阅之后,回调ObservableSubscribeOn包装类中的subscribeActual()
–> 线程切换至Schedulers.io(),并进行订阅操作source.subscribe(parent)

?

RxJava 的整体使用的是装饰者设计模式。按照装饰者设计模式的思路,RxJava 的包装过程和调用 subscribe() 方法之后的回调过程将如下所示:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cuRFK2j0-1642584817789)(https://raw.githubusercontent.com/rachel-lly/image_host/master/rxjava/image.2ks41b597qu0.webp)]

?

为什么subscribeOn(Schedulers.xxx())多次切换线程,是第一次生效?

我们知道使用subscribeOn()进行线程调度时订阅的顺序是从下往上,所以有多个subscribeOn()时,从最后一个开始执行,一直执行到第一个,最后的结果还是以第一个为准

?

3.2.2 observeOn()

//Observable.javapublic final Observable<T> observeOn(@NonNull Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());}public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
    Objects.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));}

observeOn()与先前create()、subscribeOn()逻辑一致,都是通过不同的装饰类包装观察者形成。

observeOn()传入的参数(装饰类)是ObservableObserveOn

//ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;final boolean delayError;final int bufferSize;public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
    //判断是不是在当前线程//如果传入的线程是当前线程,直接在当前线程中执行source.subscribe(observer);} else {
    Scheduler.Worker w = scheduler.createWorker();source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));}}//...
}

装饰类ObserveOnObserver是ObservableObserveOn的内部类

//ObservableObserveOn.javastatic final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>implements Observer<T>, Runnable {
    @Overridepublic void onSubscribe(Disposable d) {
    if (DisposableHelper.validate(this.upstream, d)) {
    this.upstream = d;if (d instanceof QueueDisposable) {
    @SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {
    sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {
    sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<>(bufferSize);downstream.onSubscribe(this);}}@Overridepublic void onNext(T t) {
    if (done) {
    return;}if (sourceMode != QueueDisposable.ASYNC) {
    queue.offer(t);}schedule();}void schedule() {
    if (getAndIncrement() == 0) {
    worker.schedule(this);}}}

?

为什么多次调用ObserverOn(),是最后一次生效的

schedule();之前都是取数据的操作,并没有对数据进行发送,所以说即使使用线程调用将被观察者的操作放在主线程,他的数据准备阶段是在原线程执行的,

schedule();之后,进入上面传入Workder线程,也就是传入的观察线程,然后才将queue中的T取出,继而发送给下游的观察者。其他方法也是一样的流程,比如onError()、onComplete(),都是将错误或完成的信息先保存,等待切换线程后在执行发送操作

由此,我们可知ObserverOn()是向下作用的,每次调用都对下游的代码产生作用,所以多次调用ObserverOn(),是最后一次生效的

?

3.3 操作符

Map操作符为例

Observable.create(new ObservableOnSubscribe<String>() {
    @Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {
    e.onNext("next");e.onComplete();}}).map(new Function<String, Integer>() {
    @Overridepublic Integer apply(String s) throws Exception {
    return Integer.parseInt(s);}}).subscribe(new Observer<Integer>() {
    @Overridepublic void onSubscribe(Disposable d) {
    Log.d(TAG, "onSubscribe: " + d);}@Overridepublic void onNext(Integer value) {
    Log.d(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {
    Log.d(TAG, "onError: " + e);}@Overridepublic void onComplete() {
    Log.d(TAG, "onComplete: ");}});

查看Observable.map源码

//Observable.javapublic final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
    Objects.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));}

传入T,返回R,符合Map操作符传入两个数据类型进行转换的效果。 与前面类似,都是调用了RxJavaPlugins.onAssembly(),仅是传入的参数不同,大多操作符都是这样的,他们的不同仅仅是传入参数的不同,也就是适配器的不同,这说明,操作符的具体实现(比如Map的类型转换)都是在各自的适配器中做的。

Function是map()的传入参数,Function接口只定义了一个apply方法,使用时重写apply方法即可

//Function.java
public interface Function<@NonNull T, @NonNull R> {
    R apply(T t) throws Throwable;
}

Function接口还有3456789,其中区别是输入参数的个数

RxJavaPlugins.onAssembly传入了ObservableMap装饰类对象,查看ObservableMap源码:

//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
    super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {
    if (done) {
    return;}if (sourceMode != NONE) {
    downstream.onNext(null);return;}U v;try {
    v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {
    fail(ex);return;}downstream.onNext(v);}@Overridepublic int requestFusion(int mode) {
    return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Throwable {
    T t = qd.poll();return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}
}

①在构造方法中,将传入的Observable也就是本身抛给父类;

②对转换逻辑funtion进行保存;

③重写subscribeActual()方法并在其中实现订阅,这里与ObservableCreate是一样的,只是传递的参数不同

create以及对大多数操作符的第一层适配器中都会重写subscribeActual()并实现订阅逻辑

?

这里的进行订阅操作的source.subscribe()传入的是MapObserver的类,除onNext()之外的三个方法是在它的父类BasicFuseableObserver中重写的,MapObserver中只对onNext()进行的重写,而且在其中进行了数据类型转换的工作

        @Overridepublic void onNext(T t) {
    if (done) {
    return;}if (sourceMode != NONE) {
    downstream.onNext(null);return;}U v;try {
    v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {
    fail(ex);return;}downstream.onNext(v);}

可以看到再代码中利用ObjectHelper将上游传过来的T,转换成了下游需要的U

?

3 总结

RxJava 核心的实现依赖了两个设计模式,一个是观察者模式,另一个是装饰器模式。每个装饰器负责自己一种任务,这符合单一责任原则;各个装饰器之间相互协作,来完成复杂的功能。使用了装饰者模式之后,链的每个环节只需要实现自己的功能,可以根据不同的需求在链上面增加环节,类似于转换、过滤、统计等等不同功能,每个类的责任变得单一,从整个调用链上面解耦出来。

但我们也可以从源码分析得出,Rxjava的缺点:使用了大量不同的装饰类来装饰观察者和被观察者,导致调用栈很长。