当前位置: 代码迷 >> 综合 >> Rxjava分析—Subject
  详细解决方案

Rxjava分析—Subject

热度:29   发布时间:2023-12-12 22:49:14.0

Subject在ReactiveX是作为observer和observerable的一个bridge或者proxy。因为它是一个观察者,所以它可以订阅一个或多个可观察对象,同时因为他是一个可观测对象,所以它可以传递和释放它观测到的数据对象,并且能释放新的对象。


1. Subject的类型


一共有四种为不同用途而设计的Subject,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject


1. AsyncSubject


AsyncSubject仅释放Observable释放的最后一个数据,并且仅在Observable完成之后。然而如果当Observable因为异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。



2. BehaviorSubject


当Observer订阅了一个BehaviorSubject,它一开始就会释放Observable最近释放的一个数据对象,当还没有任何数据释放时,它则是一个默认值。接下来就会释放Observable释放的所有数据。如果Observable因异常终止,BehaviorSubject将不会向后续的Observer释放数据,但是会向Observer传递一个异常通知。



3. PublishSubject


PublishSubject仅会向Observer释放在订阅之后Observable释放的数据。



4. ReplaySubject


不管Observer何时订阅ReplaySubject,ReplaySubject会向所有Observer释放Observable释放过的数据。

有不同类型的ReplaySubject,它们是用来限定Replay的范围,例如设定Buffer的具体大小,或者设定具体的时间范围。

如果使用ReplaySubject作为Observer,注意不要在多个线程中调用onNext、onComplete和onError方法,因为这会导致顺序错乱,这个是违反了Observer规则的。



2. RxJava的Subject源码分析


1. Subject


Subject表示一个同时是Observable和Observer的对象。类Subject的代码如下:

[java]  view plain
  1. package rx.subjects;  
  2.   
  3. import rx.Observable;  
  4. import rx.Observer;  
  5. import rx.Subscriber;  
  6.   
  7. /** 
  8.  * Represents an object that is both an Observable and an Observer. 
  9.  */  
  10. public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {  
  11.     protected Subject(OnSubscribe<R> onSubscribe) {  
  12.         super(onSubscribe);  
  13.     }  
  14.   
  15.     public abstract boolean hasObservers();  
  16.       
  17.     public final SerializedSubject<T, R> toSerialized() {  
  18.         return new SerializedSubject<T, R>(this);  
  19.     }  
  20. }  

2. BehaviorSubject


Subject有四个主要的子类,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。接下来将以BehaviorSubject为例进行源码分析。


2.1 BehaviorSubject订阅subscribe过程


在需要使用subject时,调用Subject的subscribe(..)方法,该方法实际会调用下面这个subscribe(Subscriber<? super T> subscriber)方法,所以其他的subscribe方法都要将输入参数转化为一个Subscriber对象。

[java]  view plain
  1. public final Subscription subscribe(Subscriber<? super T> subscriber) {  
  2.         ...    
  3.         // new Subscriber so onStart it  
  4.         subscriber.onStart();  
  5.           
  6.         ...  
  7.   
  8.         // The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.  
  9.         try {  
  10.             // allow the hook to intercept and/or decorate  
  11.             hook.onSubscribeStart(this, onSubscribe).call(subscriber);  
  12.             return hook.onSubscribeReturn(subscriber);  
  13.         } catch (Throwable e) {  
  14.             // special handling for certain Throwable/Error/Exception types  
  15.             Exceptions.throwIfFatal(e);  
  16.             // if an unhandled error occurs executing the onSubscribe we will propagate it  
  17.             try {  
  18.                 subscriber.onError(hook.onSubscribeError(e));  
  19.             } catch (OnErrorNotImplementedException e2) {  
  20.                 // special handling when onError is not implemented ... we just rethrow  
  21.                 throw e2;  
  22.             } catch (Throwable e2) {  
  23.                 // if this happens it means the onError itself failed (perhaps an invalid function implementation)  
  24.                 // so we are unable to propagate the error correctly and will just throw  
  25.                 RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);  
  26.                 // TODO could the hook be the cause of the error in the on error handling.  
  27.                 hook.onSubscribeError(r);  
  28.                 // TODO why aren't we throwing the hook's return value.  
  29.                 throw r;  
  30.             }  
  31.             return Subscriptions.unsubscribed();  
  32.         }  
  33.     }  

方法中hook.onSubsribeStart(this, onSubscribe).call(subscriber)默认情况下等价于onSubscribe.call(subscriber)。onSubscriber是什么呢?这个就需要了解BehaviorSubject的构造方法

[java]  view plain
  1. protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {  
  2.         super(onSubscribe);  
  3.         this.state = state;  
  4.     }  
其中调用了父类Subject的构造方法

[java]  view plain
  1. protected Subject(OnSubscribe<R> onSubscribe) {  
  2.         super(onSubscribe);  
  3.     }  
其中调用了父类Observer的构造方法

[java]  view plain
  1. protected Observable(OnSubscribe<T> f) {  
  2.         this.onSubscribe = f;  
  3.     }  
obSubscribe即是BehaviorSubject构造方法中传入的第一个参数。

BehaviorSubject有3个静态工厂方法用来生产BehaviorSubject对象。

[java]  view plain
  1. public final class BehaviorSubject<T> extends Subject<T, T> {  
  2.   
  3.     public static <T> BehaviorSubject<T> create() {  
  4.         return create(nullfalse);  
  5.     }  
  6.   
  7.     public static <T> BehaviorSubject<T> create(T defaultValue) {  
  8.         return create(defaultValue, true);  
  9.     }  
  10.   
  11.     private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {  
  12.         final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();  
  13.         if (hasDefault) {  
  14.             state.set(NotificationLite.instance().next(defaultValue));  
  15.         }  
  16.         state.onAdded = new Action1<SubjectObserver<T>>() {  
  17.   
  18.             @Override  
  19.             public void call(SubjectObserver<T> o) {  
  20.                 o.emitFirst(state.get(), state.nl);  
  21.             }  
  22.               
  23.         };  
  24.         state.onTerminated = state.onAdded;  
  25.         return new BehaviorSubject<T>(state, state);   
  26.     }  
  27.     ....  
  28. }  
前两个Public的静态构造方法实际上调用的是第三个private方法。

最后return new BehaviorSubject<T>(state, state),所以onSubscribe实际为一个SubjectSubscriptionManager的对象,onSubscribe.call(subscriber)实际调用的是SubjectSubscriptionManager的call方法。

[java]  view plain
  1. /* package */final class SubjectSubscriptionManager<T> implements OnSubscribe<T> {  
  2.     ...  
  3.     @Override  
  4.     public void call(final Subscriber<? super T> child) {  
  5.         SubjectObserver<T> bo = new SubjectObserver<T>(child);  
  6.         addUnsubscriber(child, bo);  
  7.         onStart.call(bo);  
  8.         if (!child.isUnsubscribed()) {  
  9.             if (add(bo) && child.isUnsubscribed()) {  
  10.                 remove(bo);  
  11.             }  
  12.         }  
  13.     }  
  14. }  
1. 调用addUnsubscriber方法,注册一个在取消订阅时执行的一个动作,即将该观擦者Observer移除掉。

[java]  view plain
  1. /** Registers the unsubscribe action for the given subscriber. */  
  2.     void addUnsubscriber(Subscriber<? super T> child, final SubjectObserver<T> bo) {  
  3.         child.add(Subscriptions.create(new Action0() {  
  4.             @Override  
  5.             public void call() {  
  6.                 remove(bo);  
  7.             }  
  8.         }));  
  9.     }   
2. 调用add(SubjectObserver<T> o)方法,将该Observer加入已经注册的Observer[]数组当中。
[java]  view plain
  1. boolean add(SubjectObserver<T> o) {  
  2.         do {  
  3.             State oldState = state;  
  4.             if (oldState.terminated) {  
  5.                 onTerminated.call(o);  
  6.                 return false;  
  7.             }  
  8.             State newState = oldState.add(o);  
  9.             if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {  
  10.                 onAdded.call(o);  
  11.                 return true;  
  12.             }  
  13.         } while (true);  
  14.     }  

该方法会调用onAdd.call(o)。BehaviorSubject的onAdd对象如下,state.get()得到的是最近的数据对象,o.emitFirst即会释放最近的数据对象,这正体现了BehaviorSubject的特点。

[java]  view plain
  1. state.onAdded = new Action1<SubjectObserver<T>>() {  
  2.   
  3.             @Override  
  4.             public void call(SubjectObserver<T> o) {  
  5.                 o.emitFirst(state.get(), state.nl);  
  6.             }  
  7.               
  8.         };  

在这个过程中使用了SubjectSubscriptionManager的两个内部类。

1. State<T>

该类用来管理已经注册的Observer数组,以及他们的状态。

[java]  view plain
  1. /** State-machine representing the termination state and active SubjectObservers. */  
  2.     protected static final class State<T> {  
  3.         final boolean terminated;  
  4.         final SubjectObserver[] observers;  
  5.         static final SubjectObserver[] NO_OBSERVERS = new SubjectObserver[0];  
  6.         static final State TERMINATED = new State(true, NO_OBSERVERS);  
  7.         static final State EMPTY = new State(false, NO_OBSERVERS);  
  8.           
  9.         public State(boolean terminated, SubjectObserver[] observers) {  
  10.             this.terminated = terminated;  
  11.             this.observers = observers;  
  12.         }  
  13.         public State add(SubjectObserver o) {  
  14.             ...  
  15.         }  
  16.         public State remove(SubjectObserver o) {  
  17.             ...  
  18.         }  
  19.     }  

2. SubjectObserver<T>

该类时Observer的一个装饰类,运用了装饰模式给Observer类添加新的功能。

以上就是Subject对象订阅Observer时的流程。


2.2 BehaviorSubject的onNext


Behavior的onNext(T v)方法如下

[java]  view plain
  1. @Override  
  2.     public void onNext(T v) {  
  3.         Object last = state.get();  
  4.         if (last == null || state.active) {  
  5.             Object n = nl.next(v);  
  6.             for (SubjectObserver<T> bo : state.next(n)) {  
  7.                 bo.emitNext(n, state.nl);  
  8.             }  
  9.         }  
  10.     }  
state是SubjectSubscriptionManager类的对象,是这个对象来维护最近释放的数据对象,state.get()获取最近释放的数据对象,state.next(Object n)方法重新设置最近释放的数据对象,并返回已经注册的Observer数组。

[java]  view plain
  1. SubjectObserver<T>[] next(Object n) {  
  2.         set(n);  
  3.         return state.observers;  
  4.     }  
bo.emitNext(Object n, final NotificationLite<T> nl)释放给定的数据对象。



2.3 BehaviorSubject的onCompleted和onError


onCompleted和onError会调用SubjectSubscriptionManager的terminate(Object n)方法,该方法会重新设置最近释放的数据对象,设置Subject状态为TERMINATED,表示终结了,最后返回已注册的Observer数组。

[java]  view plain
  1. SubjectObserver<T>[] terminate(Object n) {  
  2.         set(n);  
  3.         active = false;  
  4.   
  5.         State<T> oldState = state;  
  6.         if (oldState.terminated) {  
  7.             return State.NO_OBSERVERS;  
  8.         }  
  9.         return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers;  
  10.     }