FP / RP:

    • FP 是一种编程范式,可以帮助我们更好构建反应式应用程序。FP 的几大核心概念,如不可变性、引用透明、函数作为一等公民,可以帮助解决一些并行、并发的问题
    • RP 本质是数据的流转和处理,需要我们把逻辑抽象理解成一个个节点,业务逻辑就是通过编排这些节点,以及基本的操作符,表达出具体的业务逻辑。最后节点串联在一起,像河流一样,最后合流成最终结果

    JDK8 之前依赖各种集合框架,用循环语句处理数据

    List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
    for (Integer i: list) {
        System.out.println(i * i);
    }
    

    JDK8

    • 引入了函数式编程的概念,增加 Steam / Lambda,LongStream.range(0, 100).map(lt -> lt * lt).forEach(System.out::println);

      FP 的特质是不变性,函数不会改变入参的值,而是返回新的值。由于这个特质,所以循环在纯函数式编程中是被摒弃的,替代的就是递归的方案,但是这样会降级编程效率,所以就引入了高阶函数,lambda 表达式等概念,还有非常强大的集合类

    • IO 模型,异步化实现

      Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

    Reactive 实现框架(Java)

    • Java 9 Flow API
    • RxJava
    • Reactor

    The Reactive Manifesto

    • 我们需要系统具备以下特质:即时响应性 ( Responsive )、回弹性 ( Resilient )、适应性/弹性 ( Elastic )、消息驱动 ( Message Driven )

    关于 Back Pressure(回压、反向压力)

    • 大部分异步编程(包括 PR)可以抽象成生产者消费者模型,当生产者提交任务的速度大于消费者处理的速度,出现 pressure. 传统解决方案(如 MQ),将 pressure 放在消费者一端:缓存(线程池的任务队列)、丢弃(RejectedExecutionHandler)。BP 是将处理放在生产者端
    • 关联概念:buffer / throttling / sample

    RP 优劣势:

    • RP 不会提高数据处理速度,由于上下文切换、比 Blocking 模型慢,但有更好的并行处理能力,能更好利用多核处理器性能。
    • 测试困难,可读性低,有一定开发门槛

    RxJS

    Observable

    类似一个 Generator,产生一个流,并通过 next, error, complete 来发送值和通知

    const observable = Rx.Observable.create(ob => {
        ob.next(1);
        ob.next(2);
        // end
        ob.complete();
    });
    

    Operators(操作符)

    用来转换、过滤、组合等对流进行操作的函数,所有的操作符都是惰性求值的过程,实际上流并没有触发

    const observableWithOps = observable.filter(it => it === 2).take(1);
    

    Observer

    对流进行监听的一个对象,包括 next, error, complete 三个方法来对应 Observable 的通知

    const observer = {
        next: x => console.log(x),
        error: err => console.error(err),
        complete: () => console.log('complete'),
    };
    

    Subscription

    建立 ObservableObserver 的订阅关系,只有当订阅关系建立、流中的操作符开始执行

    observable.subscribe(observer); // 输出: 1 2 complete
    observableWithOps.subscribe(observer);  // 输出: 2 complete
    

    Subject

    Subjectsubscription 的集合,可以通过 Subject 实现多播;同时 Subject 也是一个 Observable

    const subject = new Rx.Subject();
    subject.subscribe(v => console.log('one: ' + v));
    subject.subscribe(v => console.log('two: ' + v));
    subject.next(1);    // 输出: one: 1 two: 1
    

    Operators

    1. 创建流

    Rx.Observable.of(1);               // 从值创建
    Rx.Observable.from([1, 2]);        // 从数组创建
    Rx.Observable.fromPromise(Promise.resolve(2));     // 从 Promise 创建
    Rx.Observable.fromEvent(document, 'click');        // 从事件创建
    Rx.Observable.create(ob => {ob.next(1); ob.complete();});        // 自定义创建
    Rx.Observable.interval(1000);      // 从定时器创建
    

    2. 数据操作

    Rx.Observable.from([1, 2]).map(val => val + 1);
    Rx.Observable.from([1, 2]).mapTo(1); // 转换为固定值
    Rx.Observable.from([1, 2]).scan((x, y) => x + y, 0); // x为当前的值,y为缓存的值,可以用来做计数器
    Rx.Observable.of({a: 1}).pluck('a'); // 从数据中取某些属性的值
    
    // 过滤
    Rx.Observable.from([1, 2]).filter(x => x === 1);
    // 去重
    Rx.Observable.from([1, 1, 2, 2]).distinct(); // 1 2
    Rx.Observable.from([1, 1, 2, 2, 1, 1, 3, 3]).distinctUntilChanged(); // 1 2 1 3
    // 取符合条件的值,以take举例,skip, skipLast, skipWhile, skipUntil同理
    Rx.Observable.from([1, 2]).take(1);
    Rx.Observable.from([1, 2]).takeLast(1);
    Rx.Observable.from([1, 2]).takeWhile(x => x === 2);
    Rx.Observable.interval(1000)
        .takeUntil(Rx.Observable.interval(2500)); // 除非另一个Observable发出值,不然一直取值。log: 0 1
    
    // 多个流合并
    const ob1 = Rx.Observable.interval(500).take(3);
    const ob2 = Rx.Observable.interval(600).take(3);
    Rx.Observable.merge(ob1, ob2); // 并行执行流,0 0 1 1 2 2
    Rx.Observable.concat(ob1, ob2); // 串行执行流,0 1 2 0 1 2
    Rx.Observable.combineLatest(ob1, ob2); // 将多个流最新的值组成数组 [0, 0] [1, 0] [1, 1] [2, 1] [2, 2]
    Rx.Observable.forkJoin(ob1, ob2); // 多个流结束后,将最新的值组成数组 [2, 2]
    Rx.Observable.zip(ob1, ob2); // 将多个流按顺序一一组合,直到其中有流结束 [0, 0] [1, 1] [2, 2]
    

    在流中会出现值也为 Observable 的情况,如

    Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val))
    

    称为高阶 Observable,如果想把高阶 Observable 转化为一阶 Observable,需要 concatAll mergeAll 这种操作符

    // concatAll 处理和 concat 相似,mergeAll / combineAll / zipAll 同理
    Rx.Observable.from([1, 2]).map(val => Rx.Observable.of(val)).concatAll() // 1 2
    
    // 另外还有 switch 可以取消前一个 Observable,可以应用在触发新请求时取消前一个请求
    Rx.Observable.interval(600).take(3)
        .map(val => Rx.Observable.interval(500).take(3))
        .switch() // 0 0 0 1 2 
    
    // 而 map 和 concatAll 两个操作可以整合为一个操作 concatMap
    // mergeMap / switchMap / concatMapTo / mergeMapTo / switchMapTo 同理
    Rx.Observable.from([1, 2]).concatMap(val => Rx.Observable.of(val)) // 1 2
    

    处理中

    // 节流
    Rx.Observable.fromEvent(input, 'input').debounce(() => Rx.Observable.interval(1000));
    Rx.Observable.interval(1000).debounceTime(600);
    Rx.Observable.interval(1000).throttle(() => Rx.Observable.interval(2000));
    Rx.Observable.interval(1000).throttleTime(600);
    
    // buffer
    // 直到流结束,当传给 buffer 的 Observable 发出通知时,取 buffer 的值
    Rx.Observable.interval(500).take(5).buffer(Rx.Observable.interval(1100))    // [0, 1] [2, 3]
    Rx.Observable.interval(500).take(5).bufferCount(3) // 每n个流的值,取buffer的值,[0, 1, 2] [3, 4]
    Rx.Observable.interval(500).take(5).bufferTime(1100) // 每n个流的值,取buffer的值,[0, 1] [2, 3] [4]
    
    // 每当返回的 Observable 发出通知,返回 buffer 区内的值,可能为 []
    Rx.Observable.interval(500).take(5).bufferWhen(() => Rx.Observable.interval(500 * Math.random()))
    
    // window 和 buffer 类似,不同点在于返回的为高阶 Observable,需要打平
    // windowCount / windowTime / windowWhen 同理
    Rx.Observable.interval(500).take(5).window(Rx.Observable.interval(1100)).concatAll() // 0 1 2 3 4
    

    错误处理

    // 可以返回 Observable 继续执行
    Rx.Observable.create(ob => {throw 'Error'}).catch((err) => Rx.Observable.of(2))
    // 可以重试
    Rx.Observable.create(ob => {throw 'Error'}).catch((err) => caught)
    // 可以直接抛异常
    Rx.Observable.create(ob => {throw 'Error'}).catch((err) => {throw err})
    // 重试5次
    Rx.Observable.create(ob => {throw 'Error'}).retry(5)
    

    RxJS 具体使用

    处理 input 的输入事件,请求数据

    <!-- html -->
    <input id="j-input"></input>
    
    Rx.Observable.fromEvent(document.getElementById('j-input'))
        .debounceTime(500)
        .pluck('target', 'value')
        .distinctUntilChanged()
        .switchMap((val) => Rx.Observable.fromPromise(fetch('/something?data=' + val)))
        .catch(e => throw(e))
        .subscribe(console.log
    

    WebSocket 中合成多个数据源的信息

    const socket = io('http://127.0.0.1:8080');
    Rx.Observable.merge(
        Rx.Observable.fromEvent(socket, 'data1'),
        Rx.Observable.fromEvent(socket, 'data2'),
    ).combineLatest().subscribe(console.log)