RxJava 1.0版本入门
1.首先在app目录下的build.gradle中加入
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
然后点击同步工程(sync Project with gradle files)不报错的话就可以来使用了.
Demo1 观察者是Observer
//1.创建observable被观察者Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("hello RXAndroid");}});//2.创建观察者Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.v(TAG,"onCompleted");}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {Log.v(TAG,"onNext"+s);}};//3.订阅observable.subscribe(observer);
Demo2 观察者是Single
//Single对象只能调用onSuccess()/onError() 并且只能调用一次Single<String> observable = Single.create(new Single.OnSubscribe<String>() {@Overridepublic void call(SingleSubscriber<? super String> singleSubscriber) {//系统内部就会调用观察者对象的onNext+onComplete()//singleSubscriber.onSuccess("好好学习!");//系统内部就会调用观察者对象的onError()singleSubscriber.onError(new NullPointerException());}});//2.家庭-----》观察者ObserverObserver<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.i(TAG, "onCompleted: ");}@Overridepublic void onError(Throwable e) {Log.i(TAG, "onError: "+e.getLocalizedMessage());}//接受处理事件的方法@Overridepublic void onNext(String s) {Log.i(TAG, "onNext: "+s);}};//3.关联----》订阅subscribeobservable.subscribe(observer);
}
Demo3 观察者是Subscriber即Observer的实现类
final Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {//相当于发报纸subscriber.onNext("Hello Android !");//subscriber.onCompleted();subscriber.onError(new NullPointerException("mock exception !"));}});//2.观察者Subscriber是Observer的子类 他还提供了取消订阅和判断是否订阅的方法Subscriber<String> observer = new Subscriber<String>() {@Overridepublic void onCompleted() {Log.i(TAG, "onCompleted: ");}@Overridepublic void onError(Throwable e) {Log.i(TAG, "onError: "+e.getLocalizedMessage());}//接受处理事件的方法@Overridepublic void onNext(String s) {Log.i(TAG, "onNext: "+s);}};//3.关联----》订阅subscribeobservable.subscribe(observer);}
Demo4 只关心事件Action
//如果只关心onNext事件 而不关心onComplete()/onError//如果只关心onNext事件 那么被观察者发送了异常而没人处理 就会抛给系统Action1<String> onNextAction=new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call: "+s);}};//关心Error事件<Throwable> onErrorAction=new Action1<Throwable>() {@Overridepublic void call(Throwable throwable) {Log.i(TAG, "call: "+throwable.getLocalizedMessage());}};//关心onComplete事件Action0 onCompleteAction=new Action0() {@Overridepublic void call() {Log.i(TAG, "onComplete: ");}};
Demo5 观察者是Subject的子类AsyncSubject
//Subject<T, R> extends Observable<R> implements Observer<T>//AsyncSubject他在创建之后就可以发送数据(不用订阅之后再发送数据)它只接收最后一个onNext()事件(在onComplete调用之前)//只要没有onComplete被发送 那么观察者就接收不到任何信息AsyncSubject<String> observable=AsyncSubject.create();observable.onNext("Hello Android !");observable.onNext("Hello Java !");observable.onNext("Hello CPP !");//observable.onCompleted();observable.onError(new NullPointerException());//2.创建一个观察者Action1<String> onNextAction=new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call: "+s);}};//3.实现订阅observable.subscribe(onNextAction);
Demo6 观察者是Subject的子类BehaviorSubject
//1.创建一个被观察者BehaviorSubject是以订阅方法作为分界线//只发送订阅前最后一个onNext事件和订阅后的所有onNext事件//如果订阅前没有发送数据 那么就会接收构造器里面默认的事件和订阅后的事件。BehaviorSubject<String> observable= BehaviorSubject.create("DEFAULT");
// observable.onNext("A !");
// observable.onNext("B !");
// observable.onNext("C !");//2.创建一个观察者Action1<String> onNextAction=new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call: "+s);}};//3.实现订阅observable.subscribe(onNextAction);observable.onNext("D !");observable.onNext("E !");observable.onNext("F !");
Demo7 观察者是Subject的子类PublishSubject
//1.创建一个被观察者PublishSubject:// 它是在创建之后就可以发送事件// 作为观察者 只能接收到订阅后的所有事件PublishSubject<String> observable= PublishSubject.create();observable.onNext("A !");observable.onNext("B !");observable.onNext("C !");//2.创建一个观察者Action1<String> onNextAction=new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call: "+s);}};//3.实现订阅observable.subscribe(onNextAction);observable.onNext("D !");observable.onNext("E !");observable.onNext("F !");
Demo8 观察者是Subject的子类ReplaySubject
//1.创建一个被观察者ReplaySubject://ReplaySubject刚创建完毕的时候就开始发送数据了//不管观察者是什么时候订阅 它都会接收ReplaySubject对象发出的任何事件。ReplaySubject<String> observable= ReplaySubject.create();observable.onNext("A !");observable.onNext("B !");observable.onNext("C !");//2.创建一个观察者Action1<String> onNextAction=new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call: "+s);}};//3.实现订阅observable.subscribe(onNextAction);observable.onNext("D !");observable.onNext("E !");observable.onNext("F !");
Demo8 观察者是可连接的被观察者ConnectableObservable
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {//相当于发报纸subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");}});//publish--->将普通的被观察者 变成可连接的观察者ConnectableObservable<String> connectableObservable = observable.publish();//refCount--->将可连接的观察者转换成普通的观察者//Observable<String> stringObservable = connectableObservable.refCount();Observer<String> observer = new Observer<String>() {@Overridepublic void onCompleted() {Log.i(TAG, "onCompleted: ");}@Overridepublic void onError(Throwable e) {Log.i(TAG, "onError: "+e.getLocalizedMessage());}//接受处理事件的方法@Overridepublic void onNext(String s) {Log.i(TAG, "onNext: "+s);}};connectableObservable.subscribe(observer);//connect-->让可连接的被观察者调用内部的call方法(相当于发送了事件)connectableObservable.connect();
Demo9 观察者是可连接的被观察者ConnectableObservable
//创建一个普通的被观察者Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {//相当于发报纸subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");subscriber.onNext("Hello Android !");}});//2.将普通的被观察者变成可连接的被观察者//publish()创建的被观察者只有在connect()之前订阅的观察者才能接收事件 如果在connect()之后订阅的观察者 是无法获取被观察者发送的事件//有没办法可以让 只要是观察者订阅了可连接的被观察者 都能打印出被观察者发送出来的数据 而不管订阅在connect()的前后顺序。-->replay()ConnectableObservable<String> connectableObservable = observable.replay();//3.实现订阅connectableObservable.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call==1===: "+s);}});//4.让被观察者主动发送事件connectableObservable.connect();//5.再次订阅一个新的观察者connectableObservable.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.i(TAG, "call==2===: "+s);}});
有的被观察者 在创建之后就马上发送了数据—–》“热”Observable—-》Subject的子类
有的被观察者 在订阅的时候才发送的数据——->”冷”Observable—》普通的Observable
还有一种特殊的被观察者 他可以在我们指定的时间点发送数据—–>”冷”Observable—->可连接的Observable