Rxjava
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
RxJava
– JVM的反应式扩展
– 一个用于使用Java虚拟机的可观察序列编写异步和基于事件的程序的库。
?
使用的主要设计模式包括:
观察者模式,装饰器模式,适配器模式(函数传参不同有不同实现)
?
RxJava源码分析最主要的点在于
- RxJava是如何从事件流的发送方发送到事件流的接收方的
- RxJava是如何对操作符进行封装和操作的
- RxJava是如何随意切换线程的
?
RxJava的分析三步骤
- 创建:被观察者创建的过程
- 订阅:被观察者订阅观察者的过程
- 发射:发射器发射的过程
?
1 简单使用
在 IO 线程中执行业务逻辑,在主线程中对执行的结果进行后续的处理。
Disposable disposable = Observable.create(new ObservableOnSubscribe<Object>() {
@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {
// 在订阅线程执行业务逻辑emitter.onNext(new Object());emitter.onComplete();}
}).subscribeOn(Schedulers.io())//订阅线程.observeOn(AndroidSchedulers.mainThread())//观察线程.subscribe(new Observer<String>() {
// 在观察线程中进行后续的处理@Overridepublic void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);}@Overridepublic void onNext(String value) {
Log.d(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {
Log.d(TAG, "onError: " + e);}@Overridepublic void onComplete() {
Log.d(TAG, "onComplete: ");}});
disposable.dispose();
- create()
- 线程调度
- subscribeOn(Schedulers.io())
- observeOn(AndroidSchedulers.mainThread())
- subscribe 订阅
- dispose 取消订阅
?
2 主要原理
2.1 主要类
-
Observable(被观察者):被观察的对象
-
Observer(观察者):观察的对象
-
ObservableOnSubscribe(被观察者被订阅时):被订阅时的回调,同时创建出发射器
-
Emitter(发射器):发射数据的对象
-
Disposable(释放者):释放RxJava的对象
1. Observable(被观察者)
最外层的调用接口,可以通过Observable.xxx(create、subscribe) 使用Rxjava
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
public static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}public final void subscribe(@NonNull Observer<? super T> observer) {
observer = RxJavaPlugins.onSubscribe(this, observer);subscribeActual(observer);//...}protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
}
2. Observer(观察者)
public interface Observer<@NonNull T> {
void onSubscribe(@NonNull Disposable d);void onNext(@NonNull T t);void onError(@NonNull Throwable e);void onComplete();
}
Observer是一个接口,定义的这个四个方法就是我们在订阅时,观察者需要重写的四个方法,注意与上面的Emitter接口及其三个方法进行区分。 看这行observer.onSubscribe(parent);
,由上面我们知道observer.onSubscribe()是接受Disposable类型,而这里的parent是CreateEmitter类型,这里的CreateEmitter也是一个适配器,前面的ObservableCreate对被观察者进行了适配,CreateEmitter则对观察者进行了适配,将observer类型转化为Disposable类型
3. ObservableOnSubscribe(被观察者被订阅时)
ObservableOnSubscribe是一个接口,是create方法传入的参数,需要重写subscribe方法
Observable.create(new ObservableOnSubscribe() { })
@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
4. Emitter(发射器)
Emitter接口定义了三个方法,onNext、onError、onComplete
所以实例化的ObservableEmitter对象(继承Emitter接口)可以调用onNext()、onError()、onComplete()这三个方法了
public interface Emitter<@NonNull T> {
void onNext(@NonNull T value);void onError(@NonNull Throwable error);void onComplete();
}
2.2 RxJava原理图解
- 第一排表示各个对象的创建关系,A->B->C->D
- 第二排表示各个对象的订阅关系,D->C->B->A
- 第三排表示各个对象的发射关系,A->B->C->D
?
3 源码解析
3.1 create
//Observable.javapublic static <@NonNull T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");//直接返回了 ObservableCreate return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));}
ObservableOnSubscribe是一个接口,是create方法传入的参数,需要重写subscribe方法
Observable.create(new ObservableOnSubscribe() { })
@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
RxJavaPlugins.onAssembly(ObservableCreate)
//RxJavaPlugins.java@SuppressWarnings({
"rawtypes", "unchecked" })@NonNullpublic static <@NonNull T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;if (f != null) {
return apply(f, source);}return source;}
传入的ObservableCreate是Observable的子类,适配到上方法。只是直接返回传入的ObservableCreate对象。
3.2 subscribe()
//Observable.javapublic final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) {
//将三种类型的观察者回调统一包装到 LambdaObserver 方法中LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());subscribe(ls);return ls;}public final void subscribe(@NonNull Observer<? super T> observer) {
try {
//这里也是没有特别的逻辑,可以理解为直接返回 observer observer = RxJavaPlugins.onSubscribe(this, observer); //subscribeActualsubscribeActual(observer);} catch (NullPointerException e) {
// NOPMDthrow e;} catch (Throwable e) {
Exceptions.throwIfFatal(e);RxJavaPlugins.onError(e);NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");npe.initCause(e);throw npe;}}//subscribeActual是抽象方法,此处是由ObservableCreate实现的subscribeActual方法protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
ObservableCreate是Observable的子类,重点在于实现的subscribeActual方法
//ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {
//1. 对传入的观察者进行包装CreateEmitter<T> parent = new CreateEmitter<>(observer);//2. 订阅回调observer.onSubscribe(parent);try {
//3. 真正执行订阅,执行传入的ObservableOnSubscribe的subscribe方法//即执行传入的重写subscribe方法source.subscribe(parent);} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);parent.onError(ex);}}
//...
}
1 包装观察者 CreateEmitter
CreateEmitter 订阅者包装类 继承ObservableEmitter接口,实现以下方法
//ObservableEmitter.java
public interface ObservableEmitter<@NonNull T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);void setCancellable(@Nullable Cancellable c);boolean isDisposed();ObservableEmitter<T> serialize();boolean tryOnError(@NonNull Throwable t);
}
//CreateEmitter.java 订阅者包装类static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;final Observer<? super T> observer;CreateEmitter(Observer<? super T> observer) {
this.observer = observer;}@Overridepublic void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));return;}if (!isDisposed()) {
//调用 onNextobserver.onNext(t);}}@Overridepublic void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);}}@Overridepublic boolean tryOnError(Throwable t) {
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");}if (!isDisposed()) {
try {
observer.onError(t);} finally {
dispose();}return true;}return false;}@Overridepublic void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();} finally {
dispose();}}}@Overridepublic void dispose() {
//取消订阅DisposableHelper.dispose(this);}//...}
2 订阅回调(observer.onSubscribe(parent))
Observer接口
public interface Observer<@NonNull T> {
void onSubscribe(@NonNull Disposable d);void onNext(@NonNull T t);void onError(@NonNull Throwable e);void onComplete();
}
Observer是一个接口,定义的这个四个方法就是我们在订阅时,观察者需要重写的四个方法。
这里在观察线程中回调执行重写的onSubscribe方法
observer.onSubscribe()是接受Disposable类型,而这里的parent是CreateEmitter类型(继承Disposable),这里的CreateEmitter也是一个适配器,前面的ObservableCreate对被观察者进行了适配,CreateEmitter则对观察者进行了适配
?
3.3 dispose()
@Overridepublic void dispose() {
//this 指当前的 CreateEmitter (订阅者包装类)DisposableHelper.dispose(this);}
DisposableHelper
//DisposableHelper.javapublic static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();Disposable d = DISPOSED;if (current != d) {
current = field.getAndSet(d);if (current != d) {
if (current != null) {
current.dispose();}return true;}}return false;}
AtomicReference是一个原子类型的引用。这里正式通过对该原子类型引用的赋值来完成取消订阅的——通过一个原子操作将其设置为 DISPOSED.
?
3.2 线程调度
-
Schedulers:调度器的管理者。管理着多种不同种类的Scheduler
-
Scheduler:调度器。负责线程Worker的创建
createWorker()
,调度Worker的执行schedule()
-
Worker:抽象的工作线程。被线程调度器管理,负责线程的创建和执行
3.2.1 subscribeOn()
//Observable.javapublic final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));}
返回值Observable情理之中,return返回RxJavaPlugins.onAssembly()也是一样,两点不同:
- 装饰类是ObservableSubscribeOn
- 传入参数还有Scheduler
装饰类ObservableSubscribeOn:
//ObservableSubscribeOn.java
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);this.scheduler = scheduler;}@Overridepublic void subscribeActual(final Observer<? super T> observer) {
//1. 对传入的观察者进行包装final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);//2. 订阅回调observer.onSubscribe(parent);//3. 真正在传入线程进行逻辑操作parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}//...
}
和 create()
方法类似,都是传入了一个 source 之后对其进行包装,然后在 subscribeActual()
方法中,得到一个 经过装饰的parent,然后调用 onSubscribe()
继而进行后续处理,都使用了装饰者设计模式。
不同在于最后还调用了parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
SubscribeOnObserver装饰类:SubscribeOnObserver是ObservableSubscribeOn的内部类
//ObservableSubscribeOn.javastatic final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;final Observer<? super T> downstream;final AtomicReference<Disposable> upstream;SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;this.upstream = new AtomicReference<>();}@Overridepublic void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);}@Overridepublic void onNext(T t) {
downstream.onNext(t);}@Overridepublic void dispose() {
DisposableHelper.dispose(upstream);DisposableHelper.dispose(this);}@Overridepublic boolean isDisposed() {
return DisposableHelper.isDisposed(get());}void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);}}
在Scheduler中真正处理线程调用逻辑的是Worker类,这里setDisposable()的作用就是将你传入的Scheduler返回的worker加入管理。
回到subscribeActual()中,调用观察者的onSubscribe()之后,马上调用了parent.setDisposable()。
传入的参数是scheduler.scheduleDirect(new SubscribeTask(parent))
。 先看SubscribeTask这个类
//ObservableSubscribeOn.javafinal class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;}@Overridepublic void run() {
//这里的订阅操作发生在了Scheduler的线程中source.subscribe(parent);}}
//Scheduler.javapublic Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);}public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);DisposeTask task = new DisposeTask(decoratedRun, w);w.schedule(task, delay, unit);return task;}
传入的子线程被包装配置之后,开始在Worker也就是Scheduler线程中执行 我们继续看DisposeTask这个类,具体的订阅子线程的启动就在这里
//Scheduler.javastatic final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNullfinal Runnable decoratedRun;@NonNullfinal Worker w;@NullableThread runner;DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;this.w = w;}@Overridepublic void run() {
runner = Thread.currentThread();try {
try {
decoratedRun.run();} catch (Throwable ex) {
RxJavaPlugins.onError(ex);throw ex;}} finally {
dispose();runner = null;}}@Overridepublic void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();} else {
w.dispose();}}@Overridepublic boolean isDisposed() {
return w.isDisposed();}@Overridepublic Runnable getWrappedRunnable() {
return this.decoratedRun;}}
可以看到run()中调用了decoratedRun.run();
来启动线程,注意这里是使用的run()而不是start(),而且整个rxjava流程走完后会自己调用dispose();
关闭线程。
到这里,你应该明白了subscribeOn()线程调度的过程,正如它的效果描述一样:将观察者的操作运行在传入的Scheduler.io()线程中
–> subscribeOn(Schedulers.io())
–> 返回一个ObservableSubscribeOn的包装类
–> 当上游的被观察者被订阅之后,回调ObservableSubscribeOn包装类中的subscribeActual()
–> 线程切换至Schedulers.io(),并进行订阅操作source.subscribe(parent)
?
RxJava 的整体使用的是装饰者设计模式。按照装饰者设计模式的思路,RxJava 的包装过程和调用 subscribe()
方法之后的回调过程将如下所示:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cuRFK2j0-1642584817789)(https://raw.githubusercontent.com/rachel-lly/image_host/master/rxjava/image.2ks41b597qu0.webp)]
?
为什么subscribeOn(Schedulers.xxx())多次切换线程,是第一次生效?
我们知道使用subscribeOn()进行线程调度时订阅的顺序是从下往上,所以有多个subscribeOn()时,从最后一个开始执行,一直执行到第一个,最后的结果还是以第一个为准
?
3.2.2 observeOn()
//Observable.javapublic final Observable<T> observeOn(@NonNull Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());}public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");ObjectHelper.verifyPositive(bufferSize, "bufferSize");return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));}
observeOn()与先前create()、subscribeOn()逻辑一致,都是通过不同的装饰类包装观察者形成。
observeOn()传入的参数(装饰类)是ObservableObserveOn
//ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;final boolean delayError;final int bufferSize;public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);this.scheduler = scheduler;this.delayError = delayError;this.bufferSize = bufferSize;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
//判断是不是在当前线程//如果传入的线程是当前线程,直接在当前线程中执行source.subscribe(observer);} else {
Scheduler.Worker w = scheduler.createWorker();source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));}}//...
}
装饰类ObserveOnObserver是ObservableObserveOn的内部类
//ObservableObserveOn.javastatic final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>implements Observer<T>, Runnable {
@Overridepublic void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")QueueDisposable<T> qd = (QueueDisposable<T>) d;int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);if (m == QueueDisposable.SYNC) {
sourceMode = m;queue = qd;done = true;downstream.onSubscribe(this);schedule();return;}if (m == QueueDisposable.ASYNC) {
sourceMode = m;queue = qd;downstream.onSubscribe(this);return;}}queue = new SpscLinkedArrayQueue<>(bufferSize);downstream.onSubscribe(this);}}@Overridepublic void onNext(T t) {
if (done) {
return;}if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);}schedule();}void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);}}}
?
为什么多次调用ObserverOn(),是最后一次生效的
schedule();
之前都是取数据的操作,并没有对数据进行发送,所以说即使使用线程调用将被观察者的操作放在主线程,他的数据准备阶段是在原线程执行的,
schedule();
之后,进入上面传入Workder线程,也就是传入的观察线程,然后才将queue中的T取出,继而发送给下游的观察者。其他方法也是一样的流程,比如onError()、onComplete(),都是将错误或完成的信息先保存,等待切换线程后在执行发送操作。
由此,我们可知ObserverOn()是向下作用的,每次调用都对下游的代码产生作用,所以多次调用ObserverOn(),是最后一次生效的
?
3.3 操作符
以Map操作符为例
Observable.create(new ObservableOnSubscribe<String>() {
@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("next");e.onComplete();}}).map(new Function<String, Integer>() {
@Overridepublic Integer apply(String s) throws Exception {
return Integer.parseInt(s);}}).subscribe(new Observer<Integer>() {
@Overridepublic void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d);}@Overridepublic void onNext(Integer value) {
Log.d(TAG, "onNext: " + value);}@Overridepublic void onError(Throwable e) {
Log.d(TAG, "onError: " + e);}@Overridepublic void onComplete() {
Log.d(TAG, "onComplete: ");}});
查看Observable.map源码
//Observable.javapublic final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));}
传入T,返回R,符合Map操作符传入两个数据类型进行转换的效果。 与前面类似,都是调用了RxJavaPlugins.onAssembly(),仅是传入的参数不同,大多操作符都是这样的,他们的不同仅仅是传入参数的不同,也就是适配器的不同,这说明,操作符的具体实现(比如Map的类型转换)都是在各自的适配器中做的。
Function是map()的传入参数,Function接口只定义了一个apply方法,使用时重写apply方法即可
//Function.java
public interface Function<@NonNull T, @NonNull R> {
R apply(T t) throws Throwable;
}
Function接口还有3456789,其中区别是输入参数的个数
RxJavaPlugins.onAssembly传入了ObservableMap装饰类对象,查看ObservableMap源码:
//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);this.function = function;}@Overridepublic void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));}static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);this.mapper = mapper;}@Overridepublic void onNext(T t) {
if (done) {
return;}if (sourceMode != NONE) {
downstream.onNext(null);return;}U v;try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {
fail(ex);return;}downstream.onNext(v);}@Overridepublic int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);}@Nullable@Overridepublic U poll() throws Throwable {
T t = qd.poll();return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;}}
}
①在构造方法中,将传入的Observable也就是本身抛给父类;
②对转换逻辑funtion进行保存;
③重写subscribeActual()方法并在其中实现订阅,这里与ObservableCreate是一样的,只是传递的参数不同
create以及对大多数操作符的第一层适配器中都会重写subscribeActual()并实现订阅逻辑
?
这里的进行订阅操作的source.subscribe()
传入的是MapObserver的类,除onNext()之外的三个方法是在它的父类BasicFuseableObserver中重写的,MapObserver中只对onNext()进行的重写,而且在其中进行了数据类型转换的工作
@Overridepublic void onNext(T t) {
if (done) {
return;}if (sourceMode != NONE) {
downstream.onNext(null);return;}U v;try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {
fail(ex);return;}downstream.onNext(v);}
可以看到再代码中利用ObjectHelper将上游传过来的T,转换成了下游需要的U
?
3 总结
RxJava 核心的实现依赖了两个设计模式,一个是观察者模式,另一个是装饰器模式。每个装饰器负责自己一种任务,这符合单一责任原则;各个装饰器之间相互协作,来完成复杂的功能。使用了装饰者模式之后,链的每个环节只需要实现自己的功能,可以根据不同的需求在链上面增加环节,类似于转换、过滤、统计等等不同功能,每个类的责任变得单一,从整个调用链上面解耦出来。
但我们也可以从源码分析得出,Rxjava的缺点:使用了大量不同的装饰类来装饰观察者和被观察者,导致调用栈很长。