参考:
https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w
https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ
https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw
上一篇我们说了Android-RxJava(上)主要包括组合操作符,变换操作符,创建操作符,我们再接再厉,继续下半部分内容,也就是剩余的操作符:
3.4 过滤操作符
含义:过滤/筛选 被观察者发送的事件。
3.4.1 filter
过滤操作符filter(),通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。
相关代码:
private void rxJavaFilter(){Observable.just(1,2,3,4).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
可看到我们要求打印integer小于3的,所以打印了1和2.
3.4.2 ofType
过滤操作符ofType(),可以过滤不符合该类型事件。
private void rxJavaOfType(){Observable.just(1,2,3,4,"薛之涛").ofType(String.class).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(String integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
我们要求过滤Integer数据类型,留下String类型,打印结果正确!
3.4.3 skip 或 skipLast
过滤操作符skip,跳过正序部分事件,参数为跳过前多少个事件。
Observable.just(1,2,3,4).skip(2).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
skipLast和skip操作符相反,它是跳过后多少个事件打印其之前的事件
3.4.4 distinct 或 distinctUntilChanged
过滤操作符distinct,过滤事件序列中的重复事件
代码:
private void rxJavadistinct(){Observable.just(1,2,3,3,3,4,2).distinct().subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
- distinctUntilChanged
过滤操作符distinctUntilChanged ()过滤掉连续重复的事件
代码:
Observable.just(1,2,3,3,3,2,1).distinctUntilChanged().subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
3.4.5 take 或 takeLast
过滤操作符take(),控制观察者接收的事件的数量。
代码:
Observable.just(1,2,3,3,3,2,1).take(3).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
- takeLast()操作符,控制观察者只能接受事件序列的后面几个请求
代码:
Observable.just(1,2,3,3,3,2,1).takeLast(3).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
3.4.6 elementAt 或 elementAtOrError()
过滤操作符 elementAt(),可以指定取出事件序列中事件,下标从0开始,但如果指定的index大于总的事件序列数,则无反应
代码:
/*** 过滤操作符elementAt,指定队列中的事件下标,取出该事件*/private void rxjavaElementAt(){Observable.just(1,2,3,4).elementAt(3).subscribe(new Consumer<Integer>(){@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "accept: "+integer);}});}
结果:
- elementAtOrError() 操作符和elementAt不同的是,其在下标超出队列数后会报 NoSuchElementException 异常.
- irstElement() 取事件序列的第一个元素
- lastElement() 取事件序列的最后一个元素
3.4.7 throttleFirst 或 throttleLast
过滤操作符throttleFirst (),可以和rxbinding2结合使用和绑定view的点击事件,防止在规定时间内多次点击,也就是防止规定事件防止重复点击。先加入rxbinding2依赖。implementation 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
代码:
/*** throttleFirst操作符绑定view的点击事件,防止在规定时间内多次点击*/int clickNum =1;private void rxjavathrottleFirst(){TextView tv=findViewById(R.id.tv);RxView.clicks(tv).throttleFirst(3,TimeUnit.SECONDS).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {Log.d(TAG, "accept: "+"第"+clickNum+"次点击了TextView");clickNum ++ ;}});}
结果:
我在程序运行期间不断点击TextView,结果只打印了3秒间隔之后的第一次点击。
-throttleWithTimeout
throttleWithTimeout() 操作符,如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。
3.5条件操作符
含义:通过指定条件,判断是否接收被观察者发送的事件。
3.5.1 all
条件操作符all(),主要用来判断所有事件是否满足.如果都满足则返回 true,反之则返回 false
代码:
/*** 条件操作符,all如果都满足则返回 true,反之则返回 false*/private void rxJavaAll(){Observable.just(1,2,3,4).all(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 5;}}).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "accept: "+aBoolean);}});}
结果:
3.5.2 takeWhile
条件操作符takeWhile(),当判断发送的事件不满足条件时,就会终止后续事件的接受
代码:
/*** 某个数据满足条件时就会发送该数据,反之则不发送*/private void rxJavaTakeWhile(){Observable.just(3,2,1,4).takeWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer >= 2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "accept: "+integer);}});}
结果:
3.5.3 skipWhile
条件操作符skipWhile(),当判断发送的事件不满足条件时,才接受后续事件,反之亦然。
代码:
/**
* 满足条件的事件不发送,不满足时发送其及其之后的事件,注意其之后的数据是不判断的*/private void rxJavaSkipWhile(){Observable.just(1,2,4,3).skipWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {Log.d(TAG, "test: "+integer);return integer >= 3 ;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "accept: "+integer);}});}
结果:
3.5.4 takeUntil
条件操作符takeUntil(),满足条件时,其之后的事件不会被发送
代码:
private void rxJavaTakeUntil(){Observable.just(1,2,4,3).takeUntil(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {Log.d(TAG, "test: "+integer);return integer >= 3 ;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.d(TAG, "accept: "+integer);}});}
结果:
3.5.6 sequenceEqual
条件操作符sequenceEqual(),判断两个非观察者发送的事件是否一样
代码:
private void rxJavaSequenceEqual(){Observable.sequenceEqual(Observable.just(1, 2, 3),Observable.just(1,2,3)).subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "accept====" + aBoolean);}});}
结果:
注意如果是:Observable.just(1, 2, 3) 和Observable.just(3,2,1)比较返回结果为false,是有顺序之分的
3.6.7 contains
条件操作符contains(),判断是否包含指定数据
代码:
private void rxJavaContains(){Observable.just(1, 2, 3).contains(4).subscribe(new Consumer < Boolean > () {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "accept====" + aBoolean);}});}
结果:
3.6.8 isEmpty
条件操作符sEmpty(),判断发送的数据是否为空,如果事件序列中元素为空则返回true
代码:
private void rxJavaIsEmpty() {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {e.onComplete();}}).isEmpty().subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {Log.d(TAG, "accept===" + aBoolean);}});}
结果:
3.6其他操作符
含义:被观察者发送事件时,进行功能性拓展。
3.6.1 功能操作符
- doOnEach
doOnEach 操作符:Observable 每发送一件事件之前都会先回调这个方法。
代码:
/***doOnEach(),Observable 每发送一件事件之前都会先回调这个方法。*/private void rxjavaDoOnEach(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe: ");emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}}).doOnEach(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {Log.d(TAG, "accept: "+"执行了doOnEach获取的元素值为:"+integerNotification.getValue());}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
- doOnLifecycle
在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅,第二个参数则是与 doOnDispose() 一样,在调用 Disposable 的 dispose() 之后回调该方法**
我们先看一个代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();Log.d(TAG, "subscribe: ");}}).doOnLifecycle(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {Log.d(TAG, "doOnLifecycle ===accept: ");}}, new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "doOnLifecycle ===Action: ");}}).doOnDispose(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "doOnDispose === run: ");}}).subscribe(new Observer<Integer>() {Disposable mDisposable;@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");mDisposable =d;}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);//此处取消订阅mDisposable.dispose();}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。那我们如果使用 doOnLifecycle 进行取消订阅,来看看结果:
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe: ");emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}}).doOnLifecycle(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {Log.d(TAG, "doOnLifecycle ===accept: ");disposable.dispose();}}, new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "doOnLifecycle ===Action: ");}}).doOnDispose(new Action() {@Overridepublic void run() throws Exception {Log.d(TAG, "doOnDispose === run: ");}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});}
结果:
可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。
其余的我就不写代码了,大家也都能明白.
- doAfterNext
Observable 每发送 onNext() 之后都会回调这个方法。 - doOnNext
Observable 每发送 onNext() 之前都会先回调这个方法。 - doOnComplete
Observable 每发送 onComplete() 之前都会回调这个方法。 - doOnError
Observable 每发送 onError() 之前都会回调这个方法。 - doOnSubscribe
Observable 每发送 onSubscribe() 之前都会回调这个方法 - doOnDispose
当调用 Disposable 的 dispose() 之后回调该方法。 - doOnTerminate & doAfterTerminate
doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。 - doFinally()
在所有事件发送完毕之后回调该方法。
3.6.2 出现错误或异常处理操作符
- onErrorReturn
当接受到一个 onError() 事件之后回调,将不再走onError回调,返回的值会回调 onNext() 方法,,并正常结束该事件序列。
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe: ");emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onError(new NullPointerException());}}).onErrorReturn(new Function<Throwable,Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {Log.d(TAG, "onErrorReturn ==== apply: "+throwable);return 500;}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
- onErrorResumeNext
当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe: ");emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onError(new NullPointerException());}}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {Log.d(TAG, "onErrorResumeNext ==== apply: "+throwable);return Observable.just(4, 5, 6);}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: ");}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
- retry
retry(),如果出现错误事件,则会重新发送所有事件序列。参数是代表重新发的次数。
代码:
Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.d(TAG, "subscribe: ");emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onError(new NullPointerException());}}).retry(2).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "onSubscribe: ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "onNext: "+integer);}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: "+e);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete: ");}});
结果:
3.6.3 事件重发操作符
- repeat
重复发送被观察者的事件,times 为发送次数。
代码:
bservable.create(new ObservableOnSubscribe < Integer > () {@Overridepublic void subscribe(ObservableEmitter < Integer > e) throws Exception {e.onNext(1);e.onNext(2);e.onNext(3);e.onComplete();}
})
.repeat(2)
.subscribe(new Observer < Integer > () {@Overridepublic void onSubscribe(Disposable d) {Log.d(TAG, "===================onSubscribe ");}@Overridepublic void onNext(Integer integer) {Log.d(TAG, "===================onNext " + integer);}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {Log.d(TAG, "===================onComplete ");}
});
结果:
- repeatWhen
这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件,具体就不说了都一样的。 - repeatUntil
根据条件决定是否执行,其实现方法getAsBoolean()的返回值如果一直是false,则无限执行,当返回值是true时则立即中断执行。
3.6.4 被观察者延迟发送操作符
- delay
延迟一段事件发送事件,没啥说的。
3.6.5 发送事件超时处理操作符
- timeout
这个timeout有多个不同类型参数:
timeout(long timeout, TimeUnit timeUnit):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出 TimeoutException,以一个错误通知终止Observable。
timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout 在超时时会切换到使用一个你指定的备用的 Observable。
timeout(Function<> itemTimeoutIndicator):timeout使用一个Function对原始Observable发射的每一项进行观察,如果当这个Function执行完但原始Observable还没有发射下一个数据时,系统就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止原始Observable。
就先说这么多吧,告辞!