当前位置: 代码迷 >> 综合 >> Reactor 3 (10): 数据合并concat、merge
  详细解决方案

Reactor 3 (10): 数据合并concat、merge

热度:39   发布时间:2024-02-08 02:35:47.0

由于业务需求有的时候需要将多个数据源进行合并,Reactor提供了concat方法和merge方法:

concat方法示意图:

在这里插入图片描述

merge方法示意图:
在这里插入图片描述

从图中可以很清楚的看出这两种合并方法的不同:

  • concat是合并的flux,按照顺序分别运行,flux1运行完成以后再运行flux2
  • merge是同时运行,根据时间先后运行

下面对concat和merge相关的方法进行测试,先准备测试数据:

private Flux<Integer> flux1() {return Flux.range(1,4);
}private Flux<Integer> flux2() {return Flux.range(5,8);
}private Flux<String> hotFlux1() {return flux1().map(i-> "[1]"+i).delayElements(Duration.ofMillis(10));
}private Flux<String> hotFlux2() {return flux2().map(i-> "[2]"+i).delayElements(Duration.ofMillis(4));
}

concat相关方法

concat代码演示

    @Testpublic void concatTest() throws InterruptedException {Flux.concat(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);}

运行结果

  • 从结果可以看出先运行完flux1之后再运行flux2

在这里插入图片描述

concatWith方法

用法和concat基本相同,写法略有不同:

@Test
public void concatWithTest () {flux1().concatWith(flux2()).log().subscribe();
}

merge相关方法

merge代码演示

@Test
public void mergeTest() throws InterruptedException {Flux.merge(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

运行结果

  • 很明显顺序和concat的区别,是按照时间先后执行

在这里插入图片描述

mergeWith用法

  • 用法和merge相同,写法不同而已
@Test
public void mergeWithTest() throws InterruptedException {hotFlux1().mergeWith(hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

mergeSequential用法

在这里插入图片描述

  • 跟concat有些相似,得到的结果类似
  • 跟concat不同在于,订阅的源是hot型,接收数据后根据订阅顺序重新排序
@Test
public void mergeSequentialTest() throws InterruptedException {Flux.mergeSequential(hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

在这里插入图片描述

结果和concat的一样都是

mergeOrdered用法

  • 合并接收之后再排序
    @Testpublic void mergeOrderedTest() throws InterruptedException {Flux.mergeOrdered(Comparator.reverseOrder(), hotFlux1(), hotFlux2()).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);}

在这里插入图片描述

combineLatest用法

在这里插入图片描述

  • 跟concat和merge不同该方法是将多个源的最后得到元素通过函数进行融合的到新的值

代码示例

@Test
public void combineLatestTest() throws InterruptedException {Flux.combineLatest(hotFlux1(), hotFlux2(), (v1, v2) -> v1 + ":" + v2).subscribe(i -> System.out.print("->"+i));Thread.sleep(200);
}

运行结果

  • 结果都是flux1和flux2中元素进行融合之后的元素
    在这里插入图片描述

代码

github

gitee

  相关解决方案