FP / RP:
- FP 是一种编程范式,可以帮助我们更好构建反应式应用程序。FP 的几大核心概念,如不可变性、引用透明、函数作为一等公民,可以帮助解决一些并行、并发的问题
- RP 本质是数据的流转和处理,需要我们把逻辑抽象理解成一个个节点,业务逻辑就是通过编排这些节点,以及基本的操作符,表达出具体的业务逻辑。最后节点串联在一起,像河流一样,最后合流成最终结果
JDK8 之前依赖各种集合框架,用循环语句处理数据
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
for (Integer i: list) {
System.out.println(i * i);
}
JDK8
- 引入了函数式编程的概念,增加 Steam / Lambda,
LongStream.range(0, 100).map(lt -> lt * lt).forEach(System.out::println);
FP 的特质是不变性,函数不会改变入参的值,而是返回新的值。由于这个特质,所以循环在纯函数式编程中是被摒弃的,替代的就是递归的方案,但是这样会降级编程效率,所以就引入了高阶函数,lambda 表达式等概念,还有非常强大的集合类 - IO 模型,异步化实现;Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.
Reactive 实现框架(Java)
- Java 9 Flow API
- RxJava
- Reactor
The Reactive Manifesto
- 我们需要系统具备以下特质:即时响应性 ( Responsive )、回弹性 ( Resilient )、适应性/弹性 ( Elastic )、消息驱动 ( Message Driven )
关于 Back Pressure(回压、反向压力)
- 大部分异步编程(包括 PR)可以抽象成生产者消费者模型,当生产者提交任务的速度大于消费者处理的速度,出现 pressure. 传统解决方案(如 MQ),将 pressure 放在消费者一端:缓存(线程池的任务队列)、丢弃(RejectedExecutionHandler)。BP 是将处理放在生产者端
- 关联概念:buffer / throttling / sample
RP 优劣势:
- RP 不会提高数据处理速度,由于上下文切换、比 Blocking 模型慢,但有更好的并行处理能力,能更好利用多核处理器性能。
- 测试困难,可读性低,有一定开发门槛
RxJS
Observable
类似一个 Generator
,产生一个流,并通过 next
, error
, complete
来发送值和通知
const observable = Rx.Observable.create(ob => {
ob.next(1);
ob.next(2);
// end
ob.complete();
});
Operators(操作符)
用来转换、过滤、组合等对流进行操作的函数,所有的操作符都是惰性求值的过程,实际上流并没有触发
const observableWithOps = observable.filter(it => it === 2).take(1);
Observer
对流进行监听的一个对象,包括 next
, error
, complete
三个方法来对应 Observable
的通知
const observer = {
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('complete'),
};
Subscription
建立 Observable
和 Observer
的订阅关系,只有当订阅关系建立、流中的操作符开始执行
observable.subscribe(observer); // 输出: 1 2 complete
observableWithOps.subscribe(observer); // 输出: 2 complete
Subject
Subject
是 subscription
的集合,可以通过 Subject
实现多播;同时 Subject
也是一个 Observable
const subject = new Rx.Subject();
subject.subscribe(v => console.log('one: ' + v));
subject.subscribe(v => console.log('two: ' + v));
subject.next(1); // 输出: one: 1 two: 1
Operators
1. 创建流
Rx.Observable.of(1); // 从值创建
Rx.Observable.from([1, 2]); // 从数组创建
Rx.Observable.fromPromise(Promise.resolve(2)); // 从 Promise 创建
Rx.Observable.fromEvent(document, 'click'); // 从事件创建
Rx.Observable.create(ob => {ob.next(1); ob.complete();}); // 自定义创建
Rx.Observable.interval(1000); // 从定时器创建
2. 数据操作
Rx.Observable.from([1, 2]).map(val => val + 1);
Rx.Observable.from([1, 2]).mapTo(1); // 转换为固定值
Rx.Observable.from([1, 2]).scan((x, y) => x + y, 0); // x为当前的值,y为缓存的值,可以用来做计数器
Rx.Observable.of({a: 1}).pluck('a'); // 从数据中取某些属性的值
// 过滤
Rx.Observable.from([1, 2]).filter(x => x === 1);
// 去重
Rx.Observable.from([1, 1, 2, 2]).distinct(); // 1 2
Rx.Observable.from([1, 1, 2, 2, 1, 1, 3, 3]).distinctUntilChanged(); // 1 2 1 3
// 取符合条件的值,以take举例,skip, skipLast, skipWhile, skipUntil同理
Rx.Observable.from([1, 2]).take(1);
Rx.Observable.from([1, 2]).takeLast(1);
Rx.Observable.from([1, 2]).takeWhile(x => x === 2);
Rx.Observable.interval(1000)
.takeUntil(Rx.Observable.interval(2500)); // 除非另一个Observable发出值,不然一直取值。log: 0 1
// 多个流合并
const ob1 = Rx.Observable.interval(500).take(3);
const ob2 = Rx.Observable.interval(600).take(3);
Rx.Observable.merge(ob1, ob2); // 并行执行流,0 0 1 1 2 2
Rx.Observable.concat(ob1, ob2); // 串行执行流,0 1 2 0 1 2
Rx.Observable.combineLatest(ob1, ob2); // 将多个流最新的值组成数组 [0, 0] [1, 0] [1, 1] [2, 1] [2, 2]
Rx.Observable.forkJoin(ob1, ob2); // 多个流结束后,将最新的值组成数组 [2, 2]
Rx.Observable.zip(ob1, ob2); // 将多个流按顺序一一组合,直到其中有流结束 [0, 0] [1, 1] [2, 2]
在流中会出现值也为 Observable
的情况,如
Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val))
称为高阶 Observable
,如果想把高阶 Observable
转化为一阶 Observable
,需要 concatAll
mergeAll
这种操作符
// concatAll 处理和 concat 相似,mergeAll / combineAll / zipAll 同理
Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val)).concatAll() // 1 2
// 另外还有 switch 可以取消前一个 Observable,可以应用在触发新请求时取消前一个请求
Rx.Observable.interval(600).take(3)
.map(val => Rx.Observable.interval(500).take(3))
.switch() // 0 0 0 1 2
// 而 map 和 concatAll 两个操作可以整合为一个操作 concatMap
// mergeMap / switchMap / concatMapTo / mergeMapTo / switchMapTo 同理
Rx.Observable.from([1, 2]).concatMap(val => Rx.Observable.of(val)) // 1 2
处理中
// 节流
Rx.Observable.fromEvent(input, 'input').debounce(() => Rx.Observable.interval(1000));
Rx.Observable.interval(1000).debounceTime(600);
Rx.Observable.interval(1000).throttle(() => Rx.Observable.interval(2000));
Rx.Observable.interval(1000).throttleTime(600);
// buffer
// 直到流结束,当传给 buffer 的 Observable 发出通知时,取 buffer 的值
Rx.Observable.interval(500).take(5).buffer(Rx.Observable.interval(1100)) // [0, 1] [2, 3]
Rx.Observable.interval(500).take(5).bufferCount(3) // 每n个流的值,取buffer的值,[0, 1, 2] [3, 4]
Rx.Observable.interval(500).take(5).bufferTime(1100) // 每n个流的值,取buffer的值,[0, 1] [2, 3] [4]
// 每当返回的 Observable 发出通知,返回 buffer 区内的值,可能为 []
Rx.Observable.interval(500).take(5).bufferWhen(() => Rx.Observable.interval(500 * Math.random()))
// window 和 buffer 类似,不同点在于返回的为高阶 Observable,需要打平
// windowCount / windowTime / windowWhen 同理
Rx.Observable.interval(500).take(5).window(Rx.Observable.interval(1100)).concatAll() // 0 1 2 3 4
错误处理
// 可以返回 Observable 继续执行
Rx.Observable.create(ob => {throw 'Error'}).catch((err) => Rx.Observable.of(2))
// 可以重试
Rx.Observable.create(ob => {throw 'Error'}).catch((err) => caught)
// 可以直接抛异常
Rx.Observable.create(ob => {throw 'Error'}).catch((err) => {throw err})
// 重试5次
Rx.Observable.create(ob => {throw 'Error'}).retry(5)
RxJS 具体使用
处理 input 的输入事件,请求数据
<!-- html -->
<input id="j-input"></input>
Rx.Observable.fromEvent(document.getElementById('j-input'))
.debounceTime(500)
.pluck('target', 'value')
.distinctUntilChanged()
.switchMap((val) => Rx.Observable.fromPromise(fetch('/something?data=' + val)))
.catch(e => throw(e))
.subscribe(console.log
WebSocket 中合成多个数据源的信息
const socket = io('http://127.0.0.1:8080');
Rx.Observable.merge(
Rx.Observable.fromEvent(socket, 'data1'),
Rx.Observable.fromEvent(socket, 'data2'),
).combineLatest().subscribe(console.log)