有这样的情况,上游传递到下游的数据需要进行处理,然而上游推送的速度又很快,下游由于资源等原因来不及处理;如果这时还是通过不限制上游速度的方式推送数据,就会出问题,因此Reactive Streams有两一种处理方式,就是通过request的机制向上游传递信号,并指定接收数量;通过这种方法将push模型转化为push-pull hybrid,这就是backpressure的用法。
通过编写Subscriber实现backpressure
下面介绍backpressure比较原始的写法,通过构建Subscriber控制request的大小:
@Test
public void rawBackPressure () {Flux<String> flux = Flux.range(1,10).map(i -> String.valueOf(i)).log();flux.subscribe(new Subscriber<String>() {private int count = 0;private Subscription subscription;private int requestCount = 2;@Overridepublic void onSubscribe(Subscription s) {this.subscription = s;s.request(requestCount); // 启动}@SneakyThrows@Overridepublic void onNext(String s) {count++;if (count == requestCount) { // 通过count控制每次request两个元素Thread.sleep(1000);subscription.request(requestCount);count = 0;}}@Overridepublic void onError(Throwable t) {}@Overridepublic void onComplete() {}});
}
- 通过使用count对控制request的触发
- 指定每次request的大小,实际生产中可以根据资源情况动态调节
通过编写BaseSubscriber实现backpressure
- 可以使用功能更多的BaseSubscriber类处理背压逻辑,因为BaseSubscriber的一些方法可以简化处理的逻辑
@Test
public void baseBackPressure () {Flux<Integer> flux = Flux.range(1,10).log();flux.subscribe(new BaseSubscriber<Integer>() {private int count = 0;private final int requestCount = 2;@Overrideprotected void hookOnSubscribe(Subscription subscription) {request(requestCount);}@SneakyThrows@Overrideprotected void hookOnNext(Integer value) {count++;if (count == requestCount) { // 通过count控制每次request两个元素Thread.sleep(1000);request(requestCount);count = 0;}}});
}
运行结果一样,request(2)之后执行两个next
通过使用limitRate()方法实现
- 通过flux的limitrate方式实现调整request数量
@Test
public void backPressureLimitRate(){Flux.range(1,10).log().limitRate(2).subscribe();
}
运行结果跟上面一样:
代码