精华内容
下载资源
问答
  • ReactiveX

    2019-10-03 14:09:19
    The real power comes with the “reactive extensions” (hence “ReactiveX”) — operators that allow you to transform, combine, manipulate, and work with the sequences of items em...

    http://reactivex.io 

    The real power comes with the “reactive extensions” (hence “ReactiveX”) — operators that allow you to transform, combine, manipulate, and work with the sequences of items emitted by Observables.

    These Rx operators allow you to compose asynchronous sequences together in a declarative manner with all the efficiency benefits of callbacks but without the drawbacks of nesting callback handlers that are typically associated with asynchronous systems.

    转载于:https://www.cnblogs.com/feng9exe/p/9004650.html

    展开全文
  • ReactiveX

    2016-03-21 08:27:41
    ReactiveX http://reactivex.io/intro.html Rx介绍 ReactiveX的历史 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meije...

    ReactiveX

    http://reactivex.io/intro.html

    Rx介绍

    ReactiveX的历史

    ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 reactivex.io

    什么是ReactiveX

    微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。

    ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

    ReactiveX的应用

    很多公司都在使用ReactiveX,例如Microsoft、Netflix、Github、Trello、SoundCloud。

    ReactiveX宣言

    ReactiveX不仅仅是一个编程接口,它是一种编程思想的突破,它影响了许多其它的程序库和框架以及编程语言。

    Rx模式

    使用观察者模式

    • 创建:Rx可以方便的创建事件流和数据流
    • 组合:Rx使用查询式的操作符组合和变换数据流
    • 监听:Rx可以订阅任何可观察的数据流并执行操作

    简化代码

    • 函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
    • 简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
    • 异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
    • 轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

    使用Observable的优势

    Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操作符,它让你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。

    Observable通过使用最佳的方式访问异步数据序列填补了这个间隙

    单个数据 多个数据
    同步 T getData()
    异步 Future<T> getData()

    Rx的Observable模型让你可以像使用集合数据一样操作异步事件流,对异步事件流使用各种简单、可组合的操作。

    Observable可组合

    对于单层的异步操作来说,Java中Future对象的处理方式是非常简单有效的,但是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,要想实现还是可以做到的,但是非常困难,或许你可以用Future.get(),但这样做,异步执行的优势就完全没有了。从另一方面说,Rx的Observable一开始就是为组合异步数据流准备的。

    Observable更灵活

    Rx的Observable不仅支持处理单独的标量值(就像Future可以做的),也支持数据序列,甚至是无穷的数据流。Observable是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的全部优雅与灵活。

    Observable是异步的双向push,Iterable是同步的单向pull,对比:

    事件 Iterable(pull) Observable(push)
    获取数据 T next() onNext(T)
    异常处理 throws Exception onError(Exception)
    任务完成 !hasNext() onCompleted()

    Observable无偏见

    Rx对于对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何满足你的需求的,你擅长或偏好的方式都可以。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码将所有与Observable的交互都当做是异步的。

    Observable是如何实现的?

    public Observable<data> getData();
    • 它能与调用者在同一线程同步执行吗?
    • 它能异步地在单独的线程执行吗?
    • 它会将工作分发到多个线程,返回数据的顺序是任意的吗?
    • 它使用Actor模式而不是线程池吗?
    • 它使用NIO和事件循环执行异步网络访问吗?
    • 它使用事件循环将工作线程从回调线程分离出来吗?

    从Observer的视角看,这些都无所谓,重要的是:使用Rx,你可以改变你的观念,你可以在完全不影响Observable程序库使用者的情况下,彻底的改变Observable的底层实现。

    使用回调存在很多问题

    回调在不阻塞任何事情的情况下,解决了Future.get()过早阻塞的问题。由于响应结果一旦就绪Callback就会被调用,它们天生就是高效率的。不过,就像使用Future一样,对于单层的异步执行来说,回调很容易使用,对于嵌套的异步组合,它们显得非常笨拙。

    Rx是一个多语言的实现

    Rx在大量的编程语言中都有实现,并尊重实现语言的风格,而且更多的实现正在飞速增加。

    响应式编程

    Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。

    你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好。使用Observable,在数据准备好时,生产者将数据推送给消费者。数据可以同步或异步的到达,这种方式更灵活。

    下面的例子展示了相似的高阶函数在Iterable和Observable上的应用

    // Iterable
    getDataFromLocalMemory()
      .skip(10)
      .take(5)
      .map({ s -> return s + " transformed" })
      .forEach({ println "next => " + it })
    
    // Observable
    getDataFromNetwork()
      .skip(10)
      .take(5)
      .map({ s -> return s + " transformed" })
      .subscribe({ println "onNext => " + it })

    Observable类型给GOF的观察者模式添加了两种缺少的语义,这样就和Iterable类型中可用的操作一致了:

    1. 生产者可以发信号给消费者,通知它没有更多数据可用了(对于Iterable,一个for循环正常完成表示没有数据了;对于Observable,就是调用观察者的onCompleted方法)
    2. 生产者可以发信号给消费者,通知它遇到了一个错误(对于Iterable,迭代过程中发生错误会抛出异常;对于Observable,就是调用观察者(Observer)的onError方法)

    有了这两种功能,Rx就能使Observable与Iterable保持一致了,唯一的不同是数据流的方向。任何对Iterable的操作,你都可以对Observable使用。

    名词定义

    这里给出一些名词的翻译

    • Reactive 直译为反应性的,有活性的,根据上下文一般翻译为反应式、响应式
    • Iterable 可迭代对象,支持以迭代器的形式遍历,许多语言中都存在这个概念
    • Observable 可观察对象,在Rx中定义为更强大的Iterable,在观察者模式中是被观察的对象,一旦数据产生或发生变化,会通过某种方式通知观察者或订阅者
    • Observer 观察者对象,监听Observable发射的数据并做出响应,Subscriber是它的一个特殊实现
    • emit 直译为发射,发布,发出,含义是Observable在数据产生或变化时发送通知给Observer,调用Observer对应的方法,文章里一律译为发射
    • items 直译为项目,条目,在Rx里是指Observable发射的数据项,文章里一律译为数据,数据项
    展开全文
  • ReactiveX简介

    千次阅读 2018-12-11 20:59:23
    一、ReactiveX简介 在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式。 ReactiveX的官网地址为: http://reactivex.io/ ReactiveX官网对于自身的介绍是: An API for ...

    一、ReactiveX简介

    在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式

    • ReactiveX的官网地址为:
     http://reactivex.io/
    

    ReactiveX官网对于自身的介绍是:

    An API for asynchronous programming with observable streams
    

    实质上我们可以对其解读为三部分:

    ReactiveX的解读
    ①An API: 首先它是个编程接口,不同语言提供不同实现。例如Java中的RxJava。
    ②For asynchronous programming: 在异步编程设计中使用。例如开启子线程处理耗时的网络请求。
    ③With observable streams: 基于可观察的事件流。例如观察者模式中观察者对被观察者的监听。

    而ReactiveX结合了如下三部分内容:

    1. 观察者模式,即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。
    2. Iterator模式,即迭代流式编程模式。
    3. 函数式编程模式,即提供一系列函数样式的方法供快速开发。

    Reactive的模式图如下:

    图1.1 ReactiveX的模式图

    二、RxJava的使用

    1、RxJava的优势

    在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。
    但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁

    2、RxJava的配置

    首先,在Android Studio中配置Module的build.gradle,在这里我们使用的版本是1.2版本,并且导入RxAndroid,辅助RxJava完成线程调度:

            implementation "io.reactivex:rxjava:1.2.0"
            implementation "io.reactivex:rxandroid:1.2.0"
    

    然后,RxJava基于观察者设计模式,其中的关键性三个步骤如下:

    (1)Observable被观察者

    Observable被观察者创建的代码如下:

            Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("Alex");
                    subscriber.onCompleted();
                }
            });
    

     

     

    在这里,要强调的是Observable被观察者是类类型,其中有诸多方法,我们关注其构造函数与创建Observable对象的方法,查看如下图对应的视图结构:

    图2.2.1 Observable被观察者对象的视图结构

    查看源码:

            protected Observable(OnSubscribe<T> f) {
               this.onSubscribe = f;
            }
    
            public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
            
            }
    
            public static <T> Observable<T> create(OnSubscribe<T> f) {
               return new Observable<T>(RxJavaHooks.onCreate(f));
            }
      
            public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
               return create((OnSubscribe<T>)syncOnSubscribe);
            }
    
            public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
               return create((OnSubscribe<T>)asyncOnSubscribe);
            }
    

    通过源码分析,可知Observable提供了create()方法来获取Observable实例对象。
    此外,除了基本的创建的方法,Observable还提供了便捷的创建Observable序列的两种方式,代码如下:

    • 第一种,会将参数逐个发送
            Observable<String> observable1 = Observable.just("Alex","Payne");
    
    • 第二种,会将数组元素逐个转换完毕后逐个发送
            String[] observableArr = {"Alex", "Payne"};
            Observable<String> observable2 = Observable.from(observableArr);
    

    其中Observable.just()方法会调用from()方法,详情可查看源码。

    (2)Observer观察者

    Observer观察者创建的代码如下:

            Observer<String> observer = new Observer<String>() {
    
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError,Error Info is:" + e.getMessage());
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG, s);
                }
            };
    

    Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下图所示Observer的视图结构:

    图2.2.2 Observer观察者对象的视图结构


    那么在RxJava中,Observer有其接口实现类对象Subscriber,它们的使用onNext、onCompleted、onError方法是一样的,但是Subscriber对于Observer接口进行了拓展,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,代码如下:

     

            Subscriber<String> subscriber = new Subscriber<String>() {
                @Override
                public void onStart() {
                    Log.e(TAG, "onStart");
                }
    
                @Override
                public void onCompleted() {
                    Log.e(TAG, "onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError,Error Info is:" + e.getMessage());
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG, s);
                }
            };
    

    其中,onStart()方法会在事件未发送前被调用,可以用于订阅关系建立前的准备工作,例如将数据清空或者重置,在Subscriber中默认是空实现,我们可以在该方法中调用自己的业务逻辑代码。在如下的视图结构中我们可以看到Subscriber的拓展内容,重点是add()、unsubscribe()方法以及名为subscription的Subscription队列

    图2.2.3 Subscriber对象视图结构

     

    (3)Subscribe订阅关系

    Observable与observer形成订阅关系代码如下:

                observable.subscribe(observer);
                //或者
                observable.subscribe(subscriber);
    

    那么我们以observable.subscribe(observer)为例在这里继续查看源码,查看subscribe()方法到底做了什么:

     

    图2.3.1 Observable调用Subscribe将Observer转换为Subscriber对象

    Observer转换为Subscriber对象在这里得到印证。

    • 在之后的内容中统一以Subscriber作为订阅观察者对象

    继续深入,我们可以看到订阅关系中的关键步骤(仅核心代码):

                subscriber.onStart();
    
                RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    
                return RxJavaHooks.onObservableReturn(subscriber);
    

    在这里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等价于OnSubscribe.call(subscriber),见下图2.3.2:

    图2.3.2 RxJavaHooks.onObservableStart()转换为OnSubscribe

    在return RxJavaHooks.onObservableReturn(subscriber)这里等价于return subscription,见下图2.3.3:

    图2.3.3 RxJavaHooks.onObservableReturn()转换为Subscrition

     

    • 可以看到,subscriber() 做了3件事:
    1. 调用 Subscriber.onStart() 。该方法用于在订阅关系建立之前的准备。
    2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的内部接口,而事件发送的逻辑在这里开始运行。从这也可以看出,在 RxJava 中当 subscribe() 方法执行的时候订阅关系确立,Observable 开始发送事件。
    3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便后续调用unsubscribe()。

    三、RxJava的不完整回调

    1、不完整回调的代码示例

            Observable<String> observable = Observable.just("Alex", "Payne");
            Action1<String> onNextAction = new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.e(TAG, s);
                }
            };
            Action1<Throwable> onErrorAction = new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());
                }
            };
            Action0 onCompletedAction = new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "onCompleted");
                }
            };
    
            // 根据onNextAction 来定义 onNext()
            observable.subscribe(onNextAction);
            // 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()
            observable.subscribe(onNextAction, onErrorAction);
            // 根据onNextAction 来定义 onNext()、根据onErrorAction 来定义 onError()、onCompletedAction 来定义 onCompleted()
            observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
    

    2、不完整回调的原理分析

    在这里我们可以看到:

    Action0无参数泛型无返回值类型,而Subscriber中的onCompleted()方法也没有参数泛型
    Action1有1个参数泛型无返回值类型 ,onNextAction设置的参数泛型为String,而Subscriber中的onNext()方法参数泛型也是String(和本文中观察者对象中的OnNext方法对比)
    Action1有1个参数泛型无返回值类型,onErrorAction设置的参数泛型为Throwable,而Subscriber中的onError()方法参数泛型也是Throwable

    那么,我们来查看observable.subscribe(onNextAction)的源码,在这里, Action1可以被当成一个包装对象,将onNext()方法进行包装作为不完整的回调传入到observable.subscribe()中

    图3.2.1 传入的onNextAction最终被包装成ActionSubscriber

     

    我们来看看Action1有何玄机,Action1的源码如下图所示:

    图3.2.2 Action1接口源码

    实质上,这种根据参数泛型的个数且无返回值类型的包装在RxJava中有多种如下图所示的体现,例如Action0的参数个数为0,Action1的参数个数为1以此类推:

    图3.2.3 根据参数泛型的个数且无返回值类型的包装

     

    四、RxJava的线程切换

    1、Scheduler线程调度器

    如果不指定线程,默认是在调用subscribe方法的线程上进行回调,那么如果子线程中调用subscibe方法,而想在主线程中进行UI更新,则会抛出异常。当然了RxJava已经帮我们考虑到了这一点,所以提供了Scheduler线程调度器帮助我们进行线程之间的切换。
    实质上,Scheduler线程调度器和RxJava的操作符有紧密关联,我将在下一篇文章中进行详细介绍。

    • RxJava内置了如下所示几个的线程调度器:
    1. Schedulers.immediate():在当前线程中执行
    2. Schedulers.newThread():启动新线程,在新线程中进行操作
    3. Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    4. Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    5. Schedulers.trampoline():会将任务按添加顺序依次放入当前线程中等待执行。线程一次只执行一个任务,其余任务排队等待,一个任务都执行完成后再开始下一个任务的执行。
    • 此外RxJava还提供了用于测试的调度器Schedulers.test() 及 可自定义Scheduler—-Schedulers.form() 。

    RxAndroid并且其为我们提供了AndroidSchedulers.mainThread()进行主线程的回调

    2、线程控制

    调用Observable对象的subscribeOn()、observeOn()方法即可完成线程控制。

    • subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
    • observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
                Observable.just("Alex", "Payne")
               .subscribeOn(Schedulers.io())//指定 subscribe() 所发生的线程
               .unsubscribeOn(Schedulers.io())//事件发送完毕后,及时取消发送
               .observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所运行在的线程
               .subscribe(new Action1<String>() {
                   @Override
                     public void call(String s) {
                         Log.e(TAG, s);
                    }
                });
    

    五、总结

    本文主要介绍了RxJava的由来、使用步骤、部分内容的原理解析。在下篇文章中我会详细介绍RxJava的操作符。希望本文对你在学习RxJava的路上有所启发。

    小礼物走一走,来简书关注我



    作者:Alex_Payne
    链接:https://www.jianshu.com/p/2d3d7c77dc92
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    展开全文
  • rxexamples:ReactiveX示例
  • ReactiveX 简介

    2019-03-25 12:29:51
    ReactiveX中,观察者订阅了一个Observable。然后,该观察者对Observable发出的任何项目或项目序列做出反应。 这种模式有利于并发操作,因为它不需要在等待Observable发出对象时阻塞,而是以观察者的形式创建一个...

    Observable

    在ReactiveX中,观察者订阅了一个Observable。然后,该观察者对Observable发出的任何项目或项目序列做出反应。

    这种模式有利于并发操作,因为它不需要在等待Observable发出对象时阻塞,而是以观察者的形式创建一个哨兵,随时准备在Observable所做的任何时候做出适当的反应。这个页面解释了反应模式是什么以及Observables和观察者是什么(以及观察者如何订阅Observables)。

    其他页面显示了如何使用各种Observable运算符将Observable链接在一起并更改其行为。本文档附有“大理石图表”的说明。以下是大理石图表如何表示Observables的Observables和转换:

    Background

    在许多软件编程任务中,您或多或少地期望您编写的指令将按照您编写的顺序逐个执行并逐步完成。但是在ReactiveX中,许多指令可以并行执行,其结果随后由“观察者”按任意顺序捕获。而不是调用方法,而是以“Observable”的形式定义检索和转换数据的机制。,“然后订阅一个观察者,此时,先前定义的机制开始行动,观察员站立哨兵,以便在准备好时捕获并响应其排放。

    这种方法的一个优点是,当你有一堆不依赖于彼此的任务时,你可以同时启动所有任务,而不是等到每个任务完成后再开始下一个 - 这样,你的整个捆绑任务只需要与捆绑中最长的任务一样长。

    有许多术语用于描述这种异步编程和设计模型。本文档将使用以下术语:观察者订阅Observable。Observable通过调用观察者的方法发出项目或向其观察者发送通知。

    在其他文件和其他背景下,我们所谓的“观察者”有时被称为“用户”,“观察者”或“反应堆”。这种模型通常被称为“反应堆模式”。

    Establishing Observers

    此页面使用类似Groovy的伪代码作为示例,但在许多语言中都有ReactiveX实现。

    在普通的方法调用中 - 也就是说,不是ReactiveX中典型的异步并行调用 - 流程是这样的:

    1. 调用方法。
    2. 将该方法的返回值存储在变量中。
    3. 使用该变量及其新值来做一些有用的事情。
    // make the call, assign its return value to `returnVal`
    returnVal = someMethod(itsParameters);
    // do something useful with returnVal

    在异步模型中,流程更像是:

    1. 定义一个方法,该方法对异步调用的返回值执行一些有用的操作;这种方法是观察者的一部分。
    2. 将异步调用本身定义为Observable。
    3. 通过订阅将观察者附加到该Observable(这也启动了Observable的操作)。
    4. 继续你的事业;每当调用返回时,观察者的方法将开始对其返回值或值进行操作 -  Observable发出的项。

    看起来像这样:

    // defines, but does not invoke, the Subscriber's onNext handler
    // (in this example, the observer is very simple and has only an onNext handler)
    def myOnNext = { it -> do something useful with it };
    // defines, but does not invoke, the Observable
    def myObservable = someObservable(itsParameters);
    // subscribes the Subscriber to the Observable, and invokes the Observable
    myObservable.subscribe(myOnNext);
    // go on about my business

    onNext, onCompleted, and onError

    Subscribe方法是将观察者连接到Observable的方法。您的观察者实现以下方法的某些子集:

    • onNext

    只要Observable发出一个项目,Observable就会调用此方法。此方法将Observable发出的项作为参数。

    • onError

    Observable调用此方法以指示它无法生成预期数据或遇到其他一些错误。它不会进一步调用onNext或onCompleted。onError方法将参数指示导致错误的原因。

    • onCompleted

    如果Observable在最后一次调用onNext,如果它没有遇到任何错误,则调用此方法。

    根据Observable合约的条款,它可以调用onNext零次或多次,然后可以通过调用onCompleted或onError而不是两者来跟随这些调用,这将是它的最后一次调用。按照惯例,在本文档中,对onNext的调用通常称为项目的“发射”,而对onCompleted或onError的调用则称为“通知”。

    更完整的订阅调用示例如下所示:

    def myOnNext     = { item -> /* do something useful with item */ };
    def myError      = { throwable -> /* react sensibly to a failed call */ };
    def myComplete   = { /* clean up after the final response */ };
    def myObservable = someMethod(itsParameters);
    myObservable.subscribe(myOnNext, myError, myComplete);
    // go on about my business

    Unsubscribing

    在一些ReactiveX实现中,有一个专门的观察​​者接口Subscriber,它实现了一个unsubscribe方法。您可以调用此方法来指示订阅服务器不再对其当前订阅的任何Observable感兴趣。那些Observable可以(如果他们没有其他感兴趣的观察者)选择停止生成要发出的新项目。

    此取消订阅的结果将通过适用于观察者订阅的Observable的运算符链级联,这将导致链中的每个链接停止发出项目。但是,这并不能保证立即发生,即使在没有观察者观察这些发射之后,Observable也有可能产生并尝试发射物品一段时间。

    Some Notes on Naming Conventions

    ReactiveX的每种语言特定实现都有自己的命名怪癖。虽然实现之间存在许多共性,但没有规范的命名标准。

    此外,这些名称中的一些在其他情境中具有不同的含义,或者在特定实现语言的习语中看起来很尴尬。例如,有onEvent命名模式(例如onNext,onCompleted,onError)。

    在某些上下文中,这些名称将指示通过哪些方法注册事件处理程序。但是,在ReactiveX中,它们自己命名事件处理程序。

    “Hot” and “Cold” Observables

    Observable何时开始发出其物品序列?这取决于Observable。“热”Observable可以在创建项目后立即开始发出项目,因此任何后来订阅该Observable的观察者都可以开始在中间某处观察序列。另一方面,“冷”Observable等待观察者在开始发射物品之前订阅它,因此这样的观察者保证从一开始就看到整个序列。

    在ReactiveX的一些实现中,还存在称为“可连接”可观察的东西。这样的Observable在调用Connect方法之前不会开始发出项目,无论是否有任何观察者都订阅了它。

    Composition via Observable Operators

    Observable和观察者只是ReactiveX的开始。

    它们本身只不过是标准观察者模式的轻微扩展,更适合处理一系列事件而不是单个回调。

    真正的力量来自“反应式扩展”(因此是“ReactiveX”) - 允许您转换,组合,操作和处理Observable发出的项目序列的运算符。

    这些Rx运算符允许您以声明方式组合异步序列,同时具有回调的所有效率优势,但没有嵌套通常与异步系统相关联的回调处理程序的缺点。

    本文档将有关各种运算符的信息及其用法示例分组到以下页面中:

    Creating Observables

    CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStart, and Timer

    Transforming Observable Items

    BufferFlatMapGroupByMapScan, and Window

    Filtering Observables

    DebounceDistinctElementAtFilterFirstIgnoreElementsLastSampleSkipSkipLastTake, and TakeLast

    Combining Observables

    And/Then/WhenCombineLatestJoinMergeStartWithSwitch, and Zip

    Error Handling Operators

    Catch and Retry

    Utility Operators

    DelayDoMaterialize/DematerializeObserveOnSerializeSubscribeSubscribeOn,TimeIntervalTimeoutTimestamp, and Using

    Conditional and Boolean Operators

    AllAmbContainsDefaultIfEmptySequenceEqualSkipUntilSkipWhileTakeUntil, and TakeWhile

    Mathematical and Aggregate Operators

    AverageConcatCountMaxMinReduce, and Sum

    Converting Observables

    To

    Connectable Observable Operators

    ConnectPublishRefCount, and Replay

    Backpressure Operators

    a variety of operators that enforce particular flow-control policies

    这些页面包含一些运算符的信息,这些运算符不属于ReactiveX的核心,而是在一个或多个特定于语言的实现和/或可选模块中实现。

    Chaining Operators

    大多数运算符都在Observable上运行并返回一个Observable。这允许您在链中一个接一个地应用这些运算符。链中的每个运算符都会修改由前一个运算符的运算产生的Observable。

    还有其他模式,如Builder模式,其中特定类的各种方法通过方法的操作修改该对象,对同一类的项进行操作。这些模式还允许您以类似的方式链接方法。

    但是在Builder模式中,方法在链中出现的顺序通常并不重要,因为Observable运算符的顺序很重要。一组Observable运算符不能在原始的Observable上独立运行,它发起链,但它们依次运行,每个运算符都由运算符在链中的前一个运算符生成。

    展开全文
  • reactivex
  • Add ReactiveX component

    2021-01-08 21:01:08
    <div><p>ReactiveX combines the Observer pattern with the Iterator pattern and functional programming with collections to fill the need for an ideal way of managing sequences of events.</p><p>该提问...
  • Who uses ReactiveX

    2021-01-03 21:03:47
    <div><p>On the frontpage, I'm planning to include a section "Who uses Rx" listing a few companies that use Rx in a significant way or as a core ...ReactiveX/reactivex.github.io</p></div>
  • Added reactiveX min operator

    2020-12-06 20:51:47
    <div><p>Added reactiveX min operator</p><p>该提问来源于开源项目:ReactiveX/RxPHP</p></div>
  • ReactiveX 操作符

    2019-10-12 15:39:58
    ReactiveX 操作符创建操作fromjustcreatedeferintervalrangetimerempty never error变换操作mapscanbufferwindowgroupByflatMap过滤操作结合操作joinmergeandstartWithcombineLatest辅助操作条件和布尔操作 ...
  • <div><p>The ReactiveX has an RxGo project which <a href="https://github.com/ReactiveX/RxGo/issues/2">doesn't have any code</a>. How about contributing this code there?</p><p>该提问来源于开源项目&#...
  • ReactiveX编程范式

    2017-01-04 00:31:00
    ReactiveX http://reactivex.io/ An API for asynchronous programmingwith observable streams The Observer pattern done right ReactiveX is a combination of the best ideas from the Observer pattern, ...
  • ReactiveX的Unity实现,响应式编程,能很优雅的完成一些功能的编写
  • ReactiveX-Observable

    2018-11-18 17:23:58
    (此篇文章翻译自ReactiveX的官方文档) 在ReactiveX中,Observer(观察者)订阅了一个Observable(观察对象)。那么这个观察者会对观察对象发生的任何单一的变化,或者是一连串的变化做出反应,也就是收到观察对象...
  • <div><p>How to add a new language of docs, such as Chinese? Just put the new markdown files to subfolders, such as <code>/cn</code>?</p><p>该提问来源于开源项目&#...ReactiveX/reactivex.github.io</p></div>
  • ReactiveX -Rx

    2019-10-02 15:37:44
    ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理...
  • ReactiveX网站 + 。 如果您想做出贡献,请确保对从其生成master分支的developer分支进行更改。 使用Docker进行构建和运行(可选) 如果已安装Docker,则可以使用此存储库中的Dockerfile来构建和运行ReactiveX网站...
  • ReactiveX Observable规范

    千次阅读 2016-12-16 15:08:31
    Observable规范(Observable Contract): ... “The Observable Contract”, 在ReactiveX的官方文档,网站以及源码注释上经常出现。是基于微软在2010发表的 Rx Design Guidelines(用于描述Rx.Net是如何实现Reactiv
  • ReactiveX-驯服的异步性 我在有关ReactiveX的演讲材料。 滑梯 幻灯片位于slides/目录中。 使用LaTeX的类创建。 main.tex文件包含LaTeX幻灯片源代码。 安装了LaTeX发行版, Beamer软件包和工具后,您可以通过发出...
  • ReactiveX-简介

    2018-05-27 00:17:00
    ReactiveX是一个API,它有很多实现。 Observable补充了异步遍历的空白 single items multiple items synchronous T getData() Iterable<T> getData() asynchronous Future<T> ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,546
精华内容 1,418
关键字:

reactivex