当前位置: 代码迷 >> 综合 >> rxjava2.0背压实验测试总结
  详细解决方案

rxjava2.0背压实验测试总结

热度:18   发布时间:2023-12-14 01:57:00.0

本文参考了:

http://www.jianshu.com/p/1f4867ce3c01

1.

BackpressureStrategy.DROP
 
 
解释:如果缓冲区128条已经填满,此时新产生的事件将会直接被丢弃,直到缓冲区又有空的位置,那么此时再次新产生的事件有可以入队。
 
2.LATEST
解释:经过测试认为,这个设置是说保证最后一条不被丢弃,但是接收方能否取到不一定,如果接受方已经接满了,
那么此时也是不会收到,如果没有接满,自然会收到最后一条
 
3.ERROR
解释:如果缓冲区128条已经满了,此时新的事件没有地方放,此时会出现
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
3.BUFFER
表示缓冲区128用满了,再扩展,但是消耗内存,具有OOM风险,使用要谨慎
 
public class MainActivity extends AppCompatActivity {private String TAG="mainactivity";@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);findViewById(R.id.startTv).setOnClickListener(new View.OnClickListener() {@Overridepublic void onClick(View v) {test2();}});}private  void test1(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> e) throws Exception {while (true){e.onNext(1);}}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {Thread.sleep(2000);System.out.println(integer);}});}private void test2(){Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {@Overridepublic void subscribe(FlowableEmitter<Integer> emitter) throws Exception {int i=0;while (i<200){i++;Thread.sleep(10);emitter.onNext(i);Log.d(TAG, "emit "+i);}}}, BackpressureStrategy.DROP); //增加了一个参数//LATEST:根据测试现象应当理解为保证最后一条数据不被丢弃,而绝不是像"总能使消费者能够接收到生产者产生的最后一个事件"说法 ,//实际验证是如此,例如上面最后一个策略如果是DROP,那么其实只能收到128条,最后一条产生的时候也没能入队,所以被丢弃// 如果设置ERROR,io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests//如果缓冲128个用完,新的事件无处存储时候,就会发生此异常Subscriber<Integer> subscriber = new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {Log.d(TAG, "onSubscribe");//  s.request(20);//策略BUFFER ,消费能力20 结果只收到20条,但是数据依然在发射// s.request(0);//策略BUFFER ,消费能力0 结果收不到数据,但是数据依然在发射s.request(129);//策略DROP,消费能力20,结果只收到连续的20条,但是数据依然在发射//策略DROP,消费能力10000,结果数据128条之后,发现直接抛到844了,这说明默认缓冲区满了之后//129条没有能进入到缓冲区,为什么呢,因为缓冲区满了,库存就那么大,根据DROP策略,只能丢弃//简单的说,DROP可能导致新产生的事件被丢弃,收到的事件可能会断层}@Overridepublic void onNext(Integer integer) {try {Thread.sleep(100);Log.d(TAG, "onNext: " + integer);} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable t) {Log.w(TAG, "onError: ", t);}@Overridepublic void onComplete() {Log.d(TAG, "onComplete");}};flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);}
}