精华内容
下载资源
问答
  • 这个 axios-observable 的 API 和 axios 的 API 几乎一样,让你平滑过渡。 因此文档反映了 axios 之一(将指出一些例外情况)。 特征 从浏览器发出 从 node.js 发出请求 支持Observable API 拦截请求和响应 转换...
  • zen-observable-ts 和瘦包装器,支持 ESM 导出和 CommonJS 导出,TypeScript 类型由@types/zen-observable 。 用法 使用或等工具安装zen-observable-ts ,您可以按名称导入Observable类: import { Observable } ...
  • npm install --save react-reducer-observable 用法 像redux-observable一样,该库围绕史诗的思想。 您可以阅读更多有关史诗的构建方式及其核心概念的。 要创建您的上下文,请传入您的化简器,史诗和依赖项。 const...
  • npm install --save redux-observable UMD 我们在npm软件包中发布了一个UMD版本。 您可以通过 CDN使用它: 观看介绍 文献资料 讨论 欢迎大家进入我们的! 自定义表情符号 保存此: 将可观看redux的旋转徽标添加到...
  • Observable Store是一个前端状态管理库,它提供了一种简单而强大的方法来管理前端应用程序中的状态。 前端状态管理已经变得如此复杂,以至于我们许多人花费在开发状态管理代码上的时间要多于其他应用程序。 ...
  • 用于RxJS Observable的并发模式安全React钩子。 简单,灵活,可测试且性能卓越。 React和RxJS的无缝集成。 并发模式安全。 可观察对象的道具,状态和上下文。 可观察到的状态和道具事件。 使用React组件流进行...
  • Eventful是Ruby的Observable模块之上的一个小扩展,它实现了命名事件,块侦听器和事件冒泡。 它提供比Observable通常允许的更加灵活的事件处理行为,后者通常要求侦听器是实现update对象,并且没有提供基于事件类型...
  • observable-jekyll:教程:如何将ObservableHQ笔记本嵌入到CMS中
  • 大家都是在Reactive Extensions for Javascript诞生于几年前,随着angular2正式版的发布,它将会被更多开发者所认知。这篇文章我们来详细介绍RxJs提供的Observable对象,有需要的朋友们可以参考借鉴。
  • Observable Slim最初是 JS框架的一部分,在该框架中它辅助状态管理,状态突变触发器和单向数据绑定。 可观察到的Slim与Nimbly分离开来,以便为框架范围之外的其他用例提供服务。 安装 < script src =" ...
  • vue-stator Vue.observable()在Vue.observable()使用Vue.observable() ,这意味着分配和数组更改将自动触发更新。 因此, vue-stator只能让您定义action ,而不是动作和突变。 为了使从Vuex的过渡变得容易,它为操作...
  • var observable = require ( 'observable' ) , obj = observable ( 1 ) , id ; console . log ( obj ( ) ) ; // 1 obj ( 2 ) ; console . log ( obj ( ) ) ; // 2 obj . sub ( function ( evt , val ) { console...
  • 主要介绍了angular 用Observable实现异步调用的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • $ npm install observable-to-promise 用法 const observableToPromise = require ( 'observable-to-promise' ) ; ( async ( ) => { const promise = observableToPromise ( Observable . of ( 1 , 2 ) ) ; ...
  • 该项目是通过引导的。 redux-observable-demo Github用户访存:
  • var Observable = require ( "../" ) ; var assert = require ( 'assert' ) ; var sinon = require ( 'sinon' ) ; describe ( "Observable" , function ( ) { var obsv , listener ; var clock ; beforeEach ( ...
  • import observable from "proxy-observable" ; const soldier = { name : "Titus Pullo" , age : 36 , inventory : observable ( { sword : "Dagger" , coins : 0 } ) , friends : observable ( [ "Gaius ...
  • Observable

    2018-11-11 10:57:13
    ②订阅 ObservableObservable 事件监听 ④Observable 的Dispose ①可被监听的序列-概念 ❶Observable理解 Observable<T> 类用于描述元素异步产生的序列,既可观察序列,是 Rx 框架的基础 作用是:异步...

    ①可被监听的序列-概念

    ❶Observable理解

    • Observable<T> 类用于描述元素异步产生的序列,既可观察序列,是 Rx 框架的基础

    • 作用是:异步地产生一系列的 Event(事件),即一个 Observable<T>对象会随着时间推移不定期地发出 event(element : T)

    • Event 还可以携带数据,它的泛型<T> 就是用来指定这个 Event 携带的数据的类型

    • 每一个Observable的实例都是一个序列

    • Observable序列相比于Swift序列的关键优势点在于它能够异步地接收元素

    • 有可观察序列,还需要一个 Observer(订阅者)来订阅它,这个订阅者才能收到 Observable<T>不时发出的 Event

    Observable(ObservableType)等效于Sequence
    observableType.subscribe(_:)方法等效于Sequence.makeIterator()
    ObservableType.subscribe(_:)接收一个观察者ObserverType参数,它将被订阅自动接收由可观察到的序列事件和元素,而不是在返回的生成器上手动调用next()
    • 如果一个Observable发出一个next事件(Event.next(Element)),它还能够继续发出更多的事件
    • 如果一个Observable发出一个error事件(Event.error(ErrorType))或者一个completed事件(Event.completed),那么这个Observable序列就不能给订阅者发送其他的事件了

    ❷创建序列

    框架已经创建好了许多常用的序列。例如:button的点击,textField的当前文本,switch的开关状态,slider的当前数值等等

    一些自定义的序列是需要自己创建的

    • 创建序列最直接的方法就是调用 Observable.create

    创建一个 [0, 1, ... 4] 的序列

    • 调用 Observable.create,在构建函数里面描述元素的产生过程

    • observer.onNext(0)就代表产生了一个元素,他的值是 0

    • 又产生 4个元素分别是 1, 2, ... 4

    • 最后,用 observer.onCompleted() 表示元素已经全部产生,没有更多元素了

    let numbers: Observable<Int> = Observable.create { observer -> Disposable in
          observer.onNext(0)  
          observer.onNext(1)
          observer.onNext(2)
          observer.onNext(3)
          observer.onNext(4)
          observer.onCompleted()
         return Disposeable.create()
      }
    复制代码

    可以用这种方式来封装功能组件,例如:闭包回调

    • 在闭包回调中,如果任务失败,就调用 observer.onError(error!)
    • 如果获取到目标元素,就调用 observer.onNext(jsonObject)
    • 由于这个序列只有一个元素,所以在成功获取到元素后,就直接调用 observer.onCompleted() 来表示任务结束
    • 最后 Disposables.create { task.cancel() } 说明如果数据绑定被清除(订阅被取消)的话,就取消网络请求。
    typealias JSON = Any
    let json: Observable<JSON> = Observable.create { (observer) -> Disposable in
        let task = URLSession.shared.dataTask(with: ...) { data, _, error in
            guard error == nil else {
                observer.onError(error!)
                return
            }
            guard let data = data,
                let jsonObject = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves)
                else {
                observer.onError(DataError.cantParseJSON)
                return
            }
            observer.onNext(jsonObject)
            observer.onCompleted()
        }
        task.resume()
        return Disposables.create { task.cancel() }
    }
    复制代码

    以上就将传统的闭包回调转换成序列。然后用 subscribe 方法来响应这个请求的结果

    json.subscribe(
        onNext: { json in
            print("取得 json 成功: \(json)")}, 
        onError: { error in
            print("取得 json 失败 Error: \(error.localizedDescription)")},
        onCompleted: {
            print("取得 json 任务成功完成")}
        )
        .disposed(by: disposeBag)
    复制代码

    ❸Event

    事件 Event 的定义

    • Event 就是一个枚举,一个 Observable 是可以发出 3 种不同类型的 Event 事件
    public enum Event<Element>{
        case next(Element)
        case error(Swift.Error)
        case completed
    }
    复制代码

    next

    • next 事件就是那个可以携带数据 的事件,可以说它就是一个“最正常”的事件
    1--2--3--->
    复制代码

    error

    • error 事件表示一个错误,它可以携带具体的错误内容,一旦 Observable 发出了 error event,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了
    -------1-----2------ x  
    复制代码

    completed

    • completed:completed 事件表示 Observable 发出的事件正常地结束了,跟 error 一样,一旦 Observable 发出了 completed event,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了
    ------1 ------2--------3-------|
    复制代码

    ❹Observable 与 Sequence比较

    • 可以把每一个 Observable 的实例想象成于一个 Swift 中的 Sequence

    • 即一个 Observable(ObservableType)相当于一个序列 Sequence(SequenceType)

    • ObservableType.subscribe(_:) 方法其实就相当于 SequenceType.generate()

    区别

    • Swift 中的 SequenceType 是同步的循环,而 Observable 是异步的
    • Observable 对象会在有任何 Event 时候,自动将 Event 作为一个参数通过 ObservableType.subscribe(_:) 发出,并不需要使用 next 方法

    ②订阅 Observable

    • 有了Observable,还要用 subscribe() 方法来订阅它,接收它发出的 Event

    订阅方法1

    ❶使用 subscribe() 订阅一个 Observable 对象

    • 该方法的 block 的回调参数就是被发出的 event 事件,将其直接打印出来
    let observable = Observable.of("A","B","C")
    observable.subscribe { event in
                         print(event)}
    //print
    next(A)
    next(B)
    next(C)
    completed
    复制代码

    ❷如果获取到事件里的数据,通过 event.element

    let observable = Observable.of("A","B","C")
    observable.subscribe{ event in
                        print(eventt.element)}????????????
    //print
    Optional("A")
    Optional("B")
    Optional("C")	q
    nil
    复制代码

    订阅方法2

    RxSwift还提供另一个subscribe 方法,它可以把 event 进行分类

    • 通过不同的 block 回调处理不同类型的 event(其中 onDisposed 表示订阅行为被 dispose 后的回调)
    • 同时会把 event 携带的数据直接解包出来作为参数,方便我们使用
    let observable = Observable("A","B","C")
    observable.subscribe(
        onNext:{ element in
                  print(element) },
        onError:{error in
                print(error)},
        onCompleted:{
            print("completed")},
        onDisposed:{
            print("disposed")
        }
    )
    //print
    A
    B
    C
    completed
    disposed
    复制代码
    • subscribe() 方法的 onNext、onError、onCompleted 和 onDisposed 这四个回调 block 参数都是有默认值的,即它们都是可选的。我们可以只处理 onNext 而不管其他的情况
    let observable = Observable.of("A","B","C")
    observable.subscribe(
       onNext:{element in
              print(element)}
    )
    //print
    A
    B
    C
    复制代码

    ③Observable 事件监听

    监听事件的生命周期

    doOn 介绍

    使用 doOn 方法来监听事件的生命周期,它会在每一次事件发送前被调用

    同时它和 subscribe 一样,可以通过不同的 block 回调处理不同类型的 event

    • do(onNext:) 方法就是在 subscribe(onNext:) 前调用
    • 而 do(onCompleted:) 方法则会在 subscribe(onCompleted:) 前面调用

    栗子

    let observable = Observable.of("A","B","C")
    observable.do(
                onNext: { element in 
                        print("Intercepted Next:", element)},
                onError: {error in
                           print("Intercepted Error:", error)},
                onCompleted: {
                     print("Intercepted Completed")
                },
                onDispose: {
                    print("Intercepted Disposed")
                }
                )
               .subscribe(
               onNext: {element in
                       print(element)},
                   onError: {error in
                            print(error)},
                   onCompleted:{
                       print("completed")},
                   onDisposed:{
                       print("disposed")
                   }
               )
    复制代码

    ④Observable 的Dispose

    ❶Observable 从创建到终结流程

    一个 Observable 序列被创建出来后它不会马上就开始被激活从而发出 Event,而是要等到它被某个人订阅了才会激活它 而Observable序列激活之后要一直等到它发出了 .error 或者 .completedevent后,它才被终结

    ❷dispose() 方法

    除了 dispose() 方法之外,更经常用 DisposeBag 的对象来管理多个订阅行为的销毁

    • 以把一个DisposeBag对象看成一个垃圾袋,把用过的订阅行为都放进去,感觉有点像OC释放池
    • 而这个DisposeBag就会在自己快要dealloc的时,对它里面的所有订阅行为都调用 dispose() 方法
    let disposeBag = DisposeBag()
    //第1个Observable,和订阅
    let observable1 = Observable.of("A","B","C")
    observable1.subscribe{ event in
                         print(event)}
               .disposed(by: disposeBag)
    //第2个Observable,和订阅
    let observable2 = Obserable.of(1,2,3)
    observable2.subscribe{event in
                         print(event)}
               .disposed(by: disposeBag)
    复制代码

    转载于:https://juejin.im/post/5be80ac4e51d457844615d24

    展开全文
  • 就会遇到多组件状态共享的情况, Vuex当然可以解决这类问题,不过就像 Vuex官方文档所说的,如果应用不够大,为避免代码繁琐冗余,最好不要使用它,今天我们介绍的是 vue.js 2.6 新增加的 Observable API ,通过使用...
  • 它是的子类型,这意味着它可以像observable-store一样使用,但也增加了localStorage持久性。 安装 npm install local-observable-store 如何使用 要求并初始化local-observable-store: var LocalStore = require ...
  • 这个例子也可以用 Observable.zip() 解决,我们在前面的例子中使用过。 然而,.groupBy 允许我们创建一系列带键的匿名 Observables,它们可以被平面映射和一起订阅,因此代码更清晰。 当您想要混合不同的 API 源时...
  • React可观察 使用React和Rxjs Redux学习项目 简单的审查和铲除报告,以供工作期间参考 [step.3]更改为Rxjs6后,可观察的运球处理整体上更改为Pipe。
  • 主要给大家介绍了关于RxSwift学习教程之Observable的相关资料,文中详细的给大家介绍了关于新建Observable、订阅Observable和取消订阅并消除内存泄漏等相关的内容,需要的朋友可以参考借鉴,下面来一起看看吧。
  • React可观察 类型 ReactObservableComponent = (props: 混合) => Observable
  • 重新定义 Observable Observable 的观点转变 这篇文章是对 Observable 是什么的观点的转变。 它讨论了 Observable 的当前模型并剖析了它的属性,以期定义它的核心是什么。 Observable 被为一个函数,它将值传播给...
  • ReactRedux-Observable 尝试在上更干净地实现该示例。 该项目是通过。
  • Observable ###基础 概述 一个观察者订阅(Observer)一个可观察对象(Observable)。观察者对观察对象发射的数据或数据序列做出响应 普通方法调用流程 调用某方法 用变量保存方法保存结果 使用变量新值继续操作 ...

    Observable

    基础

    概述

    一个观察者订阅(Observer)一个可观察对象(Observable)。观察者对观察对象发射的数据或数据序列做出响应

    普通方法调用流程

    1. 调用某方法
    2. 用变量保存方法保存结果
    3. 使用变量新值继续操作
    • 代码表示为
    // make the call, assign its return value to `returnVal`
    returnVal = someMethod(itsParameters);
    // do something useful with returnVal
    

    异步模型流程

    1. 定义方法,方法拿着异步调用的返回值并处理
    2. 将异步调用本身定义为Observable
    3. 观察者通过订阅(Subscribe)操作关联到Observable
    4. 继续业务逻辑处理
    • 代码表示为
      // defines, but does not invoke, the Subscriber's onNext handler
      // (in this example, the observer is very simple and has only an onNext handler)
      def myOnNext = { it -> do something useful with it };
      // defines, but does not invoke, the Observable
      def myObservable = someObservable(itsParameters);
      // subscribes the Subscriber to the Observable, and invokes the Observable
      myObservable.subscribe(myOnNext);
      // go on about my business
    

    回调方法

    • onNext(T item)
      • Observable调用这个方法发射数据,参数为Observable发射的数据,可以被调用多次
    • onError(Exception ex)
    • 当Observable遇到错误或者无返回期望值的数据时调用,会终止Observable,后续不会调用onNext和onComplete,参数为抛出异常
    • onComplete
    • 正常终止,如果没有遇到错误,Observable会在最后一次调用onNext之后调用此方法
    • 例子
     def myOnNext     = { item -> /* do something useful with item */ };
     def myError      = { throwable -> /* react sensibly to a failed call */ };
     def myComplete   = { /* clean up after the final response */ };
     def myObservable = someMethod(itsParameters);
     myObservable.subscribe(myOnNext, myError, myComplete);
     // go on about my business
    

    取消订阅

    特殊观察者接口Subscriber,取消订阅方法unsubscribe方法

    操作符列表

    • 创建操作
    • Create、Defer、Empty/Never/Throw、From、Interval、Just、Range、Repeat、Start、Timer
    • 变换操作
    • Buffer、FlatMap、GroupBy、Map、Scan、Window
    • 过滤操作
    • Debounce、Distinct、ElementAt、Filter、First、IgnoreElements、Last、Sample、Skip、SkipLast、Take、TakeLast
    • 组合操作
    • And/Then/When、CombineLatest、Join、Merge、StartWith、Switch、Zip
    • 错误处理
    • Catch、Retry
    • 辅助操作
    • Delay、Do、Materialize/Dematerialize、ObserveOn、Serialize、Subscribe、SubscribeOn、TimeInterval、TimeOut、Timestamp、Using
    • 条件和布尔操作
    • All、Amb、Contains、DefaultIfEmpty、SequenceEqual、SkipUntil、SkipWhile、TakeUntil、TakeWhile
    • 算数和集合操作
    • Average、Concat、Count、Max、Min、Reduce、Sum
    • 转换操作
    • To
    • 连接操作
    • Connect、Publish、RefCount、Replay
    • 反压操作
    • 用于增加特殊的流程控制策略的操作符

    操作符

    创建操作

    Create

    使用一个函数从头开始创建一个Observable

    • 示例
    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
     } ).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
    Defer

    直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的

    • switchCase
      • 有条件的创建并返回一个可能的Observables集合中的一个
    Empty/Never/Throw
    • Empty
      • 创建一个不发射任何数据但是正常终止的Observable,实现为empty
    • Never
      • 创建一个不发射数据也不终止的Observable,实现为never
    • Throw
      • 创建一个不发射数据以一个错误终止的Observable,实现为error
    From

    将其他种类的对象和数据类型转换为Observable,包括Future、Iterable和数组

    • 示例
    Integer[] items = { 0, 1, 2, 3, 4, 5 };
    Observable myObservable = Observable.from(items);
    
    myObservable.subscribe(
        new Action1<Integer>() {
            @Override
            public void call(Integer item) {
                System.out.println(item);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable error) {
                System.out.println("Error encountered: " + error.getMessage());
            }
        },
        new Action0() {
            @Override
            public void call() {
                System.out.println("Sequence complete");
            }
        }
    );
    
    • RxJavaAsyncUtil
    • runAsync2
    • decode
    Interva

    创建一个按固定时间间隔发射整数序列的Observable

    just

    创建一个发射指定值的Observable

    • 示例
    Observable.just(1, 2, 3)
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
    Range

    创建一个发射特定整数序列的Observable

    Repeat

    创建一个发射特定数据重复多次的Obserable

    • repeatWhen
      • 有条件的重新订阅和发射原来的Observable
    • doWhile
      • 在原始序列的每次重复后检查某个条件,如果满足条件才重复发射
    • whileDo
      • 在原始序列的每次重复前检查某个条件,如果满足条件才重复发射
    Start

    返回一个Observable,它发射一个类似于函数声明的值

    • 从运算结果中获取值的方法
    • functions、futures、actions、callables、runnables
    • toAsync
    • 对于函数(functions),这个操作符调用这个函数获取一个值,然后返回一个会发射这个值给后续观察者的Observable(和start一样)。对于动作(Action),过程类似,但是没有返回值,在这种情况下,这个操作符在终止前会发射一个null值。 注意:这个函数或动作只会被执行一次,即使多个观察者订阅这个返回的Observable。
    • startFuture
    • 参数为返回Future的函数,startFuture会立即调用这个函数返回Future对象,然后调用Future的get()方法尝试获取它的值。返回一个发送这个值给后续观察者的Observable
    • deferFuture
    • 与startFuture不同的是当有观察者订阅它返回的Observable时,才会立即调用Future的get()方法
    • fromFuture
    • 参数为action,返回Observable,一旦Action终止,发射传递给fromAction的数据
    • fromCallable
    • 参数为Callable,返回发射这个Callable的结果的Obserable
    • fromRunable
    • 参数为Runable,返回发射这个Runable的数据
    • forEachFuture
    • 返回一个Future并且在get()方法处阻塞,直到原始Observable执行完毕,然后返回,完成还是错误依赖于原始Observable是完成还是错误
    Timer

    创建一个Observable,在一个给定的延迟后发射一个特殊的值

    变换操作

    Buffer

    定期收集Observable的数据放进一个数据包裹,然后发送这些数据包裹,而不是一次发射一个值

    • 注意
      • 如果原来的Observable发射了一个onError通知,Buffer会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据
    • buffer(count)
      • 以列表形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据(最后发射的列表数据可能少于count项)
    • buffer(count,skip)
      • 从原始Observable第一项数据开始建立缓存,每当收到skip项数据,用count项数据填充缓存
    • buffer(bufferClosingSelector)
      • buffer(bufferClosingSelector)开始将数据收集到一个List,然后它调用bufferClosingSelector生成第二个Observable,当第二个Observable发射一个TClosing时,buffer发射当前的List,然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。它会一直这样做直到原来的Observable执行完成。
    • buffer(boundary)
      • 监视一个名叫boundary的Observable,每当这个Observable发射了一个值,它就创建一个新的list开始收集来自原始Observable的数据并发送原来的List
    • buffer(bufferOpenings,bufferClosingSelector)
      • 监视叫bufferOpenings的Observable(它发射BufferOpenings对象),每当bufferOpenings发射了一个数据时,它就创建一个新的List开始收集原始Observable的数据,并将bufferOpenings传递给closingSelector函数。这个函数返回一个Observable。buffer监视这个Observable,当它检测到一个来自这个Observable的数据时,就关闭List并且发射它自己的数据(之前的那个List)。
    • buffer(timespan, unit)
      • 定期以List的形式发射新的数据,每个时间段,收集来自原始Observable的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的Observale之后开始)
    • buffer(timespan, unit, count)
      • 每当收到来自原始Observable的count项数据,或者每过了一段指定的时间后,buffer(timespan, unit, count)就以List的形式发射这期间的数据,即使数据项少于count项
    • buffer(timespan, timeshift, unit)
      • 在每一个timeshift时期内都创建一个新的List,然后用原始Observable发射的每一项数据填充这个列表(在把这个List当做自己的数据发射前,从创建时开始,直到过了timespan这么长的时间)。如果timespan长于timeshift,它发射的数据包将会重叠,因此可能包含重复的数据项。
    • buffer-backpressure
      • 使用Buffer操作符实现反压backpressure(意思是,处理这样一个Observable:它产生数据的速度可能比它的观察者消费数据的速度快)。
      • Buffer操作符可以将大量的数据序列缩减为较少的数据缓存序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据缓存。这相当于一个缓冲区。
        • 示例代码
          // noinspection JSAnnotator
           Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
        
    FlatMap

    将一个发射数据的Observable变换为多个Observables,然后将发射的数据合并后放进一个单独的Observables

    • FlatMap对Observables发射的数据做的是合并操作,有可能是会交错的
    • FlatMapIterable
      • 成对的打包数据,然后生成Iterable而不是原始数据和生成的Observables,处理方式相同
    • concatMap
      • 按次序连接而不是合并生成的Observables,然后产生自己的数据序列
    • switchMap
      • 当原始Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视产生执之前的那个数据的Observable,只监视当前这一个
    • spit
      • 将一个发射字符串的Observable转换为另一个发射字符串的Observable,只不过,后者将原始的数据序列当做一个数据流,使用一个正则表达式边界分割它们,然后合并发射分割的结果。
    GroupBy

    将一个Observable分拆成一些Observables集合,他们中的每一个发射原始Observable的一个子序列

    • groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。
    Map

    对Observable发射的每一项数据应用一个函数,执行变换操作

    • cast
      • 将原始Observable发射的每一项数据都强制转换为一个指定的类型,然后发射数据。
    • encode
      • 将一个发射字符串的Observable变换为一个发射字节数组的Observable
    • byLine
      • 将一个发射字符串的Observable变换为一个按行发射来自原始Observable的字符串的Observable。
    Scan

    连续的对数据序列的每一项应用一个函数,然后连续发射结果

    • 示例代码
    Observable.just(1, 2, 3, 4, 5)
        .scan(new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer sum, Integer item) {
                return sum + item;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
    Window

    个人不常用,暂无描述

    过滤操作

    DeBounce/throttleWithTimeout

    仅在过了一段指定的时间后还没发射数据时才发射一个数据

    • 会过滤掉发射速率过快的数据项
    Distinct

    过滤掉重复的数据项,只允许还没有发射过的数据项通过

    • 示例代码
        Observable.just(1, 2, 1, 1, 2, 3)
                  .distinct()
                  .subscribe(new Subscriber<Integer>() {
                @Override
                public void onNext(Integer item) {
                    System.out.println("Next: " + item);
                }
        
                @Override
                public void onError(Throwable error) {
                    System.err.println("Error: " + error.getMessage());
                }
        
                @Override
                public void onCompleted() {
                    System.out.println("Sequence complete.");
                }
            });
      
    Filter

    只发射通过测试的数据

    • 示例代码
      Observable.just(1, 2, 3, 4, 5)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer item) {
                      return( item < 4 );
                    }
                }).subscribe(new Subscriber<Integer>() {
              @Override
              public void onNext(Integer item) {
                  System.out.println("Next: " + item);
              }
      
              @Override
              public void onError(Throwable error) {
                  System.err.println("Error: " + error.getMessage());
              }
      
              @Override
              public void onCompleted() {
                  System.out.println("Sequence complete.");
              }
          });
      
    • ofType
      • 一个特殊形式。它过滤一个Observable只返回指定类型的数据

    结合操作

    And/Then/When

    使用Pattern和Plan作为中介,将两个或多个Observable发射的数据合并在一起

    Join

    任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一个数据,就结合两个Observable发射的数据

    Merge

    合并多个Observables的发射物

    • 示例代码
    Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
    Observable<Integer> evens = Observable.just(2, 4, 6);
    
    Observable.merge(odds, evens)
              .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
    StartWitch

    在数据序列的开头插入一条指定的数据

    Switch

    将一个发射多个Obversables的Observable转换成另一个单独的Observerable,后者发射那些Observables最近发射的数据

    Zip

    通过一个函数将多个Observeables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

    错误处理

    Catch

    从onError通知中回复发射数据,拦截原始的Observable的onError同时,将它替换为其他的数据项或数据序列,让产生的Observable能够正常终止或者根本不停止

    Retry

    如果原始Observable遇到错误,重新订阅它期望它能正常终止

    辅助操作

    Delay

    延迟一段指定的时间再来发射来自Observable的发射物

    Do

    注册一个动作作为原始Observable生命周期时间的一种占位符

    ObserveOn

    指定一个观察者在哪个调度器上观察这个Observable

    Subscribe

    操作来自Observable的发射物和通知,是观察者与Observable之间的桥梁

    SubscribeOn

    指定Observable自身在哪个调度器上面执行

    TimeOut

    对原始Observable的一个镜像,超时会发错误通知

    展开全文
  • zen-observable, 一种面向Javascript的可见 observe一种面向Javascript的可见。 要求承诺或者承诺 polyfill 。安装npm install zen-observable用法import Observable from 'zen-obs
  • rxjs 的 observable 是什么?

    千次阅读 2021-01-28 09:49:32
    RxJS 是响应式编程 (reactive programming) 强大的工具,今天...如果你之前已经使用过 RxJS,并希望了解 Observable 及 Operators (操作符) 的内部工作原理,那么这篇文章非常适合你。 什么是 Observable Observab

    原文链接: https://segmentfault.com/a/1190000009924164

    RxJS 是响应式编程 (reactive programming) 强大的工具,今天我们将深入介绍 Observables 和 Observers 的内容,以及介绍如何创建自己的操作符 (operators)。

    如果你之前已经使用过 RxJS,并希望了解 Observable 及 Operators (操作符) 的内部工作原理,那么这篇文章非常适合你。

    什么是 Observable

    Observable 就是一个拥有以下特性的函数:

    • 它接收一个 observer 对象作为参数,该对象中包含 nexterror 和 complete 方法
    • 它返回一个函数,用于在销毁 Observable 时,执行清理操作

    在我们实现的示例中,我们将定义一个简单的 unsubscribe 函数来实现取消订阅的功能。然而在 RxJS 中,返回的是 Subcription 对象,该对象中包含一个 unsubscribe 方法。

    一个 Observable 对象设置观察者 (observer),并将它与生产者关联起来。该生产者可能是 DOM 元素产生的 click 或 input 事件,也可能是更复杂的事件,如 HTTP。

    为了更好地理解 Observable,我们来自定义 Observable。首先,我们先来看一个订阅的例子:

    const node = document.querySelector('input[type=text]');
    
    const input$ = Rx.Observable.fromEvent(node, 'input');
    
    input$.subscribe({
      next: (event) => console.log(`You just typed ${event.target.value}!`),
      error: (err) => console.log(`Oops... ${err}`),
      complete: () => console.log(`Complete!`)
    });
    

    该示例中,Rx.Observable.formEvent() 方法接收一个 input 元素和事件名作为参数,然后返回一个 $input Observable 对象。接下来我们使用 subscribe() 方法来定于该 Observable 对象。当触发 input 事件后,对应的值将会传递给 observer 对象。

    什么是 Observer

    Observer (观察者) 非常简单,在上面的示例中,观察者是一个普通的对象,该对象会作为 subscribe() 方法的参数。此外 subscribe(next, error, complete) 也是一个有效的语法,但在本文中我们将讨论对象字面量的形式。

    当 Observable 对象产生新值的时候,我们可以通过调用 next() 方法来通知对应的观察者。若出现异常,则会调用观察者的 error() 方法。当我们订阅 Observable 对象后,只要有新的值,都会通知对应的观察者。但在以下两种情况下,新的值不会再通知对应的观察者:

    • 已调用 observer 对象的 complete() 方法
    • 消费者对数据不再感兴趣,执行取消订阅操作

    此外在执行最终的 subscribe() 订阅操作前,我们传递的值可以经过一系列的链式处理操作。执行对应操作的东西叫操作符,每个操作符执行完后会返回一个新的 Observable 对象,然后继续我们的处理流程。

    什么是 Operator

    正如上面所说的,Observable 对象能够执行链式操作,具体如下所示:

    const input$ = Rx.Observable.fromEvent(node, 'input')
      .map(event => event.target.value)
      .filter(value => value.length >= 2)
      .subscribe(value => {
        // use the `value`
    });
    

    上面代码的执行流程如下:

    • 假设用户在输入框中输入字符 a
    • Observable 对象响应对应的 input 事件,然后把值传递给 observer
    • map() 操作符返回一个新的 Observable 对象
    • filter() 操作符执行过滤操作,然后又返回一个新的 Observable 对象
    • 最后我们通过调用 subscribe() 方法,来获取最终的值

    简而言之,Operator 就是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。

    自定义 Observable

    Observable 构造函数

    function Observable(subscribe) {
      this.subscribe = subscribe;
    }
    

    每个 subscribe 回调函数被赋值给 this.subscribe 属性,该回调函数将会被我们或其它 Observable 对象调用。

    Observer 示例

    在我们深入介绍前,我们先来看一个简单的示例。之前我们已经创建完 Observable 函数,现在我们可以调用我们的观察者 (observer),然后传递数值 1,然后订阅它:

    const one$ = new Observable((observer) => {
      observer.next(1);
      observer.complete();
    });
    
    one$.subscribe({
      next: (value) => console.log(value) // 1
    });
    

    即我们订阅我们创建的 Observable 实例,然后通过 subscribe() 方法调用通过构造函数设置的回调函数。

    Observable.fromEvent

    下面就是我们需要的基础结构,即在 Observable 对象上需要新增一个静态方法 fromEvent :

    Observable.fromEvent = (element, name) => { };
    

    接下来我们将参考 RxJS 为我们提供的方法来实现自定义的 fromEvent() 方法:

    const node = document.querySelector('input');
    const input$ = Observable.fromEvent(node, 'input');
    

    按照上面的使用方式,我们的 fromEvent() 方法需要接收两个参数,同时需要返回一个新的 Observable 对象,具体如下:

    Observable.fromEvent = (element, name) => {
      return new Observable((observer) => {
        
      });
    };
    

    接下来我们来实现事件监听功能:

    Observable.fromEvent = (element, name) => {
      return new Observable((observer) => {
        element.addEventListener(name, (event) => {}, false);
      });
    };
    

    那么我们的 observer 参数来自哪里? 其实 observer 对象就是包含 nexterror 和 complete 方法的对象字面量。

    需要注意的是,我们的 observer 参数不会被传递,直到 subscribe() 方法被调用。这意味着 addEventListener() 方法不会被调用,除非你订阅该 Observable 对象。

    当我们调用 subscribe() 方法,之前设置的 this.subscribe 回调函数会被调用,对应的参数是我们定义的 observer 对象字面量,接下来将使用新的值,作为 next() 方法的参数,调用该方法。

    很好,那接下来我们要做什么?之前版本我们只是设置了监听,但没有调用 observer 对象的 next() 方法,接下来让我们来修复这个问题:

    Observable.fromEvent = (element, name) => {
      return new Observable((observer) => {
        element.addEventListener(name, (event) => {
          observer.next(event);
        }, false);
      });
    };
    

    如你所知,当销毁 Observables 对象时,需要调用一个函数用来执行清理操作。针对目前的场景,在销毁时我们需要移除事件监听:

    Observable.fromEvent = (element, name) => {
      return new Observable((observer) => {
        const callback = (event) => observer.next(event);
        element.addEventListener(name, callback, false);
        return () => element.removeEventListener(name, callback, false);
      });
    };
    

    我们没有调用 complete() 方法,因为该 Observable 对象处理的 DOM 相关的事件,在时间维度上它们可能是无终止的。

    现在让我们来验证一下最终实现的功能:

    const node = document.querySelector('input');
    const p = document.querySelector('p');
    
    function Observable(subscribe) {
      this.subscribe = subscribe;
    }
    
    Observable.fromEvent = (element, name) => {
      return new Observable((observer) => {
        const callback = (event) => observer.next(event);
        element.addEventListener(name, callback, false);
        return () => element.removeEventListener(name, callback, false);
      });
    };
    
    const input$ = Observable.fromEvent(node, 'input');
    
    const unsubscribe = input$.subscribe({
      next: (event) => {
        p.innerHTML = event.target.value;
      }
    });
    
    // automatically unsub after 5ssetTimeout(unsubscribe, 5000);
    

    自定义操作符

    创建我们自己的操作符应该会更容易一些,现在我们了解 Observable 和 Observable 背后的概念。我们将在 Observable 的原型对象上添加一个方法:

    Observable.prototype.map = function (mapFn) { };
    

    该方法的功能与 JavaScript 中的 Array.prototype.map 方法类似:

    const input$ = Observable.fromEvent(node, 'input')
            .map(event => event.target.value);
    

    所以我们需要应用回调函数并调用它,这用于获取我们所需要的数据。在我们这样做之前,我们需要流中的最新值。这里是巧妙的部分,在 map() 操作符中,我们需要访问 Observable 实例。因为 map 方法在原型上,我们可以通过以下方式访问 Observable 实例:

    Observable.prototype.map = function (mapFn) {
      const input = this;
    };
    

    接下来我们在返回的 Observable 对象中执行 input 对象的订阅操作:

    Observable.prototype.map = function(mapFn) {
      const input = this;
      return new Observable((observer) => {
        return input.subscribe();
      });
    };
    

    我们返回了 input.subscribe() 方法执行的结果,因为当我们执行取消订阅操作时,将会依次调用每个 Observable 对象取消订阅的方法。

    最后我们来完善一下 map 操作符的内部代码:

    Observable.prototype.map = function (mapFn) {
      const input = this;
      return new Observable((observer) => {
        return input.subscribe({
          next: (value) => observer.next(mapFn(value)),
          error: (err) => observer.error(err),
          complete: () => observer.complete()
        });
      });
    };
    

    现在我们已经可以执行链式操作了:

    const input$ = Observable.fromEvent(node, 'input')
      .map(event => event.target.value);
    
    input$.subscribe({
      next: (value) => {
        p.innerHTML = value;
      }
    });
    

    我有话说

    Observable 与 Promise 有什么区别?

    Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

    Untitled

    • Promise
      • 返回单个值
      • 不可取消的
    • Observable
      • 随着时间的推移发出多个值
      • 可以取消的
      • 支持 map、filter、reduce 等操作符
      • 延迟执行,当订阅的时候才会开始执行

    什么是 SafeObserver ?

    上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:

    • 传入的 Observer 对象可以不实现所有规定的方法 (next、error、complete 方法)
    • 在 complete 或者 error 触发之后再调用 next 方法是没用的
    • 调用 unsubscribe 方法后,任何方法都不能再被调用了
    • complete 和 error 触发后,unsubscribe 也会自动调用
    • 当 nextcompleteerror 出现异常时,unsubscribe 也会自动调用以保证资源不会浪费
    • nextcompleteerror是可选的。按需处理即可,不必全部处理

    为了完成上述目标,我们得把传入的匿名 Observer 对象封装在一个 SafeObserver 里以提供上述保障。

    若想进一步了解详细信息,请参考 Observable详解 文章中 "自定义 Observable" 章节的内容。

    参考资源

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 63,928
精华内容 25,571
关键字:

observable