FP / RP:
- FP 是一种编程范式,可以帮助我们更好构建反应式应用程序。FP 的几大核心概念,如不可变性、引用透明、函数作为一等公民,可以帮助解决一些并行、并发的问题
- RP 本质是数据的流转和处理,需要我们把逻辑抽象理解成一个个节点,业务逻辑就是通过编排这些节点,以及基本的操作符,表达出具体的业务逻辑。最后节点串联在一起,像河流一样,最后合流成最终结果
JDK8 之前依赖各种集合框架,用循环语句处理数据
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);for (Integer i: list) { System.out.println(i * i);}
JDK8
- 引入了函数式编程的概念,增加 Steam / Lambda,
LongStream.range(0, 100).map(lt -> lt * lt).forEach(System.out::println);FP 的特质是不变性,函数不会改变入参的值,而是返回新的值。由于这个特质,所以循环在纯函数式编程中是被摒弃的,替代的就是递归的方案,但是这样会降级编程效率,所以就引入了高阶函数,lambda 表达式等概念,还有非常强大的集合类 - IO 模型,异步化实现;Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
Reactive 实现框架(Java)
- Java 9 Flow API
- RxJava
- Reactor
The Reactive Manifesto
- 我们需要系统具备以下特质:即时响应性 ( Responsive )、回弹性 ( Resilient )、适应性/弹性 ( Elastic )、消息驱动 ( Message Driven )
关于 Back Pressure(回压、反向压力)
- 大部分异步编程(包括 PR)可以抽象成生产者消费者模型,当生产者提交任务的速度大于消费者处理的速度,出现 pressure. 传统解决方案(如 MQ),将 pressure 放在消费者一端:缓存(线程池的任务队列)、丢弃(RejectedExecutionHandler)。BP 是将处理放在生产者端
- 关联概念:buffer / throttling / sample
RP 优劣势:
- RP 不会提高数据处理速度,由于上下文切换、比 Blocking 模型慢,但有更好的并行处理能力,能更好利用多核处理器性能。
- 测试困难,可读性低,有一定开发门槛
RxJS
Observable
类似一个 Generator,产生一个流,并通过 next, error, complete 来发送值和通知
const observable = Rx.Observable.create(ob => { ob.next(1); ob.next(2); // end ob.complete();});
Operators(操作符)
用来转换、过滤、组合等对流进行操作的函数,所有的操作符都是惰性求值的过程,实际上流并没有触发
const observableWithOps = observable.filter(it => it === 2).take(1);
Observer
对流进行监听的一个对象,包括 next, error, complete 三个方法来对应 Observable 的通知
const observer = { next: x => console.log(x), error: err => console.error(err), complete: () => console.log('complete'),};
Subscription
建立 Observable 和 Observer 的订阅关系,只有当订阅关系建立、流中的操作符开始执行
observable.subscribe(observer); // 输出: 1 2 completeobservableWithOps.subscribe(observer); // 输出: 2 complete
Subject
Subject 是 subscription 的集合,可以通过 Subject 实现多播;同时 Subject 也是一个 Observable
const subject = new Rx.Subject();subject.subscribe(v => console.log('one: ' + v));subject.subscribe(v => console.log('two: ' + v));subject.next(1); // 输出: one: 1 two: 1
Operators
1. 创建流
Rx.Observable.of(1); // 从值创建Rx.Observable.from([1, 2]); // 从数组创建Rx.Observable.fromPromise(Promise.resolve(2)); // 从 Promise 创建Rx.Observable.fromEvent(document, 'click'); // 从事件创建Rx.Observable.create(ob => {ob.next(1); ob.complete();}); // 自定义创建Rx.Observable.interval(1000); // 从定时器创建
2. 数据操作
Rx.Observable.from([1, 2]).map(val => val + 1);Rx.Observable.from([1, 2]).mapTo(1); // 转换为固定值Rx.Observable.from([1, 2]).scan((x, y) => x + y, 0); // x为当前的值,y为缓存的值,可以用来做计数器Rx.Observable.of({a: 1}).pluck('a'); // 从数据中取某些属性的值// 过滤Rx.Observable.from([1, 2]).filter(x => x === 1);// 去重Rx.Observable.from([1, 1, 2, 2]).distinct(); // 1 2Rx.Observable.from([1, 1, 2, 2, 1, 1, 3, 3]).distinctUntilChanged(); // 1 2 1 3// 取符合条件的值,以take举例,skip, skipLast, skipWhile, skipUntil同理Rx.Observable.from([1, 2]).take(1);Rx.Observable.from([1, 2]).takeLast(1);Rx.Observable.from([1, 2]).takeWhile(x => x === 2);Rx.Observable.interval(1000) .takeUntil(Rx.Observable.interval(2500)); // 除非另一个Observable发出值,不然一直取值。log: 0 1// 多个流合并const ob1 = Rx.Observable.interval(500).take(3);const ob2 = Rx.Observable.interval(600).take(3);Rx.Observable.merge(ob1, ob2); // 并行执行流,0 0 1 1 2 2Rx.Observable.concat(ob1, ob2); // 串行执行流,0 1 2 0 1 2Rx.Observable.combineLatest(ob1, ob2); // 将多个流最新的值组成数组 [0, 0] [1, 0] [1, 1] [2, 1] [2, 2]Rx.Observable.forkJoin(ob1, ob2); // 多个流结束后,将最新的值组成数组 [2, 2]Rx.Observable.zip(ob1, ob2); // 将多个流按顺序一一组合,直到其中有流结束 [0, 0] [1, 1] [2, 2]
在流中会出现值也为 Observable 的情况,如
Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val))
称为高阶 Observable,如果想把高阶 Observable 转化为一阶 Observable,需要 concatAll mergeAll 这种操作符
// concatAll 处理和 concat 相似,mergeAll / combineAll / zipAll 同理Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val)).concatAll() // 1 2// 另外还有 switch 可以取消前一个 Observable,可以应用在触发新请求时取消前一个请求Rx.Observable.interval(600).take(3) .map(val => Rx.Observable.interval(500).take(3)) .switch() // 0 0 0 1 2 // 而 map 和 concatAll 两个操作可以整合为一个操作 concatMap// mergeMap / switchMap / concatMapTo / mergeMapTo / switchMapTo 同理Rx.Observable.from([1, 2]).concatMap(val => Rx.Observable.of(val)) // 1 2
处理中
// 节流Rx.Observable.fromEvent(input, 'input').debounce(() => Rx.Observable.interval(1000));Rx.Observable.interval(1000).debounceTime(600);Rx.Observable.interval(1000).throttle(() => Rx.Observable.interval(2000));Rx.Observable.interval(1000).throttleTime(600);// buffer// 直到流结束,当传给 buffer 的 Observable 发出通知时,取 buffer 的值Rx.Observable.interval(500).take(5).buffer(Rx.Observable.interval(1100)) // [0, 1] [2, 3]Rx.Observable.interval(500).take(5).bufferCount(3) // 每n个流的值,取buffer的值,[0, 1, 2] [3, 4]Rx.Observable.interval(500).take(5).bufferTime(1100) // 每n个流的值,取buffer的值,[0, 1] [2, 3] [4]// 每当返回的 Observable 发出通知,返回 buffer 区内的值,可能为 []Rx.Observable.interval(500).take(5).bufferWhen(() => Rx.Observable.interval(500 * Math.random()))// window 和 buffer 类似,不同点在于返回的为高阶 Observable,需要打平// windowCount / windowTime / windowWhen 同理Rx.Observable.interval(500).take(5).window(Rx.Observable.interval(1100)).concatAll() // 0 1 2 3 4
错误处理
// 可以返回 Observable 继续执行Rx.Observable.create(ob => {throw 'Error'}).catch((err) => Rx.Observable.of(2))// 可以重试Rx.Observable.create(ob => {throw 'Error'}).catch((err) => caught)// 可以直接抛异常Rx.Observable.create(ob => {throw 'Error'}).catch((err) => {throw err})// 重试5次Rx.Observable.create(ob => {throw 'Error'}).retry(5)
RxJS 具体使用
处理 input 的输入事件,请求数据
<!-- html --><input id="j-input"></input>Rx.Observable.fromEvent(document.getElementById('j-input')) .debounceTime(500) .pluck('target', 'value') .distinctUntilChanged() .switchMap((val) => Rx.Observable.fromPromise(fetch('/something?data=' + val))) .catch(e => throw(e)) .subscribe(console.log
WebSocket 中合成多个数据源的信息
const socket = io('http://127.0.0.1:8080');Rx.Observable.merge( Rx.Observable.fromEvent(socket, 'data1'), Rx.Observable.fromEvent(socket, 'data2'),).combineLatest().subscribe(console.log)