当前位置: 代码迷 >> 综合 >> RxJava3源码实现-1-create()+subscribe()
  详细解决方案

RxJava3源码实现-1-create()+subscribe()

热度:107   发布时间:2023-10-09 04:48:50.0

目录

1、代码调用流程图

1.1、create()

1.2、subscribe()

2、代码

2.1、结构目录

2.2、代码一

2.3、代码二

3、输出

3.1、原生输出

3.2、手写输出


github代码地址

1、代码调用流程图

从下图中我们可看到:

1、调用create()方法会new 一个ObservableOnSubscribe对象,并将这个对象传到了ObservableCreate类中;

2、调用subscribe()订阅时,我们会创建一个Observer对象,并且穿给CreateEmitter类中;

3、然后,调用subscribe()会最后调用:

ObservableCreate_subscribeActual()

----->ObservableOnSubscribe_subscribe()

----->Emitter_onNext()

----->Observer_onNext()

4、这样我们Create()方法中的onNext()中的值,就传到了Observer中onNext()方法中。

1.1、create()

RxJava3源码实现-1-create()+subscribe()

1.2、subscribe()

RxJava3源码实现-1-create()+subscribe()

2、代码

2.1、结构目录

RxJava3源码实现-1-create()+subscribe()

2.2、代码一

YoObservableSource--->YoObservable--->YoObservableCreate

它们三者是继承关系。

public interface YoObservableSource<T> {void subscribe(YoObserver<? super T> yoObserver);
}
----------------------------------------------------------------------------public abstract class YoObservable<T> implements YoObservableSource<T> {@Overridepublic void subscribe(YoObserver<? super T> yoObserver) {subscribeActual(yoObserver);}public static YoObservable create(YoObservableOnSubscribe source) {return new YoObservableCreate(source);}protected abstract void subscribeActual(YoObserver<? super T> observer);
}
----------------------------------------------------------------------------public class YoObservableCreate<T> extends YoObservable<T> {private final YoObservableOnSubscribe<T> source;public YoObservableCreate(YoObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(YoObserver<? super T> observer) {YoEmitter emitter = new YoCreateEmitter(observer);source.subscribe(emitter);}static final class YoCreateEmitter<T> implements YoEmitter<T> {final YoObserver<? super T> observer;YoCreateEmitter(YoObserver<? super T> observer) {this.observer = observer;}@Overridepublic void onNext(T t) {if (!isDisposed()) {observer.onNext(t);}}@Overridepublic void onError(Throwable t) {if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}}}@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}void dispose() {// 取消订阅}@Overridepublic boolean isDisposed() {return false;}}
}

2.3、代码二

YoEmitter        YoObservableOnSubscribe             YoObserver

public interface YoEmitter<T> {void onNext(T value);void onError(Throwable error);void onComplete();boolean isDisposed();
}
-------------------------------------------------------------------------public interface YoObservableOnSubscribe<T> {void subscribe(YoEmitter<T> emitter);
}
-------------------------------------------------------------------------public interface YoObserver<T> {void onSubscribe();void onNext(T t);void onError(Throwable e);void onComplete();
}

3、输出

3.1、原生输出

 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<String> e) throws Throwable {log("开始发送消息!");e.onNext("A");e.onNext("B");e.onNext("C");e.onComplete();}}).doOnNext(s -> log("发送消息:" + s));observable.subscribe(new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {log("我刚被订阅上,开始初始化工作!");}@Overridepublic void onNext(@NonNull String s) {log("我接收到:" + s);}@Overridepublic void onError(@NonNull Throwable e) {log("错误:" + e.getMessage());}@Overridepublic void onComplete() {log("接受完成!");}});//输出结果
main:我刚被订阅上,开始初始化工作!
main:开始发送消息!
main:发送消息:A
main:我接收到:A
main:发送消息:B
main:我接收到:B
main:发送消息:C
main:我接收到:C
main:接受完成!

3.2、手写输出

  YoObservable yoObservable = YoObservable.create(new YoObservableOnSubscribe<String>() {@Overridepublic void subscribe(YoEmitter<String> e) {log("发送消息:A");e.onNext("A");log("发送消息:B");e.onNext("B");log("发送消息:C");e.onNext("C");e.onComplete();}});yoObservable.subscribe(new YoObserver<String>() {@Overridepublic void onSubscribe() {log("我刚被订阅上,开始初始化工作!");}@Overridepublic void onNext(String s) {log("我接收到:" + s);}@Overridepublic void onError(@NonNull Throwable e) {log("错误:" + e.getMessage());}@Overridepublic void onComplete() {log("接受完成!");}});
//输出
main:发送消息:A
main:我接收到:A
main:发送消息:B
main:我接收到:B
main:发送消息:C
main:我接收到:C
main:接受完成!

 

RxJava3源码实现-1-create()+subscribe()

 

 

 

 

 

 

 

 

 

 

 

 

  相关解决方案