当前位置: 代码迷 >> 综合 >> Android-RxJava(上)
  详细解决方案

Android-RxJava(上)

热度:83   发布时间:2023-11-19 20:01:30.0

Rxjava出来已经好久了,但始终存在于会使用的阶段,今天来做个总结。
参考:
https://mp.weixin.qq.com/s/7sKjc5ahHI4fmadXW-uP_w
https://mp.weixin.qq.com/s/V0hEyiZwC7z5PiC3Uz0wyA
https://mp.weixin.qq.com/s/RkGHpVSpngfHDXo4Es-a9w
https://mp.weixin.qq.com/s/elA3Gib57YGWnXOEcFOPUQ
https://mp.weixin.qq.com/s/WaWEtFjmajalISwAkJyuKw

1.简介

RxJava是观察者模式的扩展,是响应式函数的扩展库,在观察者模式上实现了发送者(observable)和接受者(observer)解耦;链式调用降低业务之间的依赖,使得代码很简介;支持泛型,减少冗余代码,增强代码可读性;支持设置同步异步切换,简单实现异步回调;观察者与被观察者的继承,多态,更好解决复杂逻辑的嵌套。

2.基本使用

2.1

首先RxJava一般有三要素

  • 观察者(Observer)
  • 被观察者(Observable)
  • 订阅,即绑定(subscribe)
    话不多说上代码:
    首先添加相关依赖
    //RxJava的相关依赖implementation 'io.reactivex.rxjava2:rxjava:2.1.4'implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'

注意:RxAndroid是RxJava的一个针对Android平台的扩展,主要用于 Android 开发。

/*** 执行RxJava相关代码*/private void doRxJava() {//创建被观察者Observable observable =Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {//通过ObservableEmitter发射器发送事件emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}});//创建观察者Observer observer = new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.e(TAG, "======================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "======================onError");}@Overridepublic void onComplete() {Log.e(TAG, "======================onComplete");}};//被观察者通过subscribe()订阅观察者observable.subscribe(observer);}

打印信息如下:


6748497-a6005e6be37ea04f.png
image.png

通过打印信息可以看到:
在被观察者通过订阅绑定观察者之后,他们先后执行了观察者的onSubscribe()->被观察者的subscribe(),然后是观察者的onNext() -> onComplete().当然onError()方法没有执行,因为我们的请求时成功的,那么这些方法都有什么含义呢:

  • onSubscribe() ,观察者的方法,如果我们的被观察者通过subscribe订阅观察者成功则会执行该方法。
  • subscribe() ,被观察者的方法,通过该方法在订阅成功后给观察者发送事件
  • onNext(),观察者的方法,接受被观察者事件处理结果的方法。
  • onComplete(),观察者的方法,事件完成调用的方法。
  • onError(),观察者的方法,事件因为异常所调用的方法,注意该方法执行后其他事件将不会继续发送,如onComplete()

当然也可以换一种写法,通过链式调用的方式,如下:

 /*** 执行RxJava相关代码*/private void doRxJava2() {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}}).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {Log.e(TAG, "======================onSubscribe");}@Overridepublic void onNext(Integer integer) {Log.e(TAG, "======================onNext " + integer);}@Overridepublic void onError(Throwable e) {Log.e(TAG, "======================onError");}@Overridepublic void onComplete() {Log.e(TAG, "======================onComplete");}});}

总结:
被观察者(Observable):通过订阅行为(subscribe())把事件按顺序发送到 观察者(Observer)。 观察者(Observer):按顺序接收到事件&做出响应反馈。

2.2

你可能看到过这种写法:

      Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {emitter.onNext(1)}})..subscribe(new Action1<String>() {@Overridepublic void call(String s) {System.out.println(s);}});

这是RxJava的时候的写法,我们说说Action1或者Action是什么意思呢,为什么不用new Observer了呢?
如果我们到的Observer不想实现 OnComplete ()和 OnError()方法,只需要在 onNext 时做一些处理,可以用 Action1 或Action类,明白了吧。
subscribe 方法有一个重载版本,接受1~3个 Action1 类型的参数,分别对应 OnNext,OnError,OnComplete然后我们现在只需要 onNext,就只需要传入一个参数。
同样的我们来看看如果你升级到RxJava2,改如果写呢?

//创建被观察者Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {Log.e(TAG, "======================" + Thread.currentThread().getName());//通过ObservableEmitter发射器发送事件emitter.onNext(1);emitter.onNext(2);emitter.onNext(3);emitter.onComplete();}});//好比onNextConsumer<Integer> onNextConsumer = new Consumer<Integer>() {@Overridepublic void accept(Integer integer) {Log.e(TAG, "======================onNext " + integer);}};//好比onErrorConsumer<Integer> onErroyCusumer = new Consumer<Integer>() {@Overridepublic void accept(Integer integer) {Log.e(TAG, "======================onError");}};//好比onCompleteAction completeAction = new Action() {@Overridepublic void run() throws Exception {Log.e(TAG, "======================onComplete");}};//被观察者通过subscribe()订阅观察者observable.subscribe(onNextConsumer, onErroyCusumer, completeAction);

RxJava和RxJava2关于Acton的的最大区别是:Action -> Consumer
这就是RxJava2的写法,同志们需要了解一下RxJava和RxJava2升级的区别,网上有很多,就不推荐了。

3.操作符

RxJava操作符是RxJava中重要的部分 ,操作符实质上就是RxJava函数式编程模式的体现,而操作符的种类包括创建操作符,变换操作符,合拼操作符,过滤操作符,条件操作符,其他操作符,今天我们来逐一讲解:

3.1创建操作符

含义:被观察者把事件发送至观察者。
具体应用:

应用种类
数据遍历 just() fromArray() fromlterable() range()
定时任务 interval() intervalRange()
异步嵌套回调 create()
延迟任务 defer() time()

这么多,我们每个类别说一个就好了,其余自行百度。

  • just()的使用
    just 只用于处理单个的数据。
 Observable.just("123456789").subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, "======================accept"+s);}});

这里不得不提一下fromArray(),from和fromArray的区别是:fromArray用于转换多个数据,比如 ArrayList等包含多个数据的数组或者列表,可以传入多于10个的变量,并且可以传入一个数组;,而 just 只用于处理单个的数据。
代码:

 List<String> list = Arrays.asList("1", "2", "3", "4", "5", "6");Observable.fromArray(list).subscribe(new Consumer<List<String>>() {@Overridepublic void accept(List<String> strings) throws Exception {Log.e(TAG, "======================accept"+strings);}});

打印结果:


6748497-cdc4324837e305a9.png
image.png

还忍不住想说一下fromlterable(),fromlterable和fromArray的区别是fromIterable发送一个 List 集合数据给观察者,并依此打印集合中的元素。

  • interval()
    interval 操作符用于间隔时间执行某个操作,其接受三个参数,分别是第一次发送延迟,间隔时间,时间单位.
Observable.interval(3,2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {Log.e(TAG, "======================accept"+aLong);}});

结果:


6748497-ee81bf2830b0db49.png
image.png

intervalRange()可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

  • create() 就不用说了吧。
  • defer()
    defer操作符就相当于懒加载,只有等observable 与observer建立了订阅关系时,observable才会建立执行相关代码。
int num = 0;/*** 创建操作符Defer*/private void rxJavaDefer(){num=1;Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> call() throws Exception {return Observable.just(num);}});num = 2;Consumer<Integer> consumer =new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "================onNext " + integer);}};observable.subscribe(consumer);}

结果:


6748497-1e9808a3b7dd1415.png

获取的是最后赋值的 num = 2;也就是Observable.defer()并没有在创建的时候执行。如果这么说还不明白,我们看一下直接用just操作符执行的结果

int a = 0;private void rxJavaDefer(){a =1;Observable<Integer> observable = Observable.just(a);a = 2;Consumer<Integer> consumer =new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "================onNext " + integer);}};observable.subscribe(consumer);}

结果打印:


6748497-5b8648ad251ff186.png
image.png

这下明白了吧!

3.2变换操作符

含义:被观察者把序列事件加工为其他序列事件(变换)。

应用种类
变换 map() flatmap()/concatmap()
  • map()
    map 可以将被观察者发送的数据类型转变成其他的类型.
    以下代码我们将Integer转为String
Observable.just(1,2,3).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return integer+"";}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, "accept:Integer转化为String后的数据"+s );}});

结果:


6748497-d5cbf9511ff3abe1.png
image.png
  • concatmap
    concatmap和flatmap操作符区别在于concatmap是有序发送而flatmap是无序的。
    比如我们要制定一任务,打印每人上午和下午的工作安排,先创建相关实体类:
private class User{//姓名String name;//任务列表List<Plan> listPlan;public String getName() {return name;}public void setName(String name) {this.name = name;}public List<Plan> getListPlan() {return listPlan;}public void setListPlan(List<Plan> listPlan) {this.listPlan = listPlan;}}//详细计划实体类public class Plan{//姓名String name;//时间String time;//工作内容String content;public String getTime() {return time;}public String getName() {return name;}public void setName(String name) {this.name = name;}public void setTime(String time) {this.time = time;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}public Plan(String name ,String time, String content) {this.time = time;this.content = content;this.name=name;}}

执行代码:

  /*** concatMap,有序将事件序列中的元素进行整合加工,返回一个新的被观察者,对应flatMap是无序的*/private  void rxJavaconcatMap(){LinkedList<Plan> zhangsan =new LinkedList<>();Plan plan =new Plan("张三","上午","打扫卫生");Plan plan2 =new Plan("张三","下午","学习");zhangsan.add(plan);zhangsan.add(plan2);LinkedList<Plan> lisi =new LinkedList<>();Plan plan3 =new Plan("李四","上午","打扫卫生");Plan plan4 =new Plan("李四","下午","参加体育活动");lisi.add(plan3);lisi.add(plan4);User user =new User();User user2 =new User();user.setName("张三");user.setListPlan(zhangsan);user2.setName("李四");user2.setListPlan(lisi);List<User> link =new LinkedList<>();link.add(user);link.add(user2);//我们上有提到fromIterable操作符是将集合中的元素输出Observable.fromIterable(link).concatMap(new Function<User, Observable<Plan>>(){@Overridepublic Observable<Plan> apply(User user) throws Exception {//注意这里是返回一个新的被观察者return Observable.fromIterable(user.getListPlan());}}).subscribe(new Consumer<Plan>() {@Overridepublic void accept(Plan plan) throws Exception {Log.e(TAG, "accept() returned: " +plan.getName()+plan.getTime()+""+plan.getContent() );}});}

结果:


6748497-70a3c40b1725180d.png
image.png

3.3组合操作符

含义:将多个被观察组合 & 将它们需要发送的事件合拼。

应用种类
组合多个被观察者,合并事件 concatArray()(发送事件--串行) concatDelayError() mergeArray()(发送事件--并发) mergeArrayDelayError()
组合多个被观察者,组合为一个被观察者 zip() combinelatest()
发送事件前追加其他事件 startWithArray() startWith()
组合多个事件为一个事件 reduce collect()
汇总发送事件数量 count()
  • concatArray()
    concatArray可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件,而concatArray()可以发送多个。
    代码:
 Observable.concatArray(Observable.just(1,2),Observable.just(3,4),Observable.just(5,6),Observable.just(7,8),Observable.just(9,10)).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "accept: "+integer);}});

结果:


6748497-839617abb4a4afd8.png
image.png

而mergeArray则和concatArray相反这里就不说了。

  • zip()
    zip会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。
/*** 组合操作符zip()*/private void rxJavaZip() {//intervalRange()参数依此是:从那个数开始,发送次数,首次延迟,发送间隔Observable.zip(Observable.intervalRange(1, 3, 2, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {String s = "A" + aLong;Log.e(TAG, "===================A 发送的事件 " + s);return s;}}),Observable.intervalRange(1, 4, 2, 1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {String s2 = "B" + aLong;Log.e(TAG, "===================B 发送的事件 " + s2);return s2;}}),new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {String res = s + s2;return res;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {Log.e(TAG, "===================组合A和B发送的事件 " + s);}});}

打印结果:


6748497-fc6385ff7b37d3a6.png
image.png

上面代码中有两个 Observable,第一个发送事件的数量为3个,第二个发送事件的数量为4个,可以发现最终接收到的事件数量是3,那么为什么第二个 Observable 没有发送第4个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。

-startWithArray & startWith
在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。

 Observable.just(5, 6, 7).startWithArray(2, 3, 4).startWith(1).subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "================accept " + integer);}});

打印:


6748497-19d264206e2b988e.png
image.png
  • reduce()
    reduce,作用是将发送数据以一定逻辑聚合起来.
/*** 组合操作符reduce*/private void reduce(){Observable.just(0, 1, 2).reduce(new BiFunction < Integer, Integer, Integer > () {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {int resullt = integer + integer2;Log.e(TAG, "====================integer " + integer);Log.e(TAG, "====================integer2 " + integer2);Log.e(TAG, "====================resullt " + resullt);return resullt;}}).subscribe(new Consumer < Integer > () {@Overridepublic void accept(Integer integer) throws Exception {Log.e(TAG, "==================accept " + integer);}});}

打印:


6748497-f1a26850b2995a8a.png
image.png

看到没得,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止。

  • count()
    count,返回被观察者发送事件的数量。
  /*** 组合操作符count*/private void count (){Observable.just(1, 2, 3,4,5).count().subscribe(new Consumer < Long > () {@Overridepublic void accept(Long aLong) throws Exception {Log.e(TAG, "accept: "+aLong );}});}

结果打印


6748497-3042e3f685d2ca51.png

先整这么多吧,剩下的下一篇内容再说。

  相关解决方案