当前位置: 代码迷 >> 综合 >> rxjava 操作符详解
  详细解决方案

rxjava 操作符详解

热度:2   发布时间:2023-12-13 00:17:38.0

Observable的 combineLatest

只看上面的图片不知道操作符是什么意思?下面通过源码分析这个操作符的意思

Observable的 combineLatest最终都会返回ObservableCombineLatest对象。看一下他的subscribeActual

public void subscribeActual(Observer<? super R> observer) {
    ObservableSource<? extends T>[] sources = this.sources;int count = 0;if (sources == null) {
    //sourcesIterable转化为sourcessources = new Observable[8];for (ObservableSource<? extends T> p : sourcesIterable) {
    if (count == sources.length) {
    ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];System.arraycopy(sources, 0, b, 0, count);sources = b;}sources[count++] = p;}} else {
    count = sources.length;}if (count == 0) {
    EmptyDisposable.complete(observer);return;}LatestCoordinator<T, R> lc = new LatestCoordinator<T, R>(observer, combiner, count, bufferSize, delayError);lc.subscribe(sources);
}

上面只是简单获取了他的ObservableSource个数 然后调用LatestCoordinator的subscribe方法

static final class LatestCoordinator<T, R> extends AtomicInteger implements Disposable {
    private static final long serialVersionUID = 8567835998786448817L;final Observer<? super R> downstream;final Function<? super Object[], ? extends R> combiner;final CombinerObserver<T, R>[] observers;Object[] latest;final SpscLinkedArrayQueue<Object[]> queue;final boolean delayError;volatile boolean cancelled;volatile boolean done;final AtomicThrowable errors = new AtomicThrowable();int active;int complete;@SuppressWarnings("unchecked")LatestCoordinator(Observer<? super R> actual,Function<? super Object[], ? extends R> combiner,int count, int bufferSize, boolean delayError) {
    this.downstream = actual;this.combiner = combiner;this.delayError = delayError;this.latest = new Object[count];//根据ObservableSource的数量初始化相同数量的CombinerObserverCombinerObserver<T, R>[] as = new CombinerObserver[count];for (int i = 0; i < count; i++) {
    as[i] = new CombinerObserver<T, R>(this, i);}this.observers = as;this.queue = new SpscLinkedArrayQueue<Object[]>(bufferSize);}public void subscribe(ObservableSource<? extends T>[] sources) {
    Observer<T>[] as = observers;int len = as.length;//调用下游的onSubscribedownstream.onSubscribe(this);//开始订阅每个ObservableSourcefor (int i = 0; i < len; i++) {
    if (done || cancelled) {
    return;}sources[i].subscribe(as[i]);}}
}

上面的代码订阅了声明的多个ObservableSource,这里直接新建了CombinerObserver来处理订阅结果。注意每个CombinerObserver和ObservableSource是一一绑定的。
现在看一下CombinerObserver是怎么处理的

static final class CombinerObserver<T, R> extends AtomicReference<Disposable> implements Observer<T> {
    private static final long serialVersionUID = -4823716997131257941L;final LatestCoordinator<T, R> parent;final int index;CombinerObserver(LatestCoordinator<T, R> parent, int index) {
    this.parent = parent;this.index = index;}@Overridepublic void onSubscribe(Disposable d) {
    DisposableHelper.setOnce(this, d);}//@Overridepublic void onNext(T t) {
    parent.innerNext(index, t);}@Overridepublic void onError(Throwable t) {
    parent.innerError(index, t);}@Overridepublic void onComplete() {
    parent.innerComplete(index);}public void dispose() {
    DisposableHelper.dispose(this);}}

上面的代码直接回调了订阅结果,看一下LatestCoordinator的innerNext

void innerNext(int index, T item) {
    boolean shouldDrain = false;synchronized (this) {
    //这里存储每个ObservableSource数组onnext返回的结果,每个object对应一个ObservableSourceObject[] latest = this.latest;if (latest == null) {
    return;}//active表示ObservableSource数组中有几个执行完了Object o = latest[index];int a = active;if (o == null) {
    active = ++a;}//每次有最新值都替换保存latest[index] = item;//数组中的ObservableSource都执行完了if (a == latest.length) {
    queue.offer(latest.clone());shouldDrain = true;}}if (shouldDrain) {
    drain();}
}

latest的长度和ObservableSource数组的长度一样 ,对应保存每个ObservableSource的返回值。上面的代码是当所有ObservableSource都有返回值了才会执行下一步

void drain() {
    if (getAndIncrement() != 0) {
    return;}final SpscLinkedArrayQueue<Object[]> q = queue;final Observer<? super R> a = downstream;final boolean delayError = this.delayError;int missed = 1;for (;;) {
    for (;;) {
    if (cancelled) {
    clear(q);return;}//delayError==false 的时候 只要出错直接返回if (!delayError && errors.get() != null) {
    cancelSources();clear(q);a.onError(errors.terminate());return;}boolean d = done;//结果数组 这里是都不为nullObject[] s = q.poll();boolean empty = s == null;if (d && empty) {
    clear(q);Throwable ex = errors.terminate();if (ex == null) {
    a.onComplete();} else {
    a.onError(ex);}return;}if (empty) {
    break;}R v;try {
    //调用传进来的Function方法v = ObjectHelper.requireNonNull(combiner.apply(s), "The combiner returned a null value");} catch (Throwable ex) {
    Exceptions.throwIfFatal(ex);errors.addThrowable(ex);cancelSources();clear(q);ex = errors.terminate();a.onError(ex);return;}//返回结果给下游。a.onNext(v);}missed = addAndGet(-missed);if (missed == 0) {
    break;}}
}

从上面可知道 当拿到所有ObservableSource返回的值之后, 直接调用Function方法获取结果,再把结果传递给下游。

自此,就知道这个操作符的作用是,等待所有的ObservableSource返回值,当每个ObservableSource都有返回值之后,在拿这些所有值进行操作

Observable.combineLatest(Observable.just(1, 2, 3), Observable.intervalRange(1, 3, 2, 2, TimeUnit.SECONDS), new BiFunction<Integer, Long, Long>() {
    @Overridepublic Long apply(Integer first, Long second) throws Exception {
    return first * second;}}).subscribe(new Consumer<Long>() {
    @Overridepublic void accept(Long value) throws Exception {
    //pring value}});

因此 上面的结果就是 3 6 9 ,由于第二个ObservableSource延迟执行 第一个ObservableSource,一直都是3.