RxJava背压

RxJava1

背压(Backpressure)

背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

  • 异步环境下
  • 不是操作符,是一种策略

如何解决?

主动拉取 通过reques(n) ,需本身支持背压策略

例如:

Observable.range()         //支持
Observable.interval()    //不支持

对于本身就不支持背压的Observable,可以根据实际业务情况使用以下方法来缓解该问题:

  • sample 这个操作符简单理解就是每隔X时间发送里最近那个事件,其他的事件浪费掉;
  • buffer 这个操作符简单理解就是把X时间内的事件打包发送;
  • onBackpressurebuffer 把observable发送出来的事件做缓存,当request方法被调用的时候,给下层流发送一个item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常);
  • onBackpressureDrop 将observable发送的事件抛弃掉,直到subscriber再次调用request(n)方法的时候,就发送给它这之后的n个事件。

注:onBackpressurebuffer,onBackpressureDrop可以响应下游观察者的request(n)方法了,也就是说,使用了这两种操作符,可以让原本不支持背压的Observable“支持”背压了。

如何区分Observable是否支持背压?

rxjava1中没有确切的方法可以确切的方法进行区分,这也是rxjava1中的设计缺陷

RxJava2

解决背压问题

rxjava2源码层面上重新设计了架构,用于解决背压。

  • Flowable 支持背压
  • Observable 不支持背压

各自都有自己的一套几乎相同操作符,但不能混用。如果你有意识的使用流控,那么就用 Flowable ,否则就用 Obserable。

但他们之间支持想换转换 Flowable -> Obserable , Obserable -> Flowable.