RxJava1
背压(Backpressure)
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
- 异步环境下
- 不是操作符,是一种策略
如何解决?
主动拉取 通过reques(n) ,需本身支持背压策略
例如:
Observable.range() //支持
Observable.interval() //不支持
对于本身就不支持背压的Observable,可以根据实际业务情况使用以下方法来缓解该问题:
- sample 这个操作符简单理解就是每隔X时间发送里最近那个事件,其他的事件浪费掉;
- buffer 这个操作符简单理解就是把X时间内的事件打包发送;
- onBackpressurebuffer 把observable发送出来的事件做缓存,当request方法被调用的时候,给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常);
- onBackpressureDrop 将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这之后的n个事件。
注:onBackpressurebuffer,onBackpressureDrop可以响应下游观察者的request(n)方法了,也就是说,使用了这两种操作符,可以让原本不支持背压的Observable“支持”背压了。
如何区分Observable是否支持背压?
rxjava1中没有确切的方法可以确切的方法进行区分,这也是rxjava1中的设计缺陷
RxJava2
解决背压问题
rxjava2源码层面上重新设计了架构,用于解决背压。
- Flowable 支持背压
- Observable 不支持背压
各自都有自己的一套几乎相同操作符,但不能混用。如果你有意识的使用流控,那么就用 Flowable ,否则就用 Obserable。
但他们之间支持想换转换 Flowable -> Obserable , Obserable -> Flowable.