当前位置: 代码迷 >> 综合 >> 【ReactiveX】Observable 对象(译)
  详细解决方案

【ReactiveX】Observable 对象(译)

热度:3   发布时间:2023-12-14 15:11:50.0

更多内容,欢迎关注作者博客:

http://www.china10s.com/blog/?p=475

Observable 对象

在 ReactiveX 中,一个观察者向 Observable 对象订阅消息。然后这个观察者将会响应 Observable 发射的消息任何对象或者对象队列。这个模式有利于并发操作,因为他不需要在等待 Observable 发射消息时候阻塞线程。而不是创建了一个观察者式的哨兵,随时准备着适时响应未来时间 Observable 对象发射消息。
这一页解释了什么是响应模式,什么是观察者和 Observable 对象(和观察者如何订阅 Observable 对象的)。其他页面解释了你应该如何使用各种 Observable 操作来将 Observable 对象关联上,然后改变他们的行为。
这个文档将解释集中展现为“marble diagrams”。下图就是代表 Observable 对象和 Observable 对象的转换过程:

例如:
  • Single — a specialized version of an Observable that emits only a single item
  • Rx Workshop: Introduction
  • Introduction to Rx: IObservable
  • Mastering observables (from the Couchbase Server documentation)
  • 2 minute introduction to Rx by Andre Staltz (“Think of an Observable as an asynchronous immutable array.”)
  • Introducing the Observable by Jafar Husain (JavaScript Video Tutorial)
  • Observable object (RxJS) by Dennis Stoyanov
  • Turning a callback into an Rx Observable by @afterecho

背景:

在很多软件程序任务中,你或多或少都会期望你编写的指令将会执行和完成的顺序是增量的,一个接一个的,按照你编写的顺序来执行。但是,在 ReactiveX 中,很多指令将会并行执行,他们的结果将会按照任意的顺序被延迟捕获。
不通过调用函数,你可以在 Observable 对象中定义一个装置来检索和转换数据,然后绑定一个观察者在其中,用来将一个之前定义的装置引发动作,观察者作为哨兵,当他们准备好,就可以捕获并且响应他们的释放。
这种方法的优势在于,,当你有多个任务,但是任务与任务之间并没有依赖关系,这样你就能够同步开始所有任务,而不是等待各自完成知乎在开始接下来的任务。如此,你的整个任务束的执行时间长度就和你最长的那个任务时间一样长。
有很多条款可以用来描述这种同步执行程序和设计。这份文档将会使用“一个观察者订阅了一个 Observable 对象”,一个 Observable 对象通过调用监听者的方法,来发射或者推送通知给它的监听者。
在其他文档和上线文中,我们叫做“观察者”的对象,有时候被叫做“订阅者 ”,“关注者”,或者“响应者”。这个模式总体上来说常常称作“响应者模式”

建立观察者

这个页面常规伪代码来举例,但是 ReactiveX 是在多种语言中实现的。
常规调用方式,并不是按照异步顺序,而是按照并行调用在 ReactiveX 中,流程如下:
1、调用一个方法。
2、用变量存储一个方法的返回值。
3、使用变量和它的新中去干活。
或者,像这样:

在异步调用的模式中,流程更多的是像这样:
1、定义好一个方法,利用异步调用的返回值来做一些有用的事情,这个方法是观察的这一部分。
2、定义好一个异步调用作为一个 Observable 的对象。
3、通过订阅,将观察者绑定到 Observables 对象上去(这也初始化了 Observables 对象的行为)。
4、继续你的工作,直到有调用返回,Obsevables 对象发送数据,观察者的方法将会基于其数据进行相关操作。
像这样:

继续、完成、错误

订阅者的方法能够让你将观察者连接上 Observable 对象。你的观察者实现如下函数中的一部分:
onNext:
当 Observable 对象发送数据的时候,该函数就会被调用。这个函数将数据作为一个参数携带。
onError:
Observable 对象产生想要的数据失败或者遇到其他错误的时候,就会调用该方法。这个函数将错误原因作为参数携带。
onCompleted:
如果没有遇到任何问题,当 Observable 对象调用完成 onNext 方法之后,最后就会调用 onCompleted 方法。
当连接 Observable 对象之后, onNext 将会被调用零次或者多次。然后接下来就会调用 onCompleted 或者 onError,但是并不会同时调用两者,这将会是最后的调用。我们约定,当调用 onNext 的时候,通常被称作 “emissions”,当调用 onCompleted 或者 onError 的时候,被称作 “notifications”。
更详细的订阅过程如下:

取消订阅

在某些 ReactiveX 实现上,有一个特殊的接口—Subscriber,它实现了一个 unsubscribe 方法。你可以通过调用这个方法表明监听者不在关注该 Observable 对象发送的任何消息。这些 Observable 对象(如果他们没有其他观察者)就可以选择不在产生和发送新的数据。
这个 unsubscription 接口将会级联回调整个链上的操作,这些操作之前是观察者订阅在 Observable 对象上的。这也将会导致整个链上的所有连接都停止发送数据。这个操作并不能保证立即执行,然而,一个 Observable 对象也有可能在没有观察者订阅的情况下,仍旧产生和试图发送数据。

关于命名约定的提示

对于每个语言,ReactiveX 都有其特殊的实现方式。没有什么公用的命名规范,但是,这些实现方式都有一些共性。
此外,一些名称在其他上下文中有不同的意义,或者在其他特殊的语言实现中,看起来有些尴尬。
例如, onEvent 的命名模式(或者 onNext,onCompleted,onError)。在某些语境中,这表示事件已经被注册。但是在 ReactiveX 中,它代表事件本身。

“热订阅”与“冷订阅”

Observable 对象什么时候开始在他们的队列中发送数据?一个“热订阅”在产生数据的时候就会立刻把它发送出去,所以其他后续加入的订阅者将会从队列的中间接受到数据。一个“冷订阅”只会在一个订阅者订阅它之后,才会开始发送数据。所以,这类发送者能够保证看到完整的序列。
在某些 ReactiveX 的实现中,也存在一种“连接订阅”的 Observable 对象。这些对象只有在 Connect 函数1被调用的时候才会发送数据,无论被订阅没有。

订阅操作的组合

Observable 对象和 订阅者只是 ReactiveX 的开始。要想适用于一个事件队列而不是单一事件,最好能够将标准的订阅者模式进行轻微的改造才行。
真正的力量来自于“reactuve extension”(因此叫做 ReativeX)-操作,这些操作允许你转换、合并、操纵和控制Observable 对象发送的消息队列。
Rx 操作让你能够通过声明的方法组合异步队列,让你能够充分利用回调的便利性,同时,避免了通常和异步系统关联的嵌套回调的缺点。
这份文档将各种操作分组列下,对其使用方式举例:
Creating Observables
CreateDeferEmpty/ Never/ ThrowFromIntervalJustRangeRepeatStart, and  Timer
Transforming Observable Items
BufferFlatMapGroupByMapScan, and  Window
Filtering Observables
DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTake, and  TakeLast
Combining Observables
And/ Then/ WhenCombineLatestJoinMergeStartWithSwitch, and  Zip
Error Handling Operators
Catch and  Retry
Utility Operators
DelayDoMaterialize/ DematerializeObserveOnSerializeSubscribeSubscribeOn, TimeIntervalTimeoutTimestamp, and  Using
Conditional and Boolean Operators
AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntil, and  TakeWhile
Mathematical and Aggregate Operators
AverageConcatCountMaxMinReduce, and  Sum
Converting Observables
To
Connectable Observable Operators
ConnectPublishRefCount, and  Replay
Backpressure Operators
a variety of operators that enforce particular flow-control policies
这些页面包含一些并不是 ReactiveX 核心功能,但是在一个或者多个语言或者可选模块中被实现的操作。

链式操作

Observable 对象的多数操作同样会返回一个 Observable 对象。允许你能够一个接一个的执行链式操作。每个操作都能够改变前一个操作返回的 Observable 对象。
其中也包括了其他模式,比如“Builder Pattern”,通过一个特殊的类上多个函数来操作数据。这些模式也让你能够将这些操作链式执行。但好似需要注意的是,执行顺序并不是按照这些操作的在链式操作上的编码先后顺序执行,而是按照 Observable 对象的操作顺序来执行。
链式操作并不是基于初始建链产生的 Observable 对象独立运行,而是都依照于链中紧邻的上一个操作产生的 Observable 对象运行。