目录
1、介绍
1.1、subscribeOn()
1.2、observeOn()
2、subscribeOn()原生实现
3、subscribeOn()手写实现
3.1、subscribeOn()执行
3.2、subscribeOn()触发
4、observeOn()手写实现
4.1、observeOn()执行
4.2、observeOn()触发
1、介绍
在我们的RxJava中,不管是create() 或者map(),还是filter()等等吧,大部分操作符默认情况下都是没有异步执行的,它们执行的线程取决于你订阅时的线程。换句话说,我们的Observable默认情况下是同步进行的,这也会导致进程的阻塞。
RxJava这样设计的原因在于Observable一般是由其他并发机制支持的,就比如我们的数据来源直接来源于异步,或者自定义线程等,所以RxJava将控制权交给了开发人员。
在成熟的Rx应用程序中,我们应该尽量的少用subscribeOn(),因为大部分情况下,我们的数据源就来之于异步。subscribeOn()仅用于一些特殊情况,比如你已经知道底层Observable是同步的(create()是阻塞的)时候。
但是,我们可以通过subscribeOn()和observeOn()函数来改变任务的执行线程,将我们Observable改成异步执行。
1.1、subscribeOn()
subscribeOn()可以放在Observable和subscribe()之间任何地方,在单独使用subscribeOn()的情况下,我们整个流程都会在你使用的线程中执行。
如果你在Observable和subscribe()之间多次使用了subscribeOn(),Rx将会以离create()最近的一次subscribeOn()为准。
如下图:我们调用三次subscribeOn()和一次observeOn()。
输出结果我们看到:Observable的操作只在一个线程中,线程名为io()创建的线程名。
如果在你的create()方法中,单个数据的处理都比较耗时,但是我们的subscribeOn()只会创建一个线程管道,这时候我们就需要借用flatMap()+subscribeOn()的形式来异步处理单个任务,当然如果你的数据量很大的话,如果为每个数据都创建一个新的线程都话,将会造成很大的内存消耗,甚至OOM,幸好我们flatMap()有一个控制线程数量的参数maxConcurrency。当创建的线程数量为你设置的maxConcurrency时,后面的任务将会进行等待。
1.2、observeOn()
subscribeOn()的主要功能是决定由哪个Scheduler(线程)来执行create()函数的。
observeOn()则是决定下游的Subsriber在哪个Scheduler(线程)执行的。
-------------------------------------------------------------------------------------------------------
observeOn()有点特殊的是,不管你前面的操作符是在哪个Scheduler(线程)中执行的,只要调用了observeOn(),那么后面的所有操作符都得在observeOn()的Scheduler(线程)中执行。
2、subscribeOn()原生实现
下图是RxJava中subscribeOn()的原生实现:
subscribeOn()的参数是一个Scheduler,而这个类就是我们实现异步的基础。
1、首先我们new了一个Scheduler对象,然后其中返回了一个Worker对象;
2、在Worker对象的实现中,我们创建了一个ScheduledExecutorService对象,这个类是ExecutorService的子类,也就是Java原生的线程池控制类。通过这个类我们实现了了我们的任务异步。看下图2的输出结果。
注意:其实RxJava的自带的Schduler也是通过ScheduledExecutorService这个类来执行异步和延时操作的。
3、到这里,我们应该了解到我们的create()任务被包装成了一个Runnabe。我们只要将这个Runnable对象放入我们的线程中就实现了异步。
3、subscribeOn()手写实现
前面我们讲解了:RxJava3手写实现-1-create()+subscribe()
RxJava3手写实现-2-map操作符+解析数据流实现
这里将在上两节的基础上进行对subscribeOn()的单独讲解,如果向了解全部流程的话,可以查看一下。
本文章手写的源码地址。
-------------------------------------------------------------------------------------------------------
3.1、subscribeOn()执行
其实RxJava中subscribeOn()操作的执行,并没有实际的逻辑操作,它只是将我们创建的线程控制器,传值给了ObservableSubscribeOn对象。
3.2、subscribeOn()触发
subscribeOn()的触发,还是订阅的时候触发的:
1、当订阅的时候,我们会触发YoObservableSubScribeOn中的subscribeActual()方法;
2、在subscribeActual()中,首先我们会new一个YoSubscribeOnObserver对象,并将我们的订阅者传进去,进行流传递;
3、紧接着我们会new一个Runnable对象,并用yo1订阅parent对象,从而触发create()方法中的subscribe()方法;这时候create()方法中的subscribe()进行的所有操作已经在我们创建的线程池中了。
4、然后触发onNext()函数,进而触发parent对象的onNext()方法,最后触发我们的订阅者的onNext()方法。
注:RxJava只是封装了一下线程池,并将线程池抽象为了Scheduler。原理都一样,就如下图所示。
4、observeOn()手写实现
observeOn()只是将从开始调用observeOn()方法后面的所有事件放入了另一个线程之中,所以我们不能将subscribe()放入到新的线程之中,我们应该想办法将onNext()事件放入我们的线程中。
4.1、observeOn()执行
observeOn()的执行和subscribeOn()的执行一样,并没有执行实际的逻辑操作,只是进行了赋值操作。将我们创建的线程池赋值给了YoObservableObserveOn对象之中。
4.2、observeOn()触发
observeOn()的触发和subscribeOn()的触发一样,都是在订阅的时候触发,不同的是,他们调用异步线程的时机不一样,subscribeOn()会将整个调用流程(从订阅开始,一层一层向上订阅,然后再一层一层地向下onNext)都放入一个异步线程之中,但是observeOn()只是将他调用之后的onNext()事件放入了异步线程之中。
-----------------------------------------------------------------------------------------
看下图1:你会发现在onNext()中会将所有数据,放入一个队列之中,然后不管是onNext() 、onError()还是 onCompleted(),它们都会调用schdule()方法,看来实现异步的秘诀就在于这个方法。
看下图2:
1、我们发现,schdule()函数会用我们的线程池,去执行Runnable事件,这个Runnable就是YoObserveOnObserver自己。
2、在我们的新线程中,我们会从队列中不断地拿出数据,并执行onNext()方法,将数据传递到我们的订阅者中。
3、这样我们线程就算完成了。
5、代码
本系列的所有源码。
public abstract class YoObservable<T> implements YoObservableSource<T> {@Overridepublic void subscribe(YoObserver<? super T> yoObserver) {subscribeActual(yoObserver);}public static YoObservable create(YoObservableOnSubscribe source) {return new YoObservableCreate(source);}public final <R> YoObservable<R> map(YoFunction<? super T, ? extends R> mapper) {return new YoObservableMap<>(this, mapper);}public final YoObservable<T> subscribeOn(Executor scheduler) {return new YoObservableSubscribeOn<>(this, scheduler);}public final YoObservable<T> observeOn(Executor scheduler) {return new YoObservableObserveOn<>(this, scheduler);}protected abstract void subscribeActual(YoObserver<? super T> observer);
}
public class YoObservableSubscribeOn<T> extends YoObservable<T> {private final Executor scheduler;private final YoObservableSource<T> source;public YoObservableSubscribeOn(YoObservableSource<T> source, Executor scheduler) {this.scheduler = scheduler;this.source = source;}@Overrideprotected void subscribeActual(YoObserver<? super T> observer) {YoSubscribeOnObserver<T> parent = new YoSubscribeOnObserver<>(observer);observer.onSubscribe();Runnable runnable = new Runnable() {@Overridepublic void run() {source.subscribe(parent);}};scheduler.execute(runnable);}static final class YoSubscribeOnObserver<T> implements YoObserver<T> {final YoObserver<? super T> downstream;YoSubscribeOnObserver(YoObserver<? super T> downstream) {this.downstream = downstream;}@Overridepublic void onSubscribe() {downstream.onSubscribe();}@Overridepublic void onNext(T t) {downstream.onNext(t);}@Overridepublic void onError(Throwable t) {downstream.onError(t);}@Overridepublic void onComplete() {downstream.onComplete();}}}
public class YoObservableObserveOn<T> extends YoObservable<T> {private final Executor scheduler;private final YoObservableSource<T> source;public YoObservableObserveOn(YoObservableSource<T> source, Executor scheduler) {this.scheduler = scheduler;this.source = source;}@Overrideprotected void subscribeActual(YoObserver<? super T> observer) {source.subscribe(new YoObserveOnObserver<>(observer, scheduler));}static final class YoObserveOnObserver<T> implements YoObserver<T>, Runnable {final YoObserver<? super T> downstream;final Executor worker;Throwable error;LinkedQueue<T> queue = new LinkedQueue<>();YoObserveOnObserver(YoObserver<? super T> actual, Executor worker) {this.downstream = actual;this.worker = worker;}@Overridepublic void onSubscribe() {}/*** schedule()应该在接受完所有数据调用,这里没做那种处理,* 所以就在onComplete中调用一次,这里不调用了*/@Overridepublic void onNext(T t) {queue.enqueue(t);schedule();}@Overridepublic void onError(Throwable t) {error = t;schedule();}@Overridepublic void onComplete() {schedule();}/*** 在自己的线程中运行这个Runnable*/void schedule() {worker.execute(this);}@Overridepublic void run() {drainNormal();}void drainNormal() {System.out.println("----------------------");YoObserver<? super T> a = downstream;if (checkTerminated(queue.isEmpty(), a)) {return;}for (;;) {T v;try {v = queue.dequeue();} catch (Throwable ex) {a.onComplete();return;}boolean empty = v == null;if (empty) {break;}a.onNext(v);}}boolean checkTerminated(boolean empty, YoObserver<? super T> a) {Throwable e = error;if (e != null) {a.onError(e);return true;} else if (empty) {a.onComplete();return true;}return false;}}}