当前位置: 代码迷 >> 综合 >> RxJava 2.x 学习
  详细解决方案

RxJava 2.x 学习

热度:46   发布时间:2023-11-09 17:42:30.0

参照书籍 : 《 RxJava 2.x 实战 》

观察者模式名词解释

Observable : 可观察的,被观察的,被观察者,事件和数据的源头

Observer :观察者,数据最终的流向

subscribe :订阅

在 Rx.Java 中, 被观察者、观察者、subscribe()方法三者缺一不可 ,只有使用了 subscribe(), 被观察者才会开始发送数据,这点极为重要

RxJava 5种观察者模式;

Observable 与 Observer :能够发射 0 或 n 个数据,并以成功或错误事件终止。

Flowable 与 Subscriber :能够发射 0 或 n 个数据,并以成功或错误事件终止。 支持背压,可以控制数据源发射的速度

Single 与 Single Observer :只发射单个数据或错误事件

Completable 与 Completable Observer :从来不发射数据,只处理Oncomplete,OnError事件。可以看成是Rx的Runnable

Maybe 与 MaybeObserver :能够发射 0 或者 1 个数据.要么成功,要么失败 有点类似于Optional

RxJava do操作符;

do 操作符 :可以给 Observable 生命周期的各个阶段加上一系列的回调监听

doOnSubscribe:一旦观察者订阅了 Observable 它就会被调用

doOnLifecycle : 可以在观察者订阅之后 设置是否取消订阅

doOnNext :它产生的 Observable 每发射一项数据就会调用它一次, 它的 Consumer 接受发射的数据项。一般 doOnNext 用于在 subscribe 之前对数据进行处理

doOnEach : 它产生的 Observable 每发射 项数据就会调用它一次 ,不仅包括 onNext ,还包括 onError 和 onCompleted

doAfterNext :在 onNext 之后执行 doOnNext()是在 onNext 之前执行

doOnComplete :当它产生的 Observable 在正常终止调用 onComplete 会被调用

doFinally :在当它产生的 Observable 终止之后会被调用, 无论是正常终止还是异常终止。 doFinally 优先 doAfterTerminate 的调用

doAfterTerminate :注册一个 Action,当 Observable 调用 onComplete 和 onError 触发

Observable

分类

  • Hot Observable

Hot Observable 对于一个Observer(观察者)而言, 只能获取 从它开始订阅 ( subscribe(观察者) ) 之后 发射出去的数据
通常意义上讲,一个hot Observable ,甚至在没有订阅者接受数据的情况下,也可以 发射数据 或 发生事件 (这一点同 “没被订阅(subscribe(观察者))之前 什么都不会发生 ” 的规则有冲突)
即 : 无论有没有观察者进行订阅,事件始终都会发生;

当 Hot Observable 有多个订阅者时(多个观察者进行订阅时) , Hot Observable 与订阅者们的关系是 一对多的关系,可以与多个订阅者共享信息。
叫做 广播模式

  • Cold Observable

Cold Observable 对于一个Observer(观察者)而言, 能获取从头开始所有的的数据,即 能够获取 订阅前后的全部数据和事件

只有观察者订阅了,才开始执行发射数据流的代码;(这条就是 通用规则 “没被订阅(subscribe(观察者))之前 什么都不会发生”)

Cold Observable 和 Observer 只能是 1 对 1 的关系,当有多个不同的订阅者时,消息是重新完整发迭。
也就是说,对 Cold Observable 而言 ,有多个 Observer 的时候,它们各自的事件是独立的。
叫做 点对点模式

Observable 的 just() creat() range() fromXXX() 等操作符都能生成 Cold Observable

  • Cold Observable 转换成 Hot Observable
  1. 使用 publish ,生成 ConnectableObservable

  2. 使用 Subject/Processor: Processor是 Rx.Java 2.x 新增的类,继承自 Flowable支持背压控制( Back Pres ur ),而 Subject 则不支持背压控制。

Subject 既是 Observable ,又是 Observer Subscriber 。这一点可以从 Subject 源码上看到 ,继承自 Observable ,实现 Observer

  • Hot Observable 如何转换成 Cold Observable

1> ConnectableObservable的 refCount操作符

2> Observable的 share 操作符 : share 操作符封装了 publish{).refCount()调用

Flowable

RxJava 2.x 中, Observable 不再支持背压( Back Pressure ),而 改由 Flowable 来支持非阻塞式 背压。
  • Observable 使用场景
  1. 一般处理最大不超过 1000 条数据,并且几乎不存在内存溢出:
  2. GUI 鼠标事件,基本不会背压(可以结合 sampling/debouncing 操作):
  3. 处理同步流。
  • Flowable 使用场景

处理以某种方式产生超过 1OKB 的元素;

  1. 文件读取与分析
  2. 读取数据库记录,也是一个阻塞的和基于拉取模式;
  3. 网络 I/O 流;
  4. 创建一个响应式非阻塞接口

Single,Completable,Maybe

  • Single

Single 只有 onSuccess 和 onError 事件;

onSuccess()用于发射数据,而且只能发射一个数据,后面即使再发射数据 不会做任何处理。

Single 可以通过 toXXX 方法转换成 Observable Flowable Completable Maybe

  • Completable

Completable 只有 onComplete 和 onError 事件

Completable 在创建后,不会发射任何数据

Completable 经常结合 andThen 操作符使用。

Completable 可以通过 toXXX 方法转换成 Observable Flowable Completable Maybe

  • Maybe

是 Rx.Java 2.x 之后才有的新类型

可以看成是 Single 和 Completable 的结合

Maybe 创建之后 MaybeEmitter 和 SingleEmitter 一样,并没有 nNext()方法,同样需要 onSuccess()方法来发射数据。

Maybe 也只能发射 0 或者 1 个数据,即使发射多个数据 后面发射的数据也不会处理

Maybe 可以通过 toXXX 方法转换成 Observable Flowable Completable Maybe

Subject & Processor

Subject

RxJava Subject 种特殊的存在,它的灵活性在使用时也会伴随着风险,若是没有用好则可能会错过事件
Subject 既是 observable ,又是 Observer
Subject 是线程非安全的
  • 分类

AsyncSubject

BehaviorSubject

ReplaySubject

PublishSubject

  • AsyncSubject

Observer 会接收 AsyncSubject onComplete()之前的最后 个数据

【注意】subject.onComplete()必须要调用才会开始发送数据,否则观察者将不接收任何数据

  • BehaviorSubject

Observer 会先接 BehaviorSubject 被订阅之前的最后一个数据,再接收订阅之后发射过 来的数据。

如果 BehaviorSubject 被订阅之前没有发送任何数据,则会发送一个默认数据。

BehaviorSubject 还可以缓存最近一次发出信息的数据

  • ReplaySubject

ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。

ReplaySubject 可以缓存数据

ReplaySubject.createWithSize(n):可以控制发射的数据量

ReplaySubject 除了可以限制发射数据的数量,还能限制缓存的时间,使用 createWithTime()即可。

  • PublishSubject

Observer 只接收 PublishSubject 被订阅之后发送的数据

己经执行了onComplete()方法后,就无法发射数据了

Processor

RxJava2.0 新增的功能,它是一个接口,
继承自 Subscriber Publisher ,说明它和 Subject 一样,即是生产者,又是消费者。
它能够支持背压(Back Pressure )控制,这是 Processor 和 Subject 最大区别。
  • Reactive Streams 接口由以下四个接口组成

Publisher : 消息发布者。

Subscriber :消息订阅者。

Subscription : 一个订阅。

Processor: Publisher+ Subscriber 的结合体

RxJava 常用操作符

创建操作符、变换操作符、过滤操作符、条件操作符
布尔操作符、合并操作符、连接操作符、工具操作符

创建操作符

just() :将 1个或 n个对象 转换成一个 Observable,然后一次性发射

from() :将从Iterable, Future ,数组 转换成 一个 Observable,逐个发射每一个元素

create() :使用 函数 创建 Observable

defer() :只有当订阅者订阅才创建 Observable ,为每个订阅创建一个全新的 Observable

range() :创建一个发射指定范围的整数序列的 Observable

interval() : 创建一个按照给定的时间间隔发射整数序列的 Observable

timer() :创建一个在给定的延时之后发射单个数据的 Observable

empty() :创建一个什么都不做,直接通知完成 的 Observable

error() :创建一个什么都不做,直接通知错误 的 Observable

never() :一创建一个不发射任何数据的 Observable

  • from()

from 操作符可以将 Future, Iterable 和 Array 转换成 Obseruable

对于 Iterable 和 Array,产生的 Observable 会发射 Iterable 或 Array 的每一项数据。

对于 Future ,它会发射 Future.get()方法返回的单个数据。

  • repeat()

会重复地发射数据

repeat 不是创建一个 Observable ,而是重复发射原始 Observable 的数据序列,

repeatWhen : 有条件地重新订阅和发射原来的 Observable

repeatUntil : ,表示直到某个条件就不再重复发射数据

RxJava 的线程操作

Scheduler:调度器

在默认情况下, Rx.Java 只在当前线程中运行,它是单线程的。
Observable 生成发射数据流, Operators 加工数据流在后台线程中进行, Observer 在前台线程中接收井响应数据。
Scheduler 作用
single 使用定长为 1 的线程池( new Scheduled Thread Pool( I) ),重复利用这个线程
new Thread 每次都启用新线程 并在新线程中执行操作
computation 使用的固定的线程池( Fixed Scheduler Pool ),大小为 PU 核数,适用于 CPU 密集型计算
io 适合 IO 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和newThreadO 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下, io()比 newThread()更有效
trampoline 直接在当前线程运行,如果当前线程有其他任务正在执行,则会先暂停其他任务
from 将java.util.concurrent.Executor 转换成一个调度器实例,即可以自定义 Executor 来作为调度器

线程调度

默认情况下不做任何线程处理, Observable Observer 处于同一线程中
  • subscribeOn

用来指定 上游 数据源 运行在特定的线程调度器 Scheduler

若多次执行 subscribeOn ,则只有一次起作用

在 RxJava 链式操作 中,数据的处理是自下而上的 ,这点与数据发射正好相反

如果多次 调用 subscribeOn ,则最上面的线程切换最晚 ,所以就变成了只有第一次切换线程才有效。

  • observeOn

用来指定 下游 操作运行在特定的线程调度器 Scheduler 上。

若多次执行 observeOn ,则每次都起作用 ,线程会一直切换。

Scheduler 的测试

TestScheduler 是专门用于测试的调度器,
与其他调度器的区别是, TestScheduler 只有被调用了时间才会继续。 
TestScheduler 是一种特殊的、非线程安全的调度器,
用于测试一些不引入真实并发性、允许手动推进虚拟时间的调度器。在Rx.Java 2.x 中,原先 Rx.Java l.x Schedulers. test()被去掉了 。
要想获得 TestScheduler对象,则可以通过直接 new TestScheduler()的方式来实现。
  • TestScheduler 的方法

advanceTimeTo : 将调度器的时钟移动到某个特定时刻。
例如:scheduler.advanceTimeTo(lO , TimeUnit.MILLISECONDS) ; 时钟移动到 10ms

advanceTimeBy : 将调度程序的时钟按指定的时间向前移动

triggerActions : 不会修改时间,它执行计划中未启动的任务,已经执行过的任务不会再启动。

变换操作符和过滤操作符

  • RxJava 变换操作符
操作 描述
map 对序列的每一项都用一个函数来变换 Observable 发射的数据序列,就是简单的做 数据元素的加工
flatMap,concatMap,flatMaplterable 将 Observable 发射的数据集合变换为Observables 集合,然后将这些 Observable 发射的数据平坦化地放进一个单独的Observable
switchMap 将 Observable 发射的数据集合变换为 Observables 集合,然后只发射这些Observables 最近发射过的数据
scan 对 Observable 发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
groupBy 将 Observable 拆分为 Observable 集合,将原始 Observable 发射的数据按Key 分组,每一个 Observable 发射过一组不同的数据。
buffer 定期从 Observable 收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window 定期将来自 Observable 的数据拆分成 Observable 窗口,然后发射这些窗口,而不是每次发射一项
cast 在发射之前强制将 Observable 发射的所有数据转换为指定类型
  • RxJava 过滤操作符
操作 描述
filter 过滤数据
takeLast 只发射最后的 n 项数据
last 只发射最后 1 项数据
lastOrDefault 只发射最后 1 项数据,如果 Observable 为空,就发射默认值。
takeLastBuffer 将最后的 n 项数据当作单个数据发射
skip 跳过开始的 n 项数据
skipLast 跳过最后的 n 项数据
take 只发射开始的 n 项数据
first,takeFirst 只发射第 1 项数据,或者满足某种条件的第 1 项数据。
firstOrDefault 只发射第 1 项数据,如果 Observable 为空,就发射默认值。
elementAt 发射第 n 项数据
elementAtOrDefault 发射第 n 项数据,如果 Observable 数据少于 n 项,就发射默认值。
sample,throttleLast 定期发射 Observable 最近的数据
throttleFirst 定期发射 Observable 发射的第 1 项数据。
throttleWithTimeout,debounce 只有当 Observable 在指定的时间段后还没有发射数据时,才发射一个数据。
timeout 如果在一个指定的时间段后还没发射数据,就发射 1 个异常
distinct 过滤掉重复的数据
distinctUntilChanged 过滤掉连续重复的数据
OfType 只发射指定类型的数据
ignoreElements 丢弃所有的正常数据,只发射错误或完成通知

条件操作符和布尔操作符

  • RxJava 条件操作符
操作 描述
amb 给定多个 Observable ,只让第 个发射数据的 Observable 发射全部数据
defaultlfEmpty 发射来自原始 Observable 的数据,如果原始 Observable 没有发射数据,则发射默认数据
skipUntil 丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,然后发射原始 Observable 的剩余数据
skipWhile 丢弃原始 Observable 发射的数据,直到 1 个特定的条件为 false,然后发射原始 Observable 剩余的数据
takeUntil 发射来自原始 Observable 的数据,直到第 2 个 Observable 发射了 1 个数据或 1 个通知
takeWhile,WhileWithlndex 发射原始 Observable 的数据,直到 1 个特定的条件为 true,然后跳过剩余的数据
  • RxJava 布尔操作符
操作 描述
all 判断是否所有的数据项都满足某个条件
contains 判断 Observable 是否会发射一个指定的值
exists,isEmpty 判断 Observable 是否发射了一个值
sequenceEqual 判断两个 Observables 发射的序列是否相等

合并操作符与连接操作符

  • RxJava 合并操作符
操作 描述
startWith
merge
mergeDelayError
zip
combineLatest
join,groupJoin
switchOnNext
  • RxJava 连接操作符
操作 描述
ConnectableObservable.connect() 指示 1 个可连接的 Observable 开始发射数据
Observable.publish() 将一个 Observable 转换为 1 个可连接的 Observable
Observable.replay() 确保所有的订阅者看到相同的数据序列,即使它们在 Observable 开始发射数据之后才订阅
ConnectableObservable.refCount() 让一个可连接的 Observable 表现得像一个普通的 Observable

RxJava 的背压

在Rx.Java 中, 会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压( Back Pressure )场景。
在 RxJava 2.x 中,只有新增的 Flowable 类型是支持背压的;在RxJava 1.x 中, Observable 是支持背压的,
  • 背压条件
  1. 背压必须是在异步的场景下才会出现,即被观察者和观察者处于不同的线程中
  2. Rx.Java 是基于 Push 模型 。

RxJava 1.x 中背压

  • RxJava 1.x 中背压的情况
  1. 在RxJava 1.x 中, Observable 是支持背压的,但不是所有的 Observable 都支持背压;
  2. Rx.Java 1.x 中, Hot Observables 是不支持背压的,而 Cold Observables 中也有一部分不支持背压。
  • RxJava 1.x 中背压问题的解决方法

1.过滤限流,通过使用限流操作符将被观察者产生的大部分事件过滤井抛弃 ,以达到限流的目的,间接降低事件发射的速度,例如使用以下操作符.

sample :在一段时间内,只处理最后一个数据
throttleF irst :在 段时间内,只处理第 个数据
debounce :发送一个数据,开始计时,到了规定时间,若没有再发送数据,则开始处理数据,反之重新开始计时。

  1. 打包缓存,在被观察者发射事件过快导致观察者来不及处理的情况下,可以使用缓存类的操作符将其中一部分打包缓存起来,再一点一点的处理其中的事件。

buffer :将多个事件打包放入一个 List 中,再一起发射
window :将多个事件打包放入一个 Observable 中,再一起发射

  1. 使用背压操作符,我们可以通过一些操作符宋转化成支持背压的 Observable

onBac ressureBuffer
onBackpressureDrop
onBackpressureLatest
onBackpressureBlock (己过期)

  • RxJava 1.x 背压设计的缺陷

1.它的缓存池很小,只有 16 ,不能处理较大的并发事件。
2.Rx.Java 1.x 上游(被观察者)无法得知下游(观察者)对事件的 处理能力和 处理进度,只能把事件一次性抛给下游

RxJava 2.x 中背压

Rx.Java 2.x 中, Observable 不再支持背压,而是改用 Flowable 来专门支持背压。
默认队列大小为 128 ,并且要求所有的操作符强制支持背压。
Flowable 一共有 5 种背压策略
  • MISSING 策略

此策略表示,通过 Create 方法创建的 Flowable 没有指定背压策略,
不会对通过 OnNext 射的数据做缓存或丢弃处理
需要下游通过背压操作符 onBackpressureBuffer()/ onBackpressureDrop()/onBackpressureLatest()指定背压策略。

  • ERROR 策略

如果放入 Flowable 的异步缓存池中的数据超限了,则会抛出 MissingBackpressureException 异常。

  • BUFFER 策略

Flowable 的异步缓存 Observable 样,没有固定大小,可以无限制添加数据, 不会报 MissingBackpressureException 异常, 但会导致 OOM (内存溢出)

  • DROP 策略

此策略表示,如果 Flowable 的异步缓存池满了, 则会丢掉将要放入缓存池中的数据。

  • LATEST 策略

此策略表示,如果缓存池满了,会丢掉将要放入缓存池中的数据,这一点与 DROP 策略一样。
不同的是,不管缓存池的状态如何, LATEST 策略会将最后一条数据强行放入缓存池中。

Disposable 和 Transformer 的使用

Disposable

可以使用 Disposable 来管理一个订阅,可以使用 CompositeDisposable 管理多个订阅
  • Rx.Java 1.x 取消订阅

Rx.Java 1.x 中, observable.subscribe() 方法会返回一个 Subscription 的对象,而 Subscription 接口可以用来取消订阅。
Subscription 只需调用 unsubscribe() 就可以取消订阅。
Subscription 对象是被观察者和订阅者之间的纽带。
RxJava 使用 Subscription 取消订阅时,会停止整个调用链。
如果使用了一串很复杂的操作符,则调用 unsubscribe()将会在它当前执行的地方终止,而不需要做任何额外的工作(比如 释放资源等 )。

  • Rx.Java 2.x 取消订阅

RxJava 2.0 之后, Subscription 被改名为 io.reactivex.disposables.Disposable。
disposable .dispose() 取消订阅 。

  • CompositeDisposable

Rx.Java 1.x 中有一个复合订阅( composite subscription )的概念。
Rx.Java 2.x 中, Rx.Java 也内置了一个类似复合订阅的容器 CompositeDisposable。

每当我们得到一个 Disposable 时,就 调用 CompositeDisposable.add (), 将它添加到容器中,
在退出的时候,调用 CompositeDisposable.clear()即可切断所有的事件

Rxlifecycle 和 AutoDispose (第三方库)

  • Rxlifecycle

https://github.com/trello/RxLifecycle
仅可以在 android 平台上使用
RxLifecycle 是配合 Activity/Fragment 生命周期来管理订阅的。

  • AutoDispose

https: //github.com/uber/AutoDispose
AutoDispose 是 Uber 开源库, 它与 RxLifecycle的区别是:
它不仅可以在 android 平台上使用,还可以在 Java (企业级)平台上使用,适用的范围更宽广

Transformer在 RxJava 中的使用

Transformer :转换器 的意思

Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象
转换成
另一个 Observable/Flowable/Single/Completable/Maybe对象, 与调用一系列的内联操作符一模一样。

  • 与 compose 操作符结合使用

compose 操作符能够从数据流中得到原始的被观察者。
当创建被观察者时, compose 操作符会立即执行,而不像其他的操作符需要在 onNext()调用后才能执行

小结

本章介绍了 Disposable 以及 Transformer 的用途。
Disposable 能够管理一个订阅, CompositeDisposable 可以管理多个订阅 。
Transformer 顾名思义是转换器的意思,能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象。
虽然它们看起来风马牛不相及,但是可以借助 compose 操作符将它们联系在一起。

RxJava 的并行编程

  • jdk8 的实现

Java8 新增了井行流来实现并行的效果,只需在集合上调用 parallelStream() 方法即可。
Java 8 借助 JDK fork/join 框架来实现并行编程

RxJava 并行操作

  • 借助 flatMap 实现并行

Rx.Java 中可以借助 flatMap 操作符来实现类似于 Java8 的并行执行效果。
flaMap 操作符的原理是将这一个 Observable 转化为多个Observable ,然后再将这多个 Observable 的数据整合发射出来

  • 通过 Round-Robin 算法实现并行

Round-Robin (轮询)算法是最简单的一种 负载均衡算法,
即 借助 groupBy 操作符把 Observable 分组,然后每一组再指定单独线程执行

ParallelFlowable

Rx.Java 2.0.5 版本新增了 ParallelFlowable API ,它允许并行地执行一些操作符,
例如 map,filter,concatMap,flatMap, collect,reduce 等
  • ParallelFlowable 实现并行

RxJava 并没有 ParallelObservable ,因为 RxJava 之后, Observable 不再支持背压。 然而在并行处理中背压是必不可少的, 否者 会淹没在并行操作符的内部队列中。

ParallelFlowable 是并行的 Flowable 版本,并不是新增的被观察者类型。

在 ParallelFlowable中,很多典型的操作符( take,skip 等)是不可用的。

类似于 Java8 的井行流,在相应的操作符上调用 Flowable的 parallel()就会返回 ParallelFlowable。

默认情况下,并行级别被设置为可用 CPU 的数量( Runtime.getRuntime().availableProcessors() )

可以通过 parallel() 方法来指定并行量。

ParallelFlowable.sequential() : 并行流 返回到 顺序流

Flowable.bufferSize() : 设置顺序源的预取量

  • ParallelFlowable Scheduler

ParallelFlowable 遵循与 Flowable 相同的异步原理,因此 parallel()本身并不引入顺序源的异 步消耗,只准备并行流,
但是可以通过 runOn( Scheduler ) 操作符定义异步。
这点与 Flowable 有很大不同, Flowable 使用 subscribeOn, observeOn 操作符。

parallelFlowable.runOn() : 定义异步 ,这与 Flowable 的 subscribeOn, observeOn 操作符 一样的功能

  • ParallelFlowable 支持的操作符

map, filter, flatMap, concatMap, reduce, collect, sorted, toSortedList, compose,fromArray,
doOnCancel, doOnError, doOnComplete, doOnNext, doAfterNext, doOnSubscribe,
doAfterTerminated, doOnRequest

  • ParallelFlowable Flowable.flatMap 比较

Flowable.flatMap 实现井行的原理和 Observable.flatMap 实现并行的原理相同

  • 那么什么时候使用 flatMap 进行并行处理比较好,什么时候使用 ParallelFlowable 比较好呢?

Rx.Java 本质上是连续的,借助 flatMap 操作符进行分离和加入一个序列可能会变得很复杂,
并引起一定的开销 但是如果使用 ParallelFlowable , 则开销会更小。

然而, Paralle!Flowable 的操作符很有限,
如果有一些特殊的操作需要并行执行,而这些操作不能用 ParallelFlowable 所支持的操作符来表达,
那么就应该使用基于 Flowable.flatMap 来实 现井行

优先推荐使用 ParallelFlowable ,对于无法使用 ParallelFlowable 的操作符,则可以使用 flatMap 来实现井行