精华内容
参与话题
问答
  • RxJS

    千次阅读 2016-12-09 16:31:27
    RxJS 全名是Reactive Extension for JavaScript,即JavaScript 的响应式扩展。 响应式的思路是把随时间不断变化的数据、状态、事件等转成可被观察的序列(Observe Sequence),然后订阅序列中那些 Observable ...

    1、简介

    RxJS 全名是Reactive Extension for JavaScript,即JavaScript 的响应式扩展。

    响应式的思路是把随时间不断变化的数据、状态、事件等转成可被观察的序列(Observe Sequence),然后订阅序列中那些 Observable 对象的变化,一旦变化,

    就会执行事先安排好的各种转换和操作。RxJS作为一个库,可以和任何框架混用。

     

     

    2、data stream

    以一个click event 来说,在用户点击的动作发生后,会有一段时间出发了几个事件:value, error or completed signal, 如下图:

    RxJS 将任何事情包括variables, user inputs, properties, caches, data structures等都转化为data streams,并通过observe 这些 data streams的变化,做出响应的处理。

     

    3、RxJS  安装 http://reactivex.io/rxjs/manual/installation.html

     

    4、创建 Observable 

    Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

    (1) 通过 构造方法创建

     /--js---
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    const source$ = new Rx.Observable(
        observe => {
            console.log('creating observable');
            //更新 数据流
            observe.next('hello rxjs');
            observe.next('another value');
     
            observe.error(new Error('Error: Something went wrong!'))
     
            setTimeout(() => {
                observe.next('yet another value');
                observe.complete();
            }, 2000)
        }
    );
     
    source$
        .catch(err => Rx.Observable.of(err))
        .subscribe(
        x => {
            console.log(x);
        },
        err => {
            console.log(err);
        },
        complete => {
            console.log('complete');
        }
    );

     

     


    (2) 从事件创建, 实现一个简单的input输入 =>  数据更新

    //---html---
    <input id="input">
    <p id="inputVal"></p>
      
    /--js---
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    const input = $('#input');
    const inputVal = $('#inputVal');
    const inputSource$ = Rx.Observable.fromEvent(input, 'keyup');
     
    inputSource$.subscribe(
        //捕获的值
        (e) => {
            inputVal.html(e.currentTarget.value);
        },
        //捕获的错误
        (err) => {
            console.log('err: ', err);
        },
        //完成
        () => {
            console.log('Complete');
        }
    );

     

    (3) 从 promise 创建

      
    /--js---
    //普通的 promise 封装
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    const myPromise = new Promise((resolve, reject) => {
       console.log('create promise!');
        setTimeout(() => {
            resolve('hello from promise!');
        }, 3000)
    });
     
    const promiseSource$ = Rx.Observable.fromPromise(myPromise);
     
    promiseSource$.subscribe(x => console.log(x));
      
    // ajax 数据请求
    function getUser(username) {
        return $.ajax({
            url: 'https://api.github.com/users/' + username,
            dataType: 'jsonp'
        }).promise();
    }
     
    Rx.Observable.fromPromise(getUser('wikidson'))
        .subscribe( x => {
            console.log(x);
        });

     

    (4)从 array 创建

      
    /--js---
    //普通的 promise 封装
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    从 promise 创建
      
    /--js---
    //普通的 promise 封装
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    const myPromise = new Promise((resolve, reject) => {
       console.log('create promise!');
        setTimeout(() => {
            resolve('hello from promise!');
        }, 3000)
    });
     
    const promiseSource$ = Rx.Observable.fromPromise(myPromise);
     
    promiseSource$.subscribe(x => console.log(x));
     
     
    // ajax 数据请求
    function getUser(username) {
        return $.ajax({
            url: 'https://api.github.com/users/' + username,
            dataType: 'jsonp'
        }).promise();
    }
     
    Rx.Observable.fromPromise(getUser('wikidson'))
        .subscribe( x => {
            console.log(x);
        });

     

    5. 使用场景

    (1)click event 

    如果我们想要能够抓取single click 与 double click 的事件,用最早的javascript 可能会需要变量来记录状态、时间等,但通过RxJS 提供的api,实现如下:

     

      
    /--js---
    //普通的 promise 封装
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    从 promise 创建
      
    /--js---
    //普通的 promise 封装
    import $ from 'jquery';
    import Rx from 'rxjs/Rx';
    const multiClickStream = clickStream$.bufferWhen(() => clickStream$.debounceTime(250))
        .map((list) => list.length)
        .filter((len) => len === 2);
     
    multiClickStream.subscribe(
        (numClicks) => {
            console.log(numClicks);
        }
    );

     

    从上图可以看出,RxJS 帮我们把 stream 上的event 依照我们想要的时间做整理,buffer 住 触发时间在 250ms 间的 click events ,并且利用map函数抓出每个 event list 的长度, 并进一步 筛选出 长度等于 2 ,也就是 double click 的event 出来。

    展开全文
  • RXJS

    2017-04-16 14:24:50
    : # rxjs简单入门> rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些...
     # rxjs简单入门> rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作**rxjs适用于异步场景,即前端
    

    rxjs简单入门

    rxjs全名Reactive Extensions for JavaScript,Javascript的响应式扩展, 响应式的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作

    rxjs适用于异步场景,即前端交互中接口请求、浏览器事件以及自定义事件。通过使用rxjs带给我们前所未有的开发体验。

    1. 统一异步编程的规范,不管是Promise、ajax还是事件,通通封装成序列(Observable Sequence),一旦有异步环节发生变更,观察序列即可截获发生变更的信息。
    2. 前端业务层和展现层解耦,比如展现层不需要关系指定事件触发时和DOM无关的处理逻辑。同时业务层也能组装异步操作中多个异步逻辑之间的关系,无需暴露给展现层。展现层关心的是:异步操作其中环节的数据变化。
    3. rxjs开发业务层具有高弹性,高稳定性,高实时性等特点。

      废话不多说,此篇文档结合模拟场景的例子,通过傻瓜式的描述来说明rxjs常用的方法以及组合关系。

    1. Let's Go

    rxjs应用观察者模式,其中包含2个重要的实例:Observer观察者和Subject被观察对象,多个Observer注册到Subject中,在Subject功能触发时,会通知注册好的Observab列表,逐一通知其响应观察变更信息。

    1.1 quick start

    1. 先从官网搬来rxjs的几个实例概念
      • Observable: 可观察的数据序列.
      • Observer: 观察者实例,用来决定何时观察指定数据.
      • Subscription: 观察数据序列返回订阅实例.
      • Operators: Observable的操作方法,包括转换数据序列,过滤等,所有的Operators方法接受的参数是上一次发送的数据变更的值,而方法返回值我们称之为发射新数据变更.
      • Subject: 被观察对象.
      • Schedulers: 控制调度并发,即当Observable接受Subject的变更响应时,可以通过scheduler设置响应方式,目前内置的响应可以调用Object.keys(Rx.Subject)查看。
    2. 我们最常用也最关心的Observable,四个生命周期:创建 、订阅 、 执行 、销毁。
      • 创建Obervable,返回被观察的序列源实例,该实例不具备发送数据的能力,相比之下通过new Rx.Subject创建的观察对象实例具备发送数据源的能力。
      • 通过序列源实例可以订阅序列发射新数据变更时的响应方法(回调方法)
      • 响应的动作实际上就是Observable的执行
      • 通过序列源实例可以销毁,而当订阅方法发生错误时也会自动销毁。
      • 序列源实例catch方法可以捕获订阅方法发生的错误,同时序列源实例可以接受从catch方法返回值,作为新的序列源实例
    3. 掌握最简单的例子

      // 5.0.0-rc.1
      import Rx from 'rxjs';
      //emit 1 from promise
      const source = Rx.Observable.fromPromise(new Promise(resolve => resolve(1)));
      //add 10 to the value
      const example = source.map(val => val + 10);
      //output: 11
      const subscribe = example.subscribe(val => console.log(val));
      

      通过代码掌握Observable, Observer, Subscription, Operators, SubjectSchedulers之间的关系

      import Rx from 'rxjs';
      /**
        Rx.Observable是Observable
        Rx.Observable.create创建序列源source,创建source的方法有多个,比如of, from, fromPromise等
        observer是Observer观察者,只有在Rx.Observable.create创建方法可以获取,其他创建方法内置了observer且不可访问
        observer.next发射数据更新
        source.map其中map就是Operators的其中一个方法,方法调用返回新的source1
        source1.subscribe是订阅,即数据更新时的响应方法。同时返回订阅实例Subscription
        subscription.next立即响应(不同于发射)静态数据,此时不会经过`Operators`处理
        ! Rx.Observable.create或者Rx.Subject.create创建的source不会自动关闭,其他方式则当检测到没有序列发生变更会自动销毁source.
      */
      const source = Rx.Observable.create(observer => {
        observer.next('foo');
        setTimeout(() => observer.next('bar'), 1000);
      });
      const source1 = source.map(val => `hello ${val}`);
      const subscription = source1.subscribe(value => console.log(value));
      subscription.next('foo1');
      
      // forEach和subscribe相似,同是实现订阅效果,等到promise可以监控subscription完成和失败的异常。
      // 日志打印并没有comlete, 因为source并没有完成关闭,触发调用observer.complete()
      const promise = source1.forEach(value => console.log(value))
      promise.then(() => console.log('complete'), (err) => console.log(err));
      /**
        output: 
        hello foo
        foo1
        hello foo
        hello bar
        hello bar
      */
      
      /**
        new Subject创建被观察者实例,同source一样都具备subscribe方法,表示的含义和作用也一样,即发射数据变更时响应方法。
        subject.next立即发射数据变更,作用同observer.next
        注意foo1是最后输出的,是因为在创建source时指定了Rx.Scheduler.async,是异步的调度器,表示在响应数据处理时是异步执行的。
      */
      Rx.Observable.of('foo1', Rx.Scheduler.async).subscribe(value => console.log(value));
      
      const subject = new Subject();
      const source2 = subject.map(val => `hello ${val}`);
      const subscription = source1.subscribe(value => console.log(value));
      subject.next('foo');
      subscription.next('bar');
      /**
        output: 
        hello foo
        bar
        foo1
      */
      

      1.2 学会看rxjs交互图

      交互图中每条连表示一个数据序列,每个球表示每次发射的变更,最后一条线表示最终产出的数据序列。

    下图以combineLastest来举例:

    • 方法之上的每条线都是一个source(数据序列实例)
    • 方法之下方法调用后返回的新source
    • combineLastest表示被组合的每个source,一旦发射数据变更,必须拿到其余的source的最新值(当异步时则等待,直到都拿到最新值),组合为新的数据,作为新source发射的数据变更。source1: ————————①——————————②——————————③————————————④—————————⑤——————————|——>source2: ———————————ⓐ————————ⓑ————————————ⓒ—————————————————————ⓓ—————————|——> combineLastest(source1, source2, (x, y) => x + y)source: ———————(①ⓐ)—(②ⓐ)—(②ⓑ)—————(③ⓑ)—(③ⓒ)———(④ⓒ)————(⑤ⓒ)—(⑤ⓓ)——|——>

    2. 实例方法Operators

    前面讲过Operators方法调用时,接收的参数是source,返回新的source, 以下是个人学习使用过程中,简单总结的rxjs各方法用法。

    2.1 创建

    • 发射完数据更新自动关闭:from, fromPromise, of, from, range
    • 不发射直接关闭:empty
    • 抛出异常后关闭:throw
    • 不发射数据也不关闭:never
    • 保持发射数据且不自动关闭:timer, interval, fromEvent
    • 需要手动发射数据且不自动关闭:create, (还有Rx.Subject.create)

    2.2 转换

    1. 1:1效果:map, mapTo, flatMap, scan, expand, pluck
      • map,source = source1.map(func)表示source1每次发射数据时经过func函数处理,返回新的值作为source发射的数据
      • mapTo,不同于map,func改为静态值
      • flatMap,当发射的数据是一个source时,在订阅的响应方法中接收到的也是一个source(这是合理的,发射什么数据就响应什么数据嘛,但是如果我们想在响应方法收到的是source的发射数据),flatMap就是可以允许发射数据是一个source,同时在响应的时候接收的是source的发送数据,后面我们称之为**source打平**
      • scan,source = source1.scan(func, initialValue), source每次发射的数据是source前次发射数据和source1当前发射的数据 的组合结果(取决于func,一般是相加), initialValue第一次发射,source前次没发射过,采用initialValue作为前次发射的数据
      • expand,和scan不同的是当func返回值是一个source时,在func接收到的数据是source打平后的发射数据。**特别适用于polling长轮询**
      • pluck,每次发射数据时,获取数据中的指定属性的值作为source的发射数据
    2. 1:N效果:concat, concatAll, concatMap, concatMapTo, merge, mergeAll, mergeMap, mergeMapTo, switchMap, switchMapTo
      • concat, concatAllmerge, mergeAll属于组合类型,放在这讲更好体现其效果。
      • concat,source = source1.concat(source2)表示source发射数组的顺序是,当source1或source2发射数据,source就发射。但是只有当source1发射完且关闭(source1不在发送数据)后,才触发source2发射数据。
      • concatAll,不同于concat,会把所有的发射的数据打平(如果数据为source时),然后在决定下次发射哪个数据。
      • concatMap,source = source1.concatMap(source2)表示source1每次发射数据时,获取source2的所有发射数据,map返回多个待发射数据,按顺序发射第一个数据变更。
      • concatMapTo, 不同于concatMap, map处理以source2的数据为返回结果
      • switchMap, 和concatMap不同的是在map之后的待发射数据排序上,concatMap中source1每次发射时source2的所有发射数据都接收,作为source1下一次发射前,之间的所有发射数据。switchMap则会判断source2的所有发射数据是否有数据的发射时间比source1下一次发射的时间晚,找出来去除掉。
      • switchMapToswitchMap就好比concatMapconcatMapTo, mergeMap对比mergeMapTo的关系也是如此。
      • mergeMap相比于switchMap,找出的数据会打平到source中,不丢弃。
    3. N:1效果:buffer, bufferCount, bufferTime, bufferWhen
      • buffer,source = source1.buffer(source2)表示source1以source2为参考,在source2的2次发射数据之间为时间段,source才发射一次数据,数据为该时间段内source1本该发射的数据的组合。
      • 比如source1原先每隔1秒发射一次数据,source2是每个2秒发射数据,source = source1.buffer(source2), 那么source会每隔2秒发射数据(source1的2秒内发射的2个数值组成的数组)
      • bufferCount,source = source1.bufferCount(count, start), count表示source1毎3次发射数据作为source的一次发射数据,发射完后,以source1当前组合的发射数据的第start个开始算下次发射数据需要组合的起始数据。
      • bufferTime,一段时间内的source1发射数据作为source的一次发射数据
      • bufferWhen, 以默认结果为准分成2段,分别作为source的每次发射数据
    4. 1:source效果:groupBy, window, windowCount, windowTime, windowWhen
      • groupBy, source = source1.groupBy(func), 表示source1的所有发射数据,按func分成多段,每段作为source的每次发送的数据(这里数据只是新的source,你可以理解为inner Observable实例)
      • windowbuffer不同的时,source每次发送的是innerObservable
      • window vs windowCount vs windowTime vs windowWhenbuffer相似
    5. 1:sources效果:partition
      • partition,sources = source1.partition(func), 根据func吧所有的source1发射数据分段,每段组成一个source,最终得到sources数组

    2.3 过滤

    source的过滤不会对发射数据做任何改变,只是减少source的发射次数,所以理解起来会简单很多,这里只做个简单分类

    • 防抖动(一段时间内只取最新数据作为一次发射数据,其他数据取消发射):debounce, debounceTime, throttle(和debounce唯一区别是debounce取一段时间内最新的,而throttle忽略这段时间后,发现新值才发送), throttleTime
    • 去重(重叠的发射数据只去第一数据作为发射数据,其他相同数据取消发射):distinct, distinctUntilChanged
    • 定位(根据条件值去一个或部分几个数据作为对应发射数据,其他取消发射):elementAt, first, last, filter, take, takeLatst, takeUntil, takeWhile,
    • 跳过(根据条件去除符合条件的,取剩下的值作为每次发射数据):skip, skipUntil, skipWhile, ignoreElements(忽略所有的,等同于empty)
    • 样本:sample, source=source1.sample(source2), 以source2发射数据时来发现最新一次source1发射的数据,作为source的发射数据,个人觉得应该属于**转换**分类,官网放到了**过滤**

    2.4 组合

    做个source组合成新的souce

    • concat, concatAllmerge, mergeAll,在**转换**分类讲过了
    • combineLastest,source = source1.combineLastest(source2, func),source1和source2一旦发射数据,func会触发,拿到source1和source2最新的发射数据,返回新的数据,作为source的发射数据。
    • combineAll,同combineLastest,,source = sources.combineAll()
    • forkJoin,source = Rx.Observable.forkJoin(sources), 所有的sources都关闭后,获取各自最新的发射数组组合为数组,作为source的发射数据
    • zipforkJoin的区别是,zip是sources都有发送数据时,组合为一个数组作为source的发送数据,而sources任一source关闭了,则取source最后发射的数值。
    • zipAll,同concatconcatAll
    • startWith,source = source1.startWith(value), 表示在source1的最前面注入第一次发射数据
    • withLastestFrom, soruce = source1.withLastestFrom(source2, func), 表示source1每次发射数据时,获取source2最新发射的数据,如果存在则func处理得到新的数组作为source的发射数据

    2.5 判断

    • findfindIndex分别是指定发射数据和发射数据的下标(第几次发送的),应该放到**过滤**分类才合理
    • isEmpty, every, include等,判断是否为真,判断的结果当做是source的发射数据

    2.6 错误处理

    • catch,source在Operators调用过程中出现的异常,都可以在catch捕获到,同时可以返回新的source,因为出现异常的当前source会自动销毁掉。
    • retry,source = source.retry(times), source的所有发射,重复来几遍。
    • retryWhen,根据条件来决定来几遍,只有当条件为false时才跳出循环。

    2.7 工具

    • do,在每次响应订阅前,可以通过source.do(func),做一些提前处理等任何动作,比如打印一下发射的数据等。
    • delay, delayWhen,每次发送数据时,都延迟一定时间间隔后再发送。
    • observeOn, 设置scheduler,即发射数据的响应方式,Schedulers详细查看地址, 这里不讲解了,项目中应用得不多。
    • subcribeOn, timeInterval设置sheduler
    • toPromise, source转成promise,可以通过promise.then达到source.subscribe的效果
    • toArray,把source所有发射的数据,组成数组输出。

    2.8 计算

    把source的所有发射数据进行指定计算后,得出的数据作为新source的发射数据,计算方法分别有:max, min, count, reduce, average

    2.9 其他

    • cache, source = source1.cache(1);共享source1的订阅结果,即不管source订阅几回,响应方法接收到的发射数据都是同一份。
    • 共享source订阅结果很重要,因为**组合**等方法组合多个source时,其中包含sourceA,同时sourceA还需要单独订阅其结果,在不用cache情况下,sourceA会产生2个subscription,即2个订阅实例,但是我们更希望是能达到sourceA发生变化时,都能通知到所有的组合sourceA的source。
    • publish,publishSource = source.publish(),让source的订阅的工作延后,即source不会发射数据,而是等到publishSource.connect()调用后才开发发射数据。效果和delay很相似,不同的是可以控制合适发射。
    • share,当source订阅多次,那么每次响应时do都会调用多次,通过share合并响应,则source发射一次数据更新,多次响应当当一次响应处理,do也调用一次。

    参考资料

    1. rxjs官网 - http://reactivex.io/rxjs/
    2. rxjs代码 - https://github.com/ReactiveX/rxjs
    3. 常用rxjs方法的交互图 - http://rxmarbles.com/
    4. rxhjs教程 - http://xgrommx.github.io/rx-book/content/observable/observable_instance_methods/toarray.html
    5. Scheduler - https://mcxiaoke.gitbooks.io/rxdocs/content/Scheduler.html

    用云栖社区APP,舒服~

    【云栖快讯】2017云栖大会·南京峰会开启报名,深度解读AI+工业互联网时代!4月26日,春暖花开,与六朝古都一起见证新时代!  详情请点击

    网友评论

    展开全文
  • rxjs

    2017-03-27 17:31:01
    rxjs是通过使用可观察序列类构建一部和基于事件的程序的库。提供了核心类型Observable和卫星类型:Observer、Schedulers、Subject和操作符。使得我们可以不异步事件以集合方式进行处理。 Observable可观察对象:...

    rxjs是通过使用可观察序列类构建一部和基于事件的程序的库。提供了核心类型Observable和卫星类型:Observer、Schedulers、Subject和操作符。使得我们可以不异步事件以集合方式进行处理。

    Observable可观察对象:表示一个可调用的未来值或事件的集合。

    Observer观察者:一个回调函数集合,它知道怎样去监听被Observable发送的值。

    Subscription订阅:表示一个可观察对象的执行,主要是用于取消执行。

    Operators操作符:纯粹的函数,是的以函数编程方式处理集合。

    Subject主题:等同于事件驱动器,将一个值或事件广播到多个观察者的唯一途径。

    Schedulers调度者:用来控制并发,计算发生时允许协调(setTimeout/requestAnimationFrame).......

    rxjs基本根据以上六个处理基本事件。

    展开全文
  • Rxjs

    2019-03-28 10:58:50
    Rxjs简介建立 Observable观察者 Observervue+rxjs缺点 众所周知,为了避免DOM渲染的冲突,Javascript是单线程模式,该如何解决耗时操作问题?(鼠标键盘事情的处理,远程http请求以及文件io操作等等。 )——异步...

    众所周知,为了避免DOM渲染的冲突,Javascript是单线程模式,该如何解决耗时操作问题?(鼠标键盘事情的处理,远程http请求以及文件io操作等等。
    )——异步。
    同步代码,直接执行
    异步函数先放在异步队列中
    待同步函数执行完毕,轮询执行异步队列函数

    同步执行异步程序带来回调地狱问题
    ES6中引入了Promises代表了在未来某个时刻完成。
    把异步中使用回调函数的场景改为了.then()、.catch()等函数链式调用的方式。基于promise我们可以把复杂的异步回调处理方式进行模块化。
    然而promise也有自身的缺点:

    • 数据源产生多个值,比如鼠标移动事情或者文件系统的字节流;
    • 没有失败重试的机制;
    • 没有取消机制;

    简介

    RxJS 是一个响应式编程的库,响应式的思路是把随时间不断变化的数据、状态、事件等等转成可被观察的序列(Observable Sequence),然后订阅序列中那些Observable对象的变化,一旦变化,就会执行事先安排好的各种转换和操作,使编写异步或基于回调的代码更容易。

    • Vue 的底层就是採用了 Reactive Programming 的观念来实作的,另外 Vue 官方也推出了 vue-rx
    • Angular 2也全面引用了 RxJS
    • Redux中加入了对 Observable 操作的支持。

    区别:
    Promise .then()只能返回一个值,Observable可以返回多个值
    Promise要么resolve要么reject,并且只响应一次。而Observable可以响应多次
    Promise不能取消,Observable可以调用unsubscribe()取消订阅
    Observable认为是加强版的Promise,它们之间是可以通过RxJS的API互相转换的:

    const observable = Observable.fromPromise(promise); // Promise转为Observable
    const promise = observable.toPromise(); // Observable转为Promise
    
    • 我们知道传统的for,while对循环体中的异步程序是无法感知的,或者说,它们不会等待异步程序执行完毕再进入下一轮循环。
    • 错误处理是任何程序都要解决的问题,本身就已很复杂的回调函数中再嵌入try/catch块吗?如果还想加入重试机制呢?
    • 商业逻辑内嵌在回调函数中,可读性差,复杂度高。现如今流行的组件化编程,即可重用,又可解耦,还方便测试;
    • 闭包是强大的,过度地使用闭包将导致我们不得不谨慎地审视变量的作用域以及其值。再加上共享变量带来的副作用,混杂在if/else条件语句和for循环中,每天都会有修不完的bug;
    • 根据事件或耗时操作无响应的时间进行取消操作;
    • 自己实现throttling和debouncing是很困难的
    • 众所周知的事件监听带来的内存泄露问题;

    在RxJS中,存在这么几种东西:

    • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
    • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
    • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
    • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
    • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
    • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

    建立 Observable

    Observable 就像是一个序列,裡面的元素会随著时间推送。

    var observable = Rx.Observable
        .create(observer => {
            observer.next('Jerry');
            observer.next('Anna');
        })
    // 订阅这个 observable  
    observable.subscribe(value => {
        console.log(value);
    })
    

    观察者 Observer

    // 观察者, next, error, complete 三个方法
    
    var observer = {
      next: x => console.log('Observer got a next value: ' + x),
      error: err => console.error('Observer got an error: ' + err),
      complete: () => console.log('Observer got a complete notification'),
    };
    
    observable.subscribe(observer)
    

    我们可以把一切输入都当做数据流来处理
    用户操作
    网络响应
    定时器

    RxJS提供了各种API来创建数据流:
    单值:of, empty, never
    多值:from
    定时:interval, timer
    从事件创建:fromEvent
    从Promise创建:fromPromise
    自定义创建:create

    创建出来的数据流是一种可观察的序列,可以被订阅,也可以被用来做一些转换操作,比如:
    改变数据形态:map, mapTo, pluck
    过滤一些值:filter, skip, first, last, take
    时间轴上的操作:delay, timeout, throttle, debounce, audit, bufferTime
    累加:reduce, scan
    异常处理:throw, catch, retry, finally
    条件执行:takeUntil, delayWhen, retryWhen, subscribeOn, ObserveOn
    转接:switch

    也可以对若干个数据流进行组合:
    concat,保持原来的序列顺序连接两个数据流
    merge,合并序列
    race,预设条件为其中一个数据流完成
    forkJoin,预设条件为所有数据流都完成
    zip,取各来源数据流最后一个值合并为对象
    combineLatest,取各来源数据流最后一个值合并为数组

    操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

    许多操作符都是跟时间相关的,它们可能会以不同的方式延迟(delay)、取样(sample)、节流(throttle)或去抖动值(debonce)。图表通常是更适合的工具。弹珠图是操作符运行方式的视觉表示,其中包含输入 Obserable(s) (输入可能是多个 Observable )、操作符及其参数和输出 Observable 。

    屏幕快照 2019-03-24 下午6.39.16.png

    rxjs应用观察者模式,其中包含2个重要的实例:Observer观察者和Subject被观察对象,多个Observer注册到Subject中,在Subject功能触发时,会通知注册好的Observab列表,逐一通知其响应观察变更信息。
    很多时候,我们会有一些显示时间的场景,比如在页面下添加评论,评论列表中显示了它们分别是什么时间创建的

    tick() {
    this.diff = moment(createAt).fromNow()
    setTimeout(tick.bind(this), 1000)
    }
    

    但组件并不一定只有一份实例,这样,整个界面上可能就有很多定时器在同时跑,这是一种浪费。如果要做优化,可以把定时器做成一种服务,把业务上需要周期执行的东西放进去,当作定时任务来跑。

    Observable.interval(1000).subscribe(() => {
      this.diff = moment(createAt).fromNow()
    })
    

    有很多数据,非常多关于数据的操作
    展示的数据是多个数据组合而成,比如任务、对应owner、标签等
    同一个数据的更新,可能来自不同的发起方
    新增的数据需要的数据处理规则应该和原来的相同

    解决:
    数据通过缓存和异步方式获取
    把每个数据流管道组合起来,流的叠合就是最后的数据
    获取和订阅放在一起,也就不需要知道数据的来源是哪里了
    现在和未来的数据merge之后通过相同的API处理,保证数据的规则相同

    ###使用RxJS实现搜索功能

    • 多余的请求
    • 已无用的请求仍然执行
    <input id="text"></input>
    <script>
        var text = document.querySelector('#text'),
            timer = null;
        text.addEventListener('keyup', (e) =>{
            // 在 250 毫秒内进行其他输入,则清除上一个定时器
            clearTimeout(timer);
            // 定时器,在 250 毫秒后触发
            timer = setTimeout(() => {
                console.log('发起请求..');
            },250)
        })
    </script>
    
    <input id="text"></input>
    <script>
        var text = document.querySelector('#text'),
            timer = null,
            currentSearch = '';
    
        text.addEventListener('keyup', (e) =>{
            clearTimeout(timer)
            timer = setTimeout(() => {
                // 声明一个当前所搜的状态变量
                currentSearch = '书'; 
    
                var searchText = e.target.value;
                $.ajax({
                    url: `/search/${searchText}`,
                    success: data => {
                        // 判断后台返回的标志与我们存的当前搜索变量是否一致
                        if (data.search === currentSearch) {
                            // 渲染展示
                            render(data);
                        } else {
                            // ..
                        }
                    }           
                });
            },250)
        })
    </script>
    

    vue+rxjs

    import Vue from 'vue'
    import VueRx from 'vue-rx' 
    import Rx from 'rxjs/Rx'
    
    Vue.use(VueRx, Rx)
    

    在Vue实例当中就会多了这个钩子函数
    subscriptions:用法类似data
    domStreams:存放事件

    <input type="text" v-stream:keyup="getInput$">
    <p>value$: {{ value$  }}</p>
    
    import { Observable } from 'rxjs';
    export default {
      domStreams: ['getInput$'],
      subscriptions () {
        return {
          value$: this.getInput$        
            .pluck('event', 'target', 'value')
            .debounceTime(2000)
            .distinctUntilChanged()
            .switchMap(url => Http.get(url))
            .map(val => {
              console.log(val);
            })
        }
      }
    }
    

    debounceTime
    只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。

    distinctUntilChanged
    返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。
    如果输入值没有变化,则不要发起请求(比如按某个字符,然后快速按退格)。

    source : --a--b--c--c--b|
                distinctUntilChanged()
    example: --a--b--c-----b|
    

    switchMap

    处理高阶 Observable 就是指一个 Observable 送出的元素还是一个 Observable

    var click = Rx.Observable.fromEvent(document.body, 'click');
    var source = click.map(e => Rx.Observable.interval(1000));
    
    var example = source.switch();
    example.subscribe({
        next: (value) => { console.log(value); },
        error: (err) => { console.log('Error: ' + err); },
        complete: () => { console.log('complete'); }
    });
    

    在这里插入图片描述

    var source = Rx.Observable.fromEvent(document.body, 'click');
    
    var example = source
                    .switchMap(
                        e => Rx.Observable.interval(100).take(3)
                    );
    

    缺点

    RxJS的抽象程度很高,可以用很简短代码表达很复杂的含义。

    展开全文
  • import { Component, OnInit } from '@angular/core';...import { Observable } from 'rxjs'; import { map, filter } from 'rxjs/operators'; @Component({ selector: 'app-home', templateUrl: './home
  • NOTE: The latest version of RxJS can be found here The Need to go Reactive | About the Reactive Extensions | Batteries Included | Why RxJS? | Dive In! | Resources | Getting Started | What about my ...
  • RxJS: Reactive Extensions For JavaScript RxJS 7 (beta) FOR 6.X PLEASE GO TO THE 6.x BRANCH Reactive Extensions Library for JavaScript. This is a rewrite of Reactive-Extensions/RxJS and is the ...
  • | rxjs | dependencies | minor | <a href="https://diff.intrinsic.com/rxjs/6.2.2/6.5.2">~6.2.0</code> -> ~6.5.0</code></a> | <a href="https://togithub.com/reactivex/rxjs">source</a> | <h3>Release ...
  • Rxjs Error

    2020-11-27 18:59:03
    <div><p>ERROR in node_modules/rxjs/internal/types.d.ts(81,44): error TS1005: ';' expected. node_modules/rxjs/internal/types.d.ts(81,74): error TS1005: ';' expected. node_modules/rxjs/...
  • 深入浅出Rxjs.pdf

    2018-12-09 10:43:50
    高清版 <深入浅出Rxjs.pdf> 电子书,欢迎下载,也可以在 [码农书籍]小程序上直接在线阅读,或者您不想下载也可以去我主页加群获取。 这是⼀个信息技术爆炸的时代,计算机编程语⾔和框架层出不穷,同 时,编程的风格...
  • 深入浅出 RxJS(全本)

    2019-03-25 01:03:55
    本书系统讲解RxJS响应式编程的技术原理与应用。第1章剖析函数响应式编程的基本概念,通过简单RxJS代码引入函数响应式编程,并与传统编程方式对比,解释这种编程范式的优势,以及这种范式形成的历史。第2章介绍学习...
  • Rxjs dependencies

    2020-12-02 19:12:32
    <p>I like your library, but I would like it so much more if you removed the <code>rxjs-compat</code> dependency as well as <code>rxjs</code> on version 5. <p>Can you migrate to <code>rxjs</code> 6?...
  • add RxJS 6 support

    2020-12-02 16:53:27
    Import auto-complete to project which uses RxJS 6 (without rxjs-compact). <p><strong>Current behavior Angular CLI 6 returns error: <pre><code> ERROR in node_modules/rxjs/Observable.d.ts(1,15): error ...

空空如也

1 2 3 4 5 ... 20
收藏数 4,946
精华内容 1,978
关键字:

rxjs