Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象。
1. Subject的类型
一共有四种为不同用途而设计的Subject,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。
1. AsyncSubject
AsyncSubject仅释放Observable释放的最后一个数据,并且仅在Observable完成之后。然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。
2. BehaviorSubject
当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。
3. PublishSubject
PublishSubject仅会向Observer释放在订阅之后Observable释放的数据。
4. ReplaySubject
不管Observer何时订阅ReplaySubject,ReplaySubject会向所有Observer释放Observable释放过的数据。
有不同类型的ReplaySubject,它们是用来限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。
如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete和onError方法,因为这会导致顺序错乱,这个是违反了Observer规则的。
2. RxJava的Subject源码分析
1. Subject
Subject表示一个同时是Observable和Observer的对象。类Subject的代码如下:
- package rx.subjects;
-
- import rx.Observable;
- import rx.Observer;
- import rx.Subscriber;
-
-
-
-
- public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
- protected Subject(OnSubscribe<R> onSubscribe) {
- super(onSubscribe);
- }
-
- public abstract boolean hasObservers();
-
- public final SerializedSubject<T, R> toSerialized() {
- return new SerializedSubject<T, R>(this);
- }
- }
2. BehaviorSubject
Subject有四个主要的子类,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。接下来将以BehaviorSubject为例进行源码分析。
2.1 BehaviorSubject订阅subscribe过程
在需要使用subject时,调用Subject的subscribe(..)方法,该方法实际会调用下面这个subscribe(Subscriber<? super T> subscriber)方法,所以其他的subscribe方法都要将输入参数转化为一个Subscriber对象。
- public final Subscription subscribe(Subscriber<? super T> subscriber) {
- ...
-
- subscriber.onStart();
-
- ...
-
-
- try {
-
- hook.onSubscribeStart(this, onSubscribe).call(subscriber);
- return hook.onSubscribeReturn(subscriber);
- } catch (Throwable e) {
-
- Exceptions.throwIfFatal(e);
-
- try {
- subscriber.onError(hook.onSubscribeError(e));
- } catch (OnErrorNotImplementedException e2) {
-
- throw e2;
- } catch (Throwable e2) {
-
-
- RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
-
- hook.onSubscribeError(r);
-
- throw r;
- }
- return Subscriptions.unsubscribed();
- }
- }
方法中hook.onSubsribeStart(this, onSubscribe).call(subscriber)默认情况下等价于onSubscribe.call(subscriber)。onSubscriber是什么呢?这个就需要了解BehaviorSubject的构造方法
- protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
- super(onSubscribe);
- this.state = state;
- }
其中调用了父类Subject的构造方法
- protected Subject(OnSubscribe<R> onSubscribe) {
- super(onSubscribe);
- }
其中调用了父类Observer的构造方法
- protected Observable(OnSubscribe<T> f) {
- this.onSubscribe = f;
- }
obSubscribe即是BehaviorSubject构造方法中传入的第一个参数。
BehaviorSubject有3个静态工厂方法用来生产BehaviorSubject对象。
- public final class BehaviorSubject<T> extends Subject<T, T> {
-
- public static <T> BehaviorSubject<T> create() {
- return create(null, false);
- }
-
- public static <T> BehaviorSubject<T> create(T defaultValue) {
- return create(defaultValue, true);
- }
-
- private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
- final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
- if (hasDefault) {
- state.set(NotificationLite.instance().next(defaultValue));
- }
- state.onAdded = new Action1<SubjectObserver<T>>() {
-
- @Override
- public void call(SubjectObserver<T> o) {
- o.emitFirst(state.get(), state.nl);
- }
-
- };
- state.onTerminated = state.onAdded;
- return new BehaviorSubject<T>(state, state);
- }
- ....
- }
前两个Public的静态构造方法实际上调用的是第三个private方法。
最后return new BehaviorSubject<T>(state, state),所以onSubscribe实际为一个SubjectSubscriptionManager的对象,onSubscribe.call(subscriber)实际调用的是SubjectSubscriptionManager的call方法。
- final class SubjectSubscriptionManager<T> implements OnSubscribe<T> {
- ...
- @Override
- public void call(final Subscriber<? super T> child) {
- SubjectObserver<T> bo = new SubjectObserver<T>(child);
- addUnsubscriber(child, bo);
- onStart.call(bo);
- if (!child.isUnsubscribed()) {
- if (add(bo) && child.isUnsubscribed()) {
- remove(bo);
- }
- }
- }
- }
1. 调用addUnsubscriber方法,注册一个在取消订阅时执行的一个动作,即将该观擦者Observer移除掉。
-
- void addUnsubscriber(Subscriber<? super T> child, final SubjectObserver<T> bo) {
- child.add(Subscriptions.create(new Action0() {
- @Override
- public void call() {
- remove(bo);
- }
- }));
- }
2. 调用add(SubjectObserver<T> o)方法,将该Observer加入已经注册的Observer[]数组当中。
- boolean add(SubjectObserver<T> o) {
- do {
- State oldState = state;
- if (oldState.terminated) {
- onTerminated.call(o);
- return false;
- }
- State newState = oldState.add(o);
- if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
- onAdded.call(o);
- return true;
- }
- } while (true);
- }
该方法会调用onAdd.call(o)。BehaviorSubject的onAdd对象如下,state.get()得到的是最近的数据对象,o.emitFirst即会释放最近的数据对象,这正体现了BehaviorSubject的特点。
- state.onAdded = new Action1<SubjectObserver<T>>() {
-
- @Override
- public void call(SubjectObserver<T> o) {
- o.emitFirst(state.get(), state.nl);
- }
-
- };
在这个过程中使用了SubjectSubscriptionManager的两个内部类。
1. State<T>
该类用来管理已经注册的Observer数组,以及他们的状态。
-
- protected static final class State<T> {
- final boolean terminated;
- final SubjectObserver[] observers;
- static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0];
- static final State TERMINATED = new State(true, NO_OBSERVERS);
- static final State EMPTY = new State(false, NO_OBSERVERS);
-
- public State(boolean terminated, SubjectObserver[] observers) {
- this.terminated = terminated;
- this.observers = observers;
- }
- public State add(SubjectObserver o) {
- ...
- }
- public State remove(SubjectObserver o) {
- ...
- }
- }
2. SubjectObserver<T>
该类时Observer的一个装饰类,运用了装饰模式给Observer类添加新的功能。
以上就是Subject对象订阅Observer时的流程。
2.2 BehaviorSubject的onNext
Behavior的onNext(T v)方法如下
- @Override
- public void onNext(T v) {
- Object last = state.get();
- if (last == null || state.active) {
- Object n = nl.next(v);
- for (SubjectObserver<T> bo : state.next(n)) {
- bo.emitNext(n, state.nl);
- }
- }
- }
state是SubjectSubscriptionManager类的对象,是这个对象来维护最近释放的数据对象,state.get()获取最近释放的数据对象,state.next(Object n)方法重新设置最近释放的数据对象,并返回已经注册的Observer数组。
- SubjectObserver<T>[] next(Object n) {
- set(n);
- return state.observers;
- }
bo.emitNext(Object n, final NotificationLite<T> nl)释放给定的数据对象。
2.3 BehaviorSubject的onCompleted和onError
onCompleted和onError会调用SubjectSubscriptionManager的terminate(Object n)方法,该方法会重新设置最近释放的数据对象,设置Subject状态为TERMINATED,表示终结了,最后返回已注册的Observer数组。
- SubjectObserver<T>[] terminate(Object n) {
- set(n);
- active = false;
-
- State<T> oldState = state;
- if (oldState.terminated) {
- return State.NO_OBSERVERS;
- }
- return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers;
- }