ReactiveX库的设计

前言

ReactiveX 的系列类库是一系列开源的库,他主要的设计目的是去掉复杂的callback,使用观察者模式让开发者可以简单的操作线程。官网写的不错,翻译了一下。

ReactiveX

ReactiveX是个使用观察者队列模型,集成异步事件驱动编程的类库。

ReactiveX拓展了观察者模型,支持数据、事件、运算的队列。这样你可以组织线程,同步,线程安全,并发数据结构和I/O闭环等抽象的概念,并且组织成队列。

观察着模型以一种理想的方式解决了多种类型的异步队列问题。

单一类型 多类型
同步 T getData() Iterable<T> getData()
异步 Future<T> getData() Observable<T> getData()

ReactiveX一些时候被称为函数响应式编程,但这并不准确。ReactiveX可能是函数式的,也有可能是响应式的,但是函数响应式编程是另一种方式。一个最明显的不同是,函数响应式编程是随着时间连续的对值操作,但是ReactiveX是随着时间离散的对值操作。

为什么使用观察者模型

ReactiveX观察者模型允许你在面对异步事件、组织操作,像你用数组容易一样的简单。它让你从繁乱的回调函数中解放出来,因此是你的代码更加的可读和少bug。

观察者是可组的

像Java的Futures模式一样,将会用于单一的异步执行。但是当这些技术复杂了的时候,他们开始增加一些没有必要的复杂度。

使用Futures模式去组织有条件的异步流的执行非常的困难(或者说不可能,因为每个请求变量在运行时是未知的)。当然,这是可以完成的,但是这样做很容易变得很复杂(因此容易出错),或者这种方式会过早的阻滞Future.get(),这将消除异步执行的好处。

另一方面,ReactiveX观察者模型目的是组织工作流和异步数据。

观察者是灵活的

ReactiveX不仅仅支持传送标量变量(像Future模型),而且支持无尽的事件队列流。观察者模型是一种单一的抽象概念,可以被用于任何情况。观察者模型像他的兄弟迭代器(Iterable)一样的优雅和灵活。

事件 Iterable (pull) Observable (push)
检索数据 T next() onNext(T)
发现错误 throwsException onError(Exception)
完成 !hasNext() onCompleted()

观察者并不死板

ReactiveX并不是偏向于某种并发或者异步处理。观察者可以被实现用于线程池,事件循环,非闭环I/O,Actor模型,各种你想实现框架,你的方式或者你熟悉的。客户端代码可以用异步的方式,使用观察者模型应对各种交互,无论你选择的实现是闭环还是非闭环的。

观察者模型如何实现的?

public Observable<data> getData();
  • 在同一线程是否可以同步调用?
  • 在另一个线程可以异步调用?
  • 可以分割任务给多个线程,并且以任意的顺序返回数据?
  • 是否使用Actor替代线程池?
  • 是否使用NIO模式配合事件循环做异步的网络请求?
  • 是否用事件循环把工作线程和回调线程区分开?

对于观察者来说,这都不是事儿!

最重要的:使用ReactiveX你可以改变你的思维,从根本上改变你对观察者模型的实现,不会改变你的观察模型的消费者

回调模型(callback)也有的问题

回调不允许任何闭环,解决了Future.get()过早回调的问题。因为当返回响应时,就已经准备执行了回调了,这样当然有效率的。

但是相比较Futures模型回调在单一的异步回调比较好用,但在复杂的多层结构回调就笨拙了。

ReactiveX实现了多种语言

ReactiveX现在在各种语言上实现了,在兼顾各种语言的特性的基础上,更多语言的版本在编写。

Reactive编程

ReactiveX提供了收集操作,你可以筛选,选择,变形,变形,组合,和组合观察者。这是的有效率的执行和组合。

你可以想观察者类的“push”操作和Iterable的“pull”操作是相同的。Iterable是从生产者获取消费者的数据,并且线程闭环直到数据到达。相反的,观察者是生产者推送数据给消费者,无论数据是否可以获得。这样的方法更加的灵活,因为数据可以传送同步或者异步。

示例代码展示了相同序列的函数可以对于迭代器和观察者是怎么样实现的

Iterable

getDataFromLocalMemory()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .forEach({ println "next => " + it })

Observable

getDataFromNetwork()
  .skip(10)
  .take(5)
  .map({ s -> return s + " transformed" })
  .subscribe({ println "onNext => " + it })

观察者模型增加了两个缺失的语义“the Gang of Four’s Observer pattern”,为了匹配迭代器的类型:

  • 在数据最后一条的情况下,生产者发送信号给消费者 (迭代器正常的结束循环,而观察者模型使用onCompleted方法)

  • 在有错误发生的时候,生产者抛出异常信号给消费者。(迭代器抛出异常,而观察者模型使用onError方法)

综上,ReactiveX综合了迭代器和观察者模型。这两种的模式最直接的使用区别是哪种数据流。这是十分重要的,因为你可以使用在迭代器上的操作同样适合在观察者上。