observable_observablelist - CSDN
精华内容
参与话题
  • Observable详解

    2019-04-07 09:35:28
    浏览新版,请访问 RxJS Observable 在介绍 Observable 之前,我们要先了解两个设计模式: Observer Pattern - (观察者模式) Iterator Pattern - (迭代器模式) 这两个模式是 Observable 的基础,下面我们先来介绍...
        
    浏览新版,请访问 RxJS Observable

    在介绍 Observable 之前,我们要先了解两个设计模式:

    • Observer Pattern - (观察者模式)
    • Iterator Pattern - (迭代器模式)

    这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。

    Observer Pattern

    观察者模式定义

    观察者模式软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实时事件处理系统。 — 维基百科

    观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。

    我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:

    • 期刊出版方 - 负责期刊的出版和发行工作
    • 订阅者 - 只需执行订阅操作,新版的期刊发布后,就会主动收到通知,如果取消订阅,以后就不会再收到通知

    在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。

    图片描述

    观察者模式优缺点

    观察者模式的优点:

    • 支持简单的广播通信,自动通知所有已经订阅过的对象
    • 目标对象与观察者之间的抽象耦合关系能够单独扩展以及重用

    观察者模式的缺点:

    • 如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间
    • 如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃

    观察者模式的应用

    在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下:

    <button id="btn">确认</button>
    
    function clickHandler(event) {
        console.log('用户已点击确认按钮!');
    }
    document.getElementById("btn").addEventListener('click', clickHandler);

    上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 clickHandler 函数。

    观察者模式实战

    Subject 类定义:

    class Subject {
        
        constructor() {
            this.observerCollection = [];
        }
        
        registerObserver(observer) {
            this.observerCollection.push(observer);
        }
        
        unregisterObserver(observer) {
            let index = this.observerCollection.indexOf(observer);
            if(index >= 0) this.observerCollection.splice(index, 1);
        }
        
        notifyObservers() {
            this.observerCollection.forEach((observer)=>observer.notify());
        }
    }

    Observer 类定义:

    class Observer {
        
        constructor(name) {
            this.name = name;
        }
        
        notify() {
            console.log(`${this.name} has been notified.`);
        }
    }

    使用示例:

    let subject = new Subject(); // 创建主题对象
    
    let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker'
    let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo'
    
    subject.registerObserver(observer1); // 注册观察者A
    subject.registerObserver(observer2); // 注册观察者B
     
    subject.notifyObservers(); // 通知观察者
    
    subject.unregisterObserver(observer1); // 移除观察者A
    
    subject.notifyObservers(); // 验证是否成功移除

    以上代码成功运行后控制台的输出结果:

    semlinker has been notified. # 输出一次
    2(unknown) lolo has been notified. # 输出两次

    需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听)

    Iterator Pattern

    迭代器模式定义

    迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。

    迭代器模式的优缺点

    迭代器模式的优点:

    • 简化了遍历方式,对于对象集合的遍历,还是比较麻烦的,对于数组或者有序列表,我们尚可以通过游标取得,但用户需要在对集合了解的前提下,自行遍历对象,但是对于 hash 表来说,用户遍历起来就比较麻烦。而引入迭代器方法后,用户用起来就简单的多了。
    • 封装性良好,用户只需要得到迭代器就可以遍历,而不用去关心遍历算法。

    迭代器模式的缺点:

    • 遍历过程是一个单向且不可逆的遍历

    ECMAScript 迭代器

    在 ECMAScript 中 Iterator 最早其实是要采用类似 Python 的 Iterator 规范,就是 Iterator 在没有元素之后,执行 next 会直接抛出错误;但后来经过一段时间讨论后,决定采更 functional 的做法,改成在取得最后一个元素之后执行 next 永远都回传 { done: true, value: undefined }

    一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 donevalue 两个属性的对象。对象的取值如下:

    • 在最后一个元素前:{ done: false, value: elementValue }
    • 在最后一个元素后:{ done: true, value: undefined }

    详细信息可以参考 - 可迭代协议和迭代器协议

    ES 5 迭代器

    接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下:

    function makeIterator(array){
        var nextIndex = 0;
        
        return {
           next: function(){
               return nextIndex < array.length ?
                   {value: array[nextIndex++], done: false} :
                   {done: true};
           }
        }
    }

    一旦初始化, next() 方法可以用来依次访问可迭代对象中的元素:

    var it = makeIterator(['yo', 'ya']);
    console.log(it.next().value); // 'yo'
    console.log(it.next().value); // 'ya'
    console.log(it.next().done);  // true

    ES 6 迭代器

    在 ES 6 中我们可以通过 Symbol.iterator 来创建可迭代对象的内部迭代器,具体示例如下:

    let arr = ['a', 'b', 'c'];
    let iter = arr[Symbol.iterator]();

    调用 next() 方法来获取数组中的元素:

    > iter.next()
    { value: 'a', done: false }
    > iter.next()
    { value: 'b', done: false }
    > iter.next()
    { value: 'c', done: false }
    > iter.next()
    { value: undefined, done: true }

    ES 6 中可迭代的对象:

    • Arrays
    • Strings
    • Maps
    • Sets
    • DOM data structures (work in progress)

    Observable

    RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

    Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

    • 订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
    • 发布:Observable 通过回调 next 方法向 Observer 发布事件。

    Proposal Observable

    自定义 Observable

    如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 Observer 作为参数然后返回另一个函数。

    它的基本特征:

    • 是一个函数
    • 接受一个 Observer 对象 (包含 next、error、complete 方法的对象) 作为参数
    • 返回一个 unsubscribe 函数,用于取消订阅

    它的作用:

    作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现:

    DataSource - 数据源

    class DataSource {
      constructor() {
        let i = 0;
        this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
      }
      
      emit(n) {
        const limit = 10;  // 设置数据上限值
        if (this.ondata) {
          this.ondata(n);
        }
        if (n === limit) {
          if (this.oncomplete) {
            this.oncomplete();
          }
          this.destroy();
        }
      }
      
      destroy() { // 清除定时器
        clearInterval(this._id);
      }
    }

    myObservable

    function myObservable(observer) {
        let datasource = new DataSource(); // 创建数据源
        datasource.ondata = (e) => observer.next(e); // 处理数据流
        datasource.onerror = (err) => observer.error(err); // 处理异常
        datasource.oncomplete = () => observer.complete(); // 处理数据流终止
        return () => { // 返回一个函数用于,销毁数据源
            datasource.destroy();
        };
    }

    使用示例:

    const unsub = myObservable({
      next(x) { console.log(x); },
      error(err) { console.error(err); },
      complete() { console.log('done')}
    });
    
    /**
    * 移除注释,可以测试取消订阅
    */
    // setTimeout(unsub, 500); 

    具体运行结果,可以查看线上示例

    SafeObserver - 更好的 Observer

    上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

    • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
    • complete 或者 error 触发之后再调用 next 方法是没用的
    • 调用 unsubscribe 方法后,任何方法都不能再被调用了
    • completeerror 触发后,unsubscribe 也会自动调用
    • nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
    • nextcompleteerror是可选的。按需处理即可,不必全部处理

    为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。SafeObserver 的具体实现如下:

    class SafeObserver {
      constructor(destination) {
        this.destination = destination;
      }
      
      next(value) {
        // 尚未取消订阅,且包含next方法
        if (!this.isUnsubscribed && this.destination.next) {
          try {
            this.destination.next(value);
          } catch (err) {
            // 出现异常时,取消订阅释放资源,再抛出异常
            this.unsubscribe();
            throw err;
          }
        }
      }
      
      error(err) {
        // 尚未取消订阅,且包含error方法
        if (!this.isUnsubscribed && this.destination.error) {
          try {
            this.destination.error(err);
          } catch (e2) {
            // 出现异常时,取消订阅释放资源,再抛出异常
            this.unsubscribe();
            throw e2;
          }
          this.unsubscribe();
        }
      }
    
      complete() {
        // 尚未取消订阅,且包含complete方法
        if (!this.isUnsubscribed && this.destination.complete) {
          try {
            this.destination.complete();
          } catch (err) {
            // 出现异常时,取消订阅释放资源,再抛出异常
            this.unsubscribe();
            throw err;
          }
          this.unsubscribe();
        }
      }
      
      unsubscribe() { // 用于取消订阅
        this.isUnsubscribed = true;
        if (this.unsub) {
          this.unsub();
        }
      }
    }

    myObservable - 使用 SafeObserver

    function myObservable(observer) {
      const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象
      const datasource = new DataSource(); // 创建数据源
      datasource.ondata = (e) => safeObserver.next(e);
      datasource.onerror = (err) => safeObserver.error(err);
      datasource.oncomplete = () => safeObserver.complete();
    
      safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法
        datasource.destroy();
      };
      // 绑定this上下文,并返回unsubscribe方法
      return safeObserver.unsubscribe.bind(safeObserver); 
    }

    使用示例:

    const unsub = myObservable({
      next(x) { console.log(x); },
      error(err) { console.error(err); },
      complete() { console.log('done')}
    });

    具体运行结果,可以查看线上示例

    Operators - 也是函数

    Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符:

    Observable 实现:

    class Observable {
      constructor(_subscribe) {
        this._subscribe = _subscribe;
      }
      
      subscribe(observer) {
        const safeObserver = new SafeObserver(observer);
        safeObserver.unsub = this._subscribe(safeObserver);
        return safeObserver.unsubscribe.bind(safeObserver);
      }
    }

    map 操作符实现:

    function map(source, project) {
      return new Observable((observer) => {
        const mapObserver = {
          next: (x) => observer.next(project(x)),
          error: (err) => observer.error(err),
          complete: () => observer.complete()
        };
        return source.subscribe(mapObserver);
      });
    }

    具体运行结果,可以查看线上示例

    改进 Observable - 支持 Operator 链式调用

    如果把 Operator 都写成如上那种独立的函数,我们链式代码会逐渐变丑:

    map(map(myObservable, (x) => x + 1), (x) => x + 2);

    对于上面的代码,想象一下有 5、6 个嵌套着的 Operator,再加上更多、更复杂的参数,基本上就没法儿看了。

    你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的Operator并生成一个最终的Observable,不过这意味着要写更复杂的 Operator,上代码:JSBin。其实写完后你会发现,代码也不怎么漂亮:

    pipe(myObservable, map(x => x + 1), map(x => x + 2));

    理想情况下,我们想将代码用更自然的方式链起来:

    myObservable.map(x => x + 1).map(x => x + 2);

    幸运的是,我们已经有了这样一个 Observable 类,我们可以基于 prototype 在不增加复杂度的情况下支持多 Operators 的链式结构,下面我们采用prototype方式再次实现一下 Observable

    Observable.prototype.map = function (project) {
        return new Observable((observer) => {
            const mapObserver = {
                next: (x) => observer.next(project(x)),
                error: (err) => observer.error(err),
                complete: () => observer.complete()
            };
            return this.subscribe(mapObserver);
        });
    };

    现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 Observable 类,然后在子类中重写某些内容以优化程序。

    接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。

    Rx.Observable.create

    var observable = Rx.Observable
        .create(function(observer) {
            observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
            observer.next('Lolo');
        });
        
    // 订阅这个 Observable    
    observable.subscribe(function(value) {
        console.log(value);
    });

    以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。

    需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

    var observable = Rx.Observable
        .create(function(observer) {
            observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
            observer.next('Lolo');
        });
        
    console.log('start');
    observable.subscribe(function(value) {
        console.log(value);
    });
    console.log('end');

    以上代码运行后,控制台的输出结果:

    start
    Semlinker
    Lolo
    end

    当然我们也可以用它处理异步行为:

    var observable = Rx.Observable
        .create(function(observer) {
            observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext
            observer.next('Lolo');
            
            setTimeout(() => {
                observer.next('RxJS Observable');
            }, 300);
        })
        
    console.log('start');
    observable.subscribe(function(value) {
        console.log(value);
    });
    console.log('end');

    以上代码运行后,控制台的输出结果:

    start
    Semlinker
    Lolo
    end
    RxJS Observable

    从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。

    Observable - Creation Operator

    RxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:

    • create
    • of
    • from
    • fromEvent
    • fromPromise
    • empty
    • never
    • throw
    • interval
    • timer

    上面的例子中,我们已经使用过了 create 操作符,接下来我们来看一下其它的操作符:

    of

    var source = Rx.Observable.of('Semlinker', 'Lolo');
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log(error);
        }
    });

    以上代码运行后,控制台的输出结果:

    Semlinker
    Lolo
    complete!

    from

    var arr = [1, 2, 3];
    var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路"
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log(error);
        }
    });

    以上代码运行后,控制台的输出结果:

    1
    2
    3
    complete!

    fromEvent

    Rx.Observable.fromEvent(document.querySelector('button'), 'click');

    fromPromise

    var source = Rx.Observable
      .fromPromise(new Promise((resolve, reject) => {
        setTimeout(() => {
          resolve('Hello RxJS!');
        },3000)
    }));
      
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log(error);
        }
    });

    以上代码运行后,控制台的输出结果:

    Hello RxJS!
    complete!

    empty

    var source = Rx.Observable.empty();
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log(error);
        }
    });

    以上代码运行后,控制台的输出结果:

    complete!

    empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 complete 信息。

    never

    var source = Rx.Observable.never();
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log(error);
        }
    });

    never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象。

    throw

    var source = Rx.Observable.throw('Oop!');
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log('Throw Error: ' + error);
        }
    });

    以上代码运行后,控制台的输出结果:

    Throw Error: Oop!

    throw 操作如,只做一件事就是抛出异常。

    interval

    var source = Rx.Observable.interval(1000);
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log('Throw Error: ' + error);
        }
    });

    以上代码运行后,控制台的输出结果:

    0
    1
    2
    ...

    interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。

    timer

    var source = Rx.Observable.timer(1000, 5000);
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log('Throw Error: ' + error);
        }
    });

    以上代码运行后,控制台的输出结果:

    0 # 1s后
    1 # 5s后
    2 # 5s后
    ...

    timer 操作符支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 操作符也可以只传递一个参数,具体如下:

    var source = Rx.Observable.timer(1000);
    
    source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log('Throw Error: ' + error);
        }
    });

    以上代码运行后,控制台的输出结果:

    0
    complete!

    Subscription

    有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 Subscription 对象的 unsubscribe 方法来释放资源。具体示例如下:

    var source = Rx.Observable.timer(1000, 1000);
    
    // 取得subscription对象
    var subscription = source.subscribe({
        next: function(value) {
            console.log(value);
        },
        complete: function() {
            console.log('complete!');
        },
        error: function(error) {
            console.log('Throw Error: ' + error);
        }
    });
    
    setTimeout(() => {
        subscription.unsubscribe();
    }, 5000);

    RxJS - Observer

    Observer (观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。

    Observer 接口定义

    interface Observer<T> {
      closed?: boolean; // 标识是否已经取消对Observable对象的订阅
      next: (value: T) => void;
      error: (err: any) => void;
      complete: () => void;
    }

    Observer 中的三个方法的作用:

    • next - 每当 Observable 发送新值的时候,next 方法会被调用
    • error - 当 Observable 内发生错误时,error 方法就会被调用
    • complete - 当 Observable 数据终止后,complete 方法会被调用。在调用 complete 方法之后,next 方法就不会再次被调用

    接下来我们来看个具体示例:

    var observable = Rx.Observable
        .create(function(observer) {
                observer.next('Semlinker');
                observer.next('Lolo');
                observer.complete();
                observer.next('not work');
        });
        
    // 创建一个观察者
    var observer = {
        next: function(value) {
            console.log(value);
        },
        error: function(error) {
            console.log(error);
        },
        complete: function() {
            console.log('complete');
        }
    }
    
    // 订阅已创建的observable对象
    observable.subscribe(observer);

    以上代码运行后,控制台的输出结果:

    Semlinker
    Lolo
    complete

    上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 not work

    另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下:

    var observer = {
        next: function(value) {
            console.log(value);
        }
    };

    有时候 Observable 可能是一个无限的序列,例如 click 事件,对于这种场景,complete 方法就永远不会被调用。

    我们也可以在调用 Observable 对象的 subscribe 方法时,依次传入 next、error、complete 三个函数,来创建观察者:

    observable.subscribe(
        value => { console.log(value); },
        error => { console.log('Error: ', error); },
        () => { console.log('complete'); }
    );

    Pull vs Push

    Pull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。

    什么是Pull?

    在 "拉" 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。

    每一个 JavaScript 函数都是一个 "拉" 体系,函数是数据的生产者,调用函数的代码通过 ''拉出" 一个单一的返回值来消费该数据。

    const add = (a, b) => a + b;
    let sum = add(3, 4);

    ES6介绍了 iterator迭代器Generator生成器 — 另一种 "拉" 体系,调用 iterator.next() 的代码是消费者,可从中拉取多个值

    什么是Push?

    在 "推" 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。

    Promise(承诺) 是当今 JS 中最常见的 "推" 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。

    RxJS 引入了 Observables (可观察对象),一个全新的 "推" 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。

    生产者 消费者
    pull拉 被请求的时候产生数据 决定何时请求数据
    push推 按自己的节奏生产数据 对接收的数据进行处理

    接下来我们来看张图,从而加深对上面概念的理解:

    图片描述

    Observable vs Promise

    Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

    MagicQ 单值 多值
    拉取(Pull) 函数 遍历器
    推送(Push) Promise Observable
    • Promise

      • 返回单个值
      • 不可取消的
    • Observable

      • 随着时间的推移发出多个值
      • 可以取消的
      • 支持 map、filter、reduce 等操作符
      • 延迟执行,当订阅的时候才会开始执行

    延迟计算 & 渐进式取值

    延迟计算

    所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。

    var source = Rx.Observable.from([1,2,3,4,5]);
    var example = source.map(x => x + 1);

    上面的示例中,因为 example 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下:

    var source = [1,2,3,4,5];
    var example = source.map(x => x + 1); 

    以上代码运行后,example 中就包含已运算后的值。

    渐进式取值

    数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下:

    var source = [1,2,3,4,5];
    var example = source
                    .filter(x => x % 2 === 0) // [2, 4]
                      .map(x => x + 1) // [3, 5]

    关于数组中的 mapfilter 的详细信息,可以参考 - RxJS Functional Programming

    为了更好地理解数组操作符的运算过程,我们可以参考下图:

    图片描述
    查看原图

    虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下:

    var source = Rx.Observable.from([1,2,3,4,5]);
    var example = source
                  .filter(x => x % 2 === 0)
                  .map(x => x + 1)
    
    example.subscribe(console.log);

    以上代码的执行过程如下:

    • source 发出 1,执行 filter 过滤操作,返回 false,该值被过滤掉
    • source 发出 2,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 3,最后通过 console.log 输出
    • source 发出 3,执行 filter 过滤操作,返回 false,该值被过滤掉
    • source 发出 4,执行 filter 过滤操作,返回 true,该值被保留,接着执行 map 操作,值被处理成 5,最后通过 console.log 输出
    • source 发出 5,执行 filter 过滤操作,返回 false,该值被过滤掉

    为了更好地理解 Observable 操作符的运算过程,我们可以参考下图:

    图片描述
    查看原图

    学习资源

    参考资源


    展开全文
  • Observable 之冷和热

    2019-05-19 03:07:34
    本文讲的是Observable 之冷和热, 简单来说:如果不想重复创建生产者(producer),你需要使用热 Observable 冷:Observable 自行创建生产者 // COLD var cold = new Observable((observer) => { var producer...
    本文讲的是Observable 之冷和热,

    简单来说:如果不想重复创建生产者(producer),你需要使用热 Observable

    冷:Observable 自行创建生产者

    // COLD
    var cold = new Observable((observer) => {
      var producer = new Producer();
      // have observer listen to producer here
    });
    

    热:Observable 使用已存在的生产者

    // HOT
    var producer = new Producer();
    var hot = new Observable((observer) => {
      // have observer listen to producer here
    });
    

    深入解析

    我上篇文章通过自行实现学习 Observable 阐述了 Observable 是种函数。虽旨在揭开 Observable 的神秘外衣,但并没有触及其最令人困惑的部分:“冷”和“热”的概念。

    Observable 只是函数!

    Observable 只是一个将观察者 (Observer) 连接到生产者的函数。意味着,它们并不需要自行创建生产者。只需要让一个观察者订阅生产者的消息,并提供一种取消监听的方式。这种订阅可通过像函数一样“调用” Observable,给它传递一个观察者。

    什么是“生产者”?

    生产者是 Observable 的数据源。可以是一个 websocket 连接、DOM 事件、迭代器或一个遍历某数组的操作。可以是你用来获取并向 observer.next(value) 传递值的任何东西。。

    冷 Observable:在内部创建生产者

    一个“冷”的 Observable 的生产者创建和激活发生在订阅期。就是说若将 observable 比作函数,那么生产者是在“调用函数”时创建和激活的。

    1. 创建生产者
    2. 激活生产者
    3. 开始监听生产者
    4. 单播

    下面例子是“冷”的,因为 WebSocket 连接是在订阅回调“内部”被创建和监听的,而订阅回调函数只有在订阅 Observable 时才会被执行。

    const source = new Observable((observer) => {
      const socket = new WebSocket('ws://someurl');
      socket.addEventListener('message', (e) => observer.next(e));
      return () => socket.close();
    });
    

    上述source的所有订阅者都会有一个自己的 WebSocket,取消订阅时用close()将其关闭。因此该数据源是真正的单播,因为其生产者只向一个观察者发送值。此 JSBin 例子说明了此概念

    热 Observable:在外部创建生产者

    热 Observable 的生产者在订阅回调函数外被创建或激活(备注1)。

    1. 共享一个生产者的引用
    2. 监听生产者
    3. 组播(multicast)(备注2)

    若我们改变一下之前的例子,把 WebSocket 的创建移到 Observable 外,就是个“热” Observable:

    const socket = new WebSocket('ws://someurl');
    
    const source = new Observable((observer) => {
      socket.addEventListener('message', (e) => observer.next(e));
    });
    

    source的所有订阅者共享一个 WebSocket 实例,该 socket 的消息会组播给所有订阅者。但这引入一个小问题:我们没法用 observable 承载销毁该 socket 的逻辑。无论出错、完成,还是取消订阅,都不会关闭该连接。我们做的只是把“冷” Observable 变“热”此 JSBin 例子说明了此概念

    为什么需要热 Observable?

    在第一个冷 Observable 的例子里你可以看见,一直保有所有的冷 Observable 实例可能会有问题。首先,如果你需要订阅这个 observable 多次,而这个 observable 会创建类似于 WebSocket 这样的,占用如网络连接般稀缺资源的实例,你肯定不希望创建多个连接。而实际上,我们很容易忽略订阅多次的事实。例如当你需要过滤出 socket 消息值的奇/偶数序列,在此场景下你会创建两个订阅:

    source.filter(x => x % 2 === 0)
      .subscribe(x => console.log('even', x));
    
    source.filter(x => x % 2 === 1)
      .subscribe(x => console.log('odd', x));
    

    Rx Subjects

    在我们把 Observable 从冷转热之前,需要介绍一种新类型:Rx Subject,它有以下特性:

    1. 它是一个 Observable, 包含了 Observable 的所有操作方法。
    2. 它是一个 Observer, 通过 duck-typing 实现了一些长得和 Observer 相似的接口。当被像 Observable 订阅时,会发出你使用类似 Observer 的 next 方法传入的值。
    3. 支持组播。通过 subscribe() 传入的所有观察者会被加入一个内部的观察者列表里保存。
    4. 结束状态明确。在取消订阅、完成或出错之后就无法再被使用。
    5. 可以对自己传值。补充下第 2 条,使用 next 对其传值,会触发它的 Observable 相关回调。

    Rx Subject 的名字得于第 3 条特性,“Subject” 在 Gang of Four(译者注:经典《设计模式》的几位作者)的观察者模式中,是实现了 addObserver 方法的类。在我们的例子中,addObserver 就是 subscribe一个展示 Rx Subject 行为的 JSBin 例子

    把 Observable 从冷变热

    有了 Rx Subject 的加持,我们可以用上一点函数式编程让 Observable 从冷转热:

    function makeHot(cold) {
      const subject = new Subject();
      cold.subscribe(subject);
      return new Observable((observer) => subject.subscribe(observer));
    }
    

    makeHot 函数接受一个冷的 Observable cold,创建一个 subject 订阅 cold 的消息,最后该函数返回一个热 Observable, 它的生产者为 subject一个 JSBin 示例

    不过还有一个小问题,我们没有直接订阅数据源,如果想取消订阅,该怎么做呢?可以用引用计数解决:

    function makeHotRefCounted(cold) {
      const subject = new Subject();
      const mainSub = cold.subscribe(subject);
      let refs = 0;
      return new Observable((observer) => {
        refs++;
        let sub = subject.subscribe(observer);
        return () => {
          refs--;
          if (refs === 0) mainSub.unsubscribe();
          sub.unsubscribe();
        };
      });
    }
    

    现在我们有一个热 Observable,且当其所有订阅取消了,用来计数的 refs 变为 0 时,便可以取消对原先冷 Observable 的订阅。一个 JSBin 例子

    在 RxJS 里使用 publish() 或 share()

    你也许不该使用类似于上面 makeHot 这样的函数,而应该使用 publish() 或 share() 这样的函数 Observable 转热的途径,在 Rx 里有高效简洁的方式。为说明使用多种 Rx 操作符(译者注:operator,之后都作此翻译)来做这件事情,能专门写一篇文章,不过这不是本文的目的。真正的目的在于加强对“冷”“热”之分的理解。

    在 RxJS 5 里,share() 操作符创建一个有引用计数的热 Observable,且可以在失败时重试,或在成功时重复执行。因为 Subject 在出错、完成或取消订阅后便不能再被重用,share() 操作符会更新重建已结束的 Subject,从而使得返回的 Observable 能够被再次订阅。

    一个在 RxJS 5 里使用 share() 创建热数据源的 JSBin 例子,也展示了重试的方法

    “温” Observable

    看完如上所述,能知道 Observable 虽然 “只是函数”,却能有冷热之分。它还能监听两个生产者?一个由它创建,一个由它关闭?有点像不良的小伎俩,非其不用的场景并不多。例如多路 socket 数据源,共享一个 socket 连接,但分别有自己的数据订阅和过滤机制。

    冷和热都只和生产者有关

    如果在 Observable 内操作一个共享的生产者,是“热”的。而在 Observable 内部创建生产者,是“冷”的。那假如你二者皆有,是什么?我猜它是“温”的。

    备注

    1. 说生产者在订阅回调内部被“激活”,而不是在之后某合适时机被“创建”,可能有点奇怪,不过通过代理(proxy),的确是可以的。通常“热” Observable 的生产者在订阅回调外部被创建和激活。

    2. 热 Observable 通常是组播的,虽说它也许对应的是一个只支持单个监听回调的生产者。在此处说它是“组播”的,可能不是完全准确。





    原文发布时间为:2017年6月26日

    本文来自云栖社区合作伙伴掘金,了解相关信息可以关注掘金网站。
    展开全文
  • Observable 在开始讲服务之前,我们先来看一下一个新东西——Observable(可观察对象),是属于RxJS库里面的一个对象,可以用来处理异步事件,例如HTTP请求(实际上,在Angular中,所有的HTTP请求返回的都是Obse...

    如果有任何的非技术障碍,比如如何新建Angular项目,请先到我的"Angular7入门辅助教程"专栏参考这篇博客:Angular7入门辅助教程——前篇

    Observable

    在开始讲服务之前,我们先来看一下一个新东西——Observable(可观察对象),是属于RxJS库里面的一个对象,可以用来处理异步事件,例如HTTP请求(实际上,在Angular中,所有的HTTP请求返回的都是Observable),或许,你以前接触过一个叫promise的东西,它们本质上面是相同的:都是生产者主动向消费者“push”产品,而消费者是被动接收的,但是他们两者还是有很大区别的:Observable可以发送任意多值,并且,在被订阅之前,它是不会执行的!这是promise不具备的特点,下面砸门来详细了解一下Observable

    心法篇

    • Observable用于在发送方和接收方之间传输消息,为了更好地理解,你可以将这些消息看成是
    • 在创建Observable对象时,需要传入一个函数作为构造函数的参数,这个函数叫订阅者函数,这个函数也就是生产者向消费者推送消息的地方
    • 在被消费者subscribe(订阅)之前,订阅者函数不会被执行,直到subscribe()函数被调用,该函数返回一个subscription对象,里面有一个unsubscribe()函数,消费者可以随时拒绝消息的接收!
    • subscribe()函数接收一个observer(观察者)对象作为入参
    • 消息的发送可以是同步的,也可以是异步
    • 我们可以使用一系列的RxJS操作符,在这些消息被接收方接收之前,对它们进行一系列的处理、转换,因为这些操作符都是纯函数没有副作用,可以放心使用,并不会产生期望之外的结果!

    详细教程篇

    1、observer(观察者)

    有了可观察对象(发送方),怎么少得了observer(观察者)来观察可观察对象呢!observer是一个对象,其中包含三个属性:next,error,complete,它们都是函数

    • next:以接收的值作为入参,在正常情况下执行,可选
    • error:出错的情况下执行,可选
    • complete:传输完成的情况下执行,可选

    当然,这都是由你自己代码决定的,(请继续往下阅读你就会明白)

    2、初识Observable

    我们先通过RxJS库中的(of)方法来创建一个Observable,而不是通过构造函数来创建(下一小节的内容)

    在vscode中新建项目(文件夹),取名testRxJS,在该目录下新建一个.ts文件,取名teaching1.0.ts,内容如下

    import { of } from "rxjs";
    
    function f1() {
        let observable = of(1, 2, 3);
        observable.subscribe({ next: num => console.log(num) });
    }
    f1();

     切换到vscode的命令行(ctrl+~),cd到本项目所在的文件夹,输入tsc .\teaching1.0.ts,然后输入node .\teaching.1.0.js,可以看到控制台有以下输出

    代码解析

     (注意,如果上面关于怎样运行ts程序,或者vscode操作你不熟悉的话,请先阅读vscode中使用TypeScript,以及vscode一些常用的快捷键,特别是箭头函数,如果你不熟悉的话,后面的内容将无法进行!)

    • RxJS中的of方法用于创建一个Observable,它会将其参数一个一个的发送给接收方,正如这里所看到的,它将1,2,3分别发送
    • subscribe()函数中接受一个observer对象,但这里,只定义了next方法,可以发现next方法接受一个参数,而这个参数就是生产者发送过来的值!然后将其打印在控制台上,(至于为什么这个值就是生产者发送的值,请继续阅读,其实说白了,是由你代码决定的,在下一小节,我们将会使用Observable的构造函数来定义一个Observable,并自己定义订阅者函数

    3、订阅者函数

    上面说了很多这样的句子:请继续阅读,那么,在这一小节,我将揭晓谜底

    同目录下新建一个teaching1.1.ts文件(注意,这篇博客牵涉的所有代码都在该目录(testRxJS)下,后面就不多讲了),内容如下

    function f2() {
        const observable = Observable.create(observer => {
            try {
                observer.next(1);
                observer.next(2);
                observer.next(3);
            } catch (e) {
                observer.error(e);
            }
            observer.complete();
        });
        const observer = {
            next: num => console.log(num),
            error: e => console.log(e),
            complete: () => console.log('complete!!!')
        }
        observable.subscribe(observer);
    }
    f2();

    用上面同样的方式运行,结果如下

    可以发现,这运行效果同用of创建的Observable是一样的,只不过这里我还为observer定义了complete方法,所以它多输出了“complete!!!”。或许你也猜到了,这段代码就是of的内幕!!(注意,肯定不是of的源代码啊!),而且,如果出了错,就会被catch到,从而执行observer的error方法,通过这一小节,是不是对Observable有了更清晰的认识——原来都是有你的代码决定的,没有想象中的那么神秘

    4、subscribe()(订阅)

    在被消费者subscribe(订阅)之前,订阅者函数不会被执行,直到subscribe()函数被调用,该函数返回一个subscription对象,里面有一个unsubscribe()函数,消费者可以随时拒绝消息的接收!也就是说:在Observable调用subscribe函数之前,什么也不会发生,就像下面这段代码,控制台什么输出内容都没有

    function f1() {
        let observable = of(1, 2, 3);
    }
    f1();

    直到你订阅这个observable对象,像下面这样

    function f1() {
        let observable = of(1, 2, 3);
        observable.subscribe({ next: num => console.log(num) });
    }
    f1();

    4、异步发送消息

    其实,可以发现,上面一下节生产者发送消息的方式是同步的!这一小节,我们来个异步发送消息(等待2秒,再向消费者发送数字4),代码如下

    function f2() {
        const observable = Observable.create(observer => {
            try {
                let time = 0;
                observer.next(1);
                observer.next(2);
                observer.next(3);
                const intervalId = setInterval(() => {
                    console.log(`wait ${++time}s`);
                }, 900)
                setTimeout(() => { observer.next(4); clearInterval(intervalId) }, 2000);
            } catch (e) {
                observer.error(e);
            }
            // observer.complete(); // 注意不能立即调用complete函数,不然会终止消息传输
            setTimeout(() => observer.complete(), 3000)
        });
        const observer = {
            next: num => console.log(num),
            error: e => console.log(e),
            complete: () => console.log('complete!!!')
        }
        observable.subscribe(observer);
    }
    f2();

    运行结果如下

    代码解析

    这里的代码可能有点复杂,但也是有基本元素组成:定时器箭头函数,只要了解这两个知识点,这段代码应该就没有难度,但需要注意下面几点

    • complete不能立即调用,因为数字4的传输实在2秒后,如果你立即调用complete函数,就会中断传输,数字4也就接收不到了
    • 注意到setInterval中的时间间隔并不是1000ms,我把它写成了900ms,可能是一些微小的时间延迟的原因,写成1000ms并不会产生期望的结果
    • 最后产生的结果就是:数字4被异步发送了!!!

    5、初识unsubscribe()函数

    在这一小节,我们直接调用unsubscribe()函数,只为看看效果,下一小节,我们将自己写一个subscribe函数,并返回一个包含unsubscribe函数的对象,用于中断信息传输

    function f3() {
        const obs = Observable.create(observer => {
            observer.next(1);
            setTimeout(() => observer.next(2), 2000); // 等待两秒才发送下一个值
        });
    
        let suber = obs.subscribe({
            next: x => console.log("接收到:", x),
        });
    
        setTimeout(() => suber.unsubscribe(), 1000); // 在一秒后取消订阅
    }
    f3();

    运行结果如下

    可以发现,接收方只接收到了数字1,

    代码解析

    • 因为数字2 在两秒钟后才会被发送,而消费者在1秒钟后便中断了消息传输,所以,控制台上面只打印了数字1

    6、自定义subscribe()

    看到这里,我想你应该也有些想法了,前面说过,Observable的subscribe函数返回一个subscription对象,其中有一个unsubscribe函数,用于取消订阅,你应该对subscribe函数的内部实现有着很多想法,现在,我们试着自己写一个!(在这里,我们就不需要Observable对象了,我们使用函数调用来模拟订阅),代码如下

    function f4() {
        function subscribe(observer) {
            var intervalID = setInterval(() => {
                observer.next('launch.....');
            }, 1000);
    
            return {
                unsubscribe: () => clearInterval(intervalID)
            }
        }
    
        var subscription = subscribe({ next: (x) => console.log(x) });
        setTimeout(() => subscription.unsubscribe(), 5000);
    }
    f4();

    运行结果如下

    代码解析

    • 这段代码的意思是:每隔1秒向控制台打印“launch....”,在第5秒的时候,取消打印(模拟取消订阅的过程)
    • 可以发现,这个subscribe函数返回了一个对象!该对象中有一个unsubscribe函数!
    • 通过这段代码,我们就模拟了Observable的subscribe函数的内部实现(仅仅是模拟而已)

    7、使用RxJS中的操作符

    心法篇的时候,我就讲过这些操作符的重要作用以及优点,实际上,在开发实际应用的时候,你离不开它,在这里,我只举几个简单的例子,大概了解一下他们的用法就好

    • 使用单个操作符

    在这里,以map这个操作符为例,它可以接收可观察对象发送的值,并将其转换另外的形式,并以一个可观察对象发送这些新值,现在,我们来写一段有趣的代码:将原来的可观察对象发送的值全部转换为“hello world”,并订阅这个操作符返回的新的可观察对象,并输出值,代码如下

    function f5() {
        const observable = of(1, 2, 3);
        const opt = map(num => 'hello world');
        const newObservable = opt(observable);
        newObservable.subscribe(data => console.log(data));
    }
    f5();

    运行结果如下

    可以发现map将1,2,3转换成了三个hello world并输出在控制台上,你也可以发现,这种方式代码结构并不是很清晰(特别是在有多个操作符的情况下),实际上,并不推荐这种写法,下面我们来看看另外一种更好的写法

    • 使用多个操作符

    我们可以使用管道(pipe),将多个操作符链接起来,并将操作符返回的结果组合成一个,这样,代码结构更清晰,在这里,我将把map、tap这两个操作符链接起来,使得,你既可以看到原可观察对象发送的值,你也可以看到转换后的结果

    tip:tap操作符,可以理解为窥探,就是“不打扰”可观察对象发送的值,但是又可以取得这些值进行处理,常见的用法就是——调试功能

    function f6() {
        const observable = of(1, 2, 3);
        const newObservable = observable.pipe(
            tap(num => console.log(num)),
            map(num => 'hello world')
        );
        newObservable.subscribe(data => console.log(data));
    }
    f6();

    运行结果如下

    这些写代码是不是很清晰呀!

    问题篇

    请问你是否可以猜到利用管道使用多个操作符的内幕?或者说,你可以自己写一个pipe函数,来实现操作符的功能吗?

    更新中。。。

     

    展开全文
  • Observable入门

    千次阅读 2018-05-24 09:35:16
    1. 什么是Observable和 Observer?1.1 ObservableObservable是能向观察它的人(或外界)在一定时间段内推送值的实体。Observable实体可能会一直推送下去,也可能在某时间点结束,也可能在某时间点抛出error故...

    1.  什么是Observable和 Observer?

    1.1 Observable

    Observable是能向观察它的人(或外界)在一定时间段内推送值的实体。

    Observable实体可能会一直推送下去,也可能在某时间点结束,也可能在某时间点抛出error

    故Observable可能的行为为:

    向observer推送值,告诉observer结束,告诉observer有异常(error)

     

    例子:

    Observale 实体:

    第1步向观察它的实体(observer)推送值1

    第2步向观察它的实体(observer)推送值2

    第3步向观察它的实体(observer)推送值3

    然后 等待10分钟

    第4步向observer推送值4

    第5步告诉observer结束了

     

    实现为编程语言为:

    observer.next(1);

    observer.next(2);

    observer.next(3);

    setTimeout(()=>{

               observer.next(4);

               observer.complete();

    }, 1000*60*10);

     

    1.2 Observer

    是观察Observable的行为后,对其行为进行响应的实体。

    做出的响应有三类:

    对于推送值的响应

    对于结束的响应

    对于observable抛出的error的响应

     

    2.  http请求的Observable的例子

    谁是observable?

    HTTP的response是一个observable:它对观察它的实体在timeout时间内

    推送值或抛出异常

     

    谁是observer?

    就是得到response后,做处理。

    对推送值做处理 或者是 对error做处理

     

    3.  如何生成Observable

    3.1 自己把observable的动作序列编程实现,比如第一个例子

    3.2 已有的返回observable的服务

    比如http的get

     

    3.3 rxjs库里,提供构造简单的observable实体的方法。

    一个简单的observable是一个接一个的推送值,供推送n次

    Rxjs提供的方法有:

    Observable.of(x1,x2,…,xn);

    或者

    Observable.from([x1,x2,…,xn]);

     

    4.  Observable的算符

    【Old Observable】-- operator –> 【New Observable】

     

    Map filter reduce pipe

    4.1 map最简单

    Observablex.map(x => 2*x) 则之前推送为10时,经过算符,推送20

    4.2 filter也不难

    Observablex.filter(x => x%2 === 0) 则推送是偶数时,接着推送;推送是奇数时,则不推送

     

    4.3 todo


    5.  理解Angular中对参数的操作

    ngOnInit(){

    this.hero$ =this.route.paramMap.pipe( switchMap((params:ParamMap)=>this.service.getHero(params.get('id'))));

    }

    注意:ParamMap类型不是Observable; this.route.paramMap返回的是Observable<ParamMap>

    根本目的是 想做一个map,使得输入的每一个 params得到对应的 Hero。

    即 params:ParamMap => Hero

    但是,因为this.service.getHero 返回的是 Observable<Hero>

    所以 需要把返回的flat化。

    所以 当 函数的输入输出类型是  ClassA => ClassB 时,使用map

    当 函数的输入输出类型是 ClassA=> Observable<ClassB>时,使用 switchMap


    展开全文
  • 2019独角兽企业重金招聘Python工程师标准>>> ...
  • Observable

    2019-06-11 19:06:37
    ②订阅 ObservableObservable 事件监听 ④Observable 的Dispose ①可被监听的序列-概念 ❶Observable理解 Observable<T> 类用于描述元素异步产生的序列,既可观察序列,是 Rx 框架的基础 作用是:异步...
  • 什么是Observable 简单讲就是一个可观察对象,创建可观察对象后,就可以订阅subscribe这个对象,当可观察对象发送消息时,即可收到消息,然后去做一些事情.典型的观察者模式. 怎么实现 定义Observable类 class ...
  • Observable subscribe流程介绍

    千次阅读 2019-06-24 11:27:28
    转载请以链接形式标明出处: ...我们直接看Observable的subscribe方法 public final Disposable subscribe() { return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Funct...
  • observable()方法

    千次阅读 2017-09-28 17:18:58
    observable()方法,用于设置监控属性,这样就可以监控viewModule中的属性值的变化,从而就可以动态的改变某个元素中的值 viewModule对象的形式有两种,一种可以是var obj={};的形式,另一种是通过new 实例化的对象,...
  • RxJava----操作符:组合操作符

    千次阅读 2016-06-16 18:04:56
    本节介绍如何把多个数据源的数据组合为一个数据源的操作函数。 Concatenation 一个数据流发射完后继续发射下一个数据流是一种很常见的组合方法。...public static final <T> Observable<T> concat
  • Observable的创建

    千次阅读 2018-03-28 17:29:24
    Observable的创建1.使用create( ),最基本的创建方式:normalObservable = Observable.create(new Observable.OnSubscribe&lt;String&gt;() { @Override public void call(Subscriber&lt;? super ...
  • Observable.interval定时执行

    千次阅读 2019-10-22 17:37:03
    Observable.interval方法可以定时执行自定义的逻辑,常用方法如下: /** * 定时执行 * * @param period 每次执行的间隔的时间 * @param unit 时间单位 * @return Observable对象 */ public static ...
  • Android RxJava操作符一览

    千次阅读 2018-12-18 07:55:51
    直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤...
  • Android RxJava使用介绍(四) RxJava的操作符

    万次阅读 多人点赞 2015-07-18 23:56:02
    本篇文章继续介绍以下类型的操作符 Combining Observables(Observable的组合...Combining Observables(Observable的组合操作符)combineLatest操作符combineLatest操作符把两个Observable产生的结果进行合并,合并的结果
  • 以下的创建方法不需要手动再重新调用Subscriber中的方法 一、just 创建发送指定值的Observerble,just只是简单的原样发射,将数组或Iterable当做... Observable myObservable = Observable.just("just1","just1",
  • Rxjs的zip和switchMap方法

    千次阅读 2018-07-11 09:42:40
    1,Zip将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有...
  • RxJava操作符大全

    万次阅读 多人点赞 2016-09-13 16:55:17
    再也不用担心记不住RxJava中的操作符了,RxJava操作符大全,需要用时Ctrl+F一下,还有使用例子哦。
  • RxJava 合并组合两个(或多个)Observable数据源

    万次阅读 多人点赞 2017-02-07 12:48:05
    关键词:合并 Observable前言在RxJava中, 同时处理多个Observables是很常见的一种操作。下面我们简单分析下几个组合类的操作符。Merge在异步的世界经常会创建这样的场景,我们有多个来源但是只想有一个结果:多输入...
  • RxJava操作符(08-条件和布尔操作)

    万次阅读 2019-08-12 18:09:24
    转载请标明出处: ...本文出自:【openXu的博客】目录:All Amb Contains DefaultIfEmpty SequenceEqual SkipUntil SkipWhile TakeUntil TakeWhile 源码下载1. All  判定是否Observable发射的所有
  • Angular2 之 Promise vs Observable

    千次阅读 2017-05-27 20:34:35
    如果看成状态机 Promise 具有 3 个状态:pending、resolved、rejected...Observable 有 N + 3 个状态:idle、pending、resolved_0、resolved_1 … resolved_N、completed 和 error。总结:相比于 Promise 这个有限状态
1 2 3 4 5 ... 20
收藏数 42,055
精华内容 16,822
关键字:

observable