rxjava_rxjava2 - CSDN
rxjava 订阅
[1]  RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.译文RxJava - JVM响应式扩展Reactive Extensions 用于使用Java VM的可观察序列编写异步和基于事件的程序的库。 展开全文
[1]  RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.译文RxJava - JVM响应式扩展Reactive Extensions 用于使用Java VM的可观察序列编写异步和基于事件的程序的库。
信息
开发商
Netflix
软件授权
Apache License 2.0
软件名称
RxJava
更新时间
2017-06-1
软件版本
2.1.7
软件平台
Java
软件语言
Java
软件大小
2.74MB
putty优点
ReactiveXis a library for composing asynchronous and event-based programsby using observable sequences.It extendsthe observer patternto support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O. [2]  译文:ReactiveX是一个通过使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使您可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I / O
收起全文
  • 适合所有初中级工程师,从RxJava的每个知识点进行讲解,并在每个知识点中都带有文字说明和代码例子说明。通过这次系统化的学习,您将可以完整的学习到RxJava家族的所有特性和常用的操作符,掌握操作符的使用,掌握...
  • 适合所有初中级工程师,从RxJava的每个知识点进行讲解,并在每个知识点中都带有文字说明和代码例子说明。通过这次系统化的学习,您将可以完整的学习到RxJava家族的所有特性和常用的操作符,掌握操作符的使用,掌握...
  • RXJava最全学习教程

    2018-08-12 17:41:19
    给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析 二、为什么学RxJava RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and ...

    一、背景

    这篇文章的目的有两个: 1. 给对 RxJava 感兴趣的人一些入门的指引 2. 给正在使用 RxJava 但仍然心存疑惑的人一些更深入的解析

    二、为什么学RxJava

    RxJava 在 GitHub 主页上的自我介绍是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

    然而,对于初学者来说,这太难看懂了。因为它是一个『总结』,而初学者更需要一个『引言』。

    其实, RxJava 的本质可以压缩为异步这一个词。说到根上,它就是一个实现异步操作的库,而别的定语都是基于这之上的。

    RxJava 好在哪

    换句话说,『同样是做异步,为什么人们用它,而不用现成的 AsyncTask / Handler / XXX / … ?』

    一个词:简洁。

    异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

    这里写图片描述

    假设有这样一个需求:界面上有一个自定义的视图 imageCollectorView ,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组 File[] folders 中每个目录下的 png 图片都加载出来并显示在 imageCollectorView 中。需要注意的是,由于读取图片的这一过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。常用的实现方式有多种,我这里贴出其中一种:

    new Thread() {
        @Override
        public void run() {
            super.run();
            for (File folder : folders) {
                File[] files = folder.listFiles();
                for (File file : files) {
                    if (file.getName().endsWith(".png")) {
                        final Bitmap bitmap = getBitmapFromFile(file);
                        getActivity().runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                imageCollectorView.addImage(bitmap);
                            }
                        });
                    }
                }
            }
        }
    }.start();

    而如果使用 RxJava ,实现方式是这样的:

    Observable.from(folders)
        .flatMap(new Func1<File, Observable<File>>() {
            @Override
            public Observable<File> call(File file) {
                return Observable.from(file.listFiles());
            }
        })
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getName().endsWith(".png");
            }
        })
        .map(new Func1<File, Bitmap>() {
            @Override
            public Bitmap call(File file) {
                return getBitmapFromFile(file);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) {
                imageCollectorView.addImage(bitmap);
            }
        });

    那位说话了:『你这代码明明变多了啊!简洁个毛啊!』大兄弟你消消气,我说的是逻辑的简洁,不是单纯的代码量少(逻辑简洁才是提升读写代码速度的必杀技对不?)。观察一下你会发现, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后需要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂,而不是对着代码重新捋一遍思路?)。

    另外,如果你的 IDE 是 Android Studio ,其实每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑:

    Observable.from(folders)
        .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
        .filter((Func1) (file) -> { file.getName().endsWith(".png") })
        .map((Func1) (file) -> { getBitmapFromFile(file) })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });

    有一段逻辑非常复杂,包含了多次内存操作、本地文件操作和网络操作,对象分分合合,线程间相互配合相互等待,一会儿排成人字,一会儿排成一字。如果使用常规的方法来实现,肯定是要写得欲仙欲死,然而在使用 RxJava 的情况下,依然只是一条链式调用就完成了。它很长,但很清晰。

    所以, RxJava 好在哪?就好在简洁,好在那把什么复杂逻辑都能穿成一条线的简洁。

    三、API 介绍和原理简析

    这个我就做不到一个词说明了……因为这一节的主要内容就是一步步地说明 RxJava 到底怎样做到了异步,怎样做到了简洁。

    1. 概念:扩展的观察者模式

    RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。

    观察者模式

    先简述一下观察者模式,已经熟悉的可以跳过这一段。

    观察者模式面向的需求是:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。程序的观察者模式和这种真正的『观察』略有不同,观察者不需要时刻盯着被观察者(例如 A 不需要每过 2ms 就检查一次 B 的状态),而是采用注册(Register)或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某某状态,你要在它变化的时候通知我。 Android 开发中一个比较典型的例子是点击监听器 OnClickListener 。对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。当然,这也得益于我们可以随意定制自己程序中的观察者和被观察者,而警察叔叔明显无法要求小偷『你在作案的时候务必通知我』。

    OnClickListener 的模式大致如下图:

    这里写图片描述

    如图所示,通过 setOnClickListener() 方法,Button 持有 OnClickListener 的引用(这一过程没有在图上画出);当用户点击时,Button 自动调用 OnClickListener 的 onClick() 方法。另外,如果把这张图中的概念抽象出来(Button -> 被观察者、OnClickListener -> 观察者、setOnClickListener() -> 订阅,onClick() -> 事件),就由专用的观察者模式(例如只用于监听控件点击)转变成了通用的观察者模式。如下图:

    这里写图片描述

    而 RxJava 作为一个工具库,使用的就是通用形式的观察者模式。

    RxJava 的观察者模式

    RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

    与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

    onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

    onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

    在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    RxJava 的观察者模式大致如下图:

    这里写图片描述

    四、基本实现

    基于以上的概念, RxJava 的基本实现主要有三点:

    1) 创建 Observer

    Observer 即观察者,它决定事件触发的时候将有怎样的行为。 RxJava 中的 Observer 接口的实现方式:

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };

    除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的:

    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };

    不仅基本使用方式一样,实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

    onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

    unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。

    创建 Observable

    Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });

    可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用三次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

    create() 方法是 RxJava 最基本的创造事件序列的方法。基于这个方法, RxJava 还提供了一些方法用来快捷创建事件队列,例如:

    just(T…): 将传入的参数依次发送出来。

    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();

    上面 just(T…) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等价的。

    Subscribe (订阅)

    创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

    Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码)

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }

    可以看到,subscriber() 做了3件事:

    调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。

    调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。

    将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

    整个过程中对象间的关系如下图:

    可以看到,subscriber() 做了3件事:

    调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。

    调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。

    将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe().

    整个过程中对象间的关系如下图:

    这里写图片描述

    这里写图片描述

    除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 还支持不完整定义的回调,RxJava 会自动根据定义创建出 Subscriber 。形式如下:

    Action1<String> onNextAction = new Action1<String>() {
        // onNext()
        @Override
        public void call(String s) {
            Log.d(tag, s);
        }
    };
    Action1<Throwable> onErrorAction = new Action1<Throwable>() {
        // onError()
        @Override
        public void call(Throwable throwable) {
            // Error handling
        }
    };
    Action0 onCompletedAction = new Action0() {
        // onCompleted()
        @Override
        public void call() {
            Log.d(tag, "completed");
        }
    };
    
    // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
    observable.subscribe(onNextAction);
    // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
    observable.subscribe(onNextAction, onErrorAction);
    // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
    observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

    简单解释一下这段代码中出现的 Action1 和 Action0。 Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的;由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调。这样其实也可以看做将 onCompleted() 方法作为参数传进了 subscribe(),相当于其他某些语言中的『闭包』。 Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1 可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调。事实上,虽然 Action0 和 Action1 在 API 中使用最广泛,但 RxJava 是提供了多个 ActionX 形式的接口 (例如 Action2, Action3) 的,它们可以被用以包装不同的无返回值的方法。

    场景示例

    下面举两个例子:

    a. 打印字符串数组

    将字符串数组 names 中的所有字符串依次打印出来

       String[] names = {"a", "b"};
            Observable.from(names).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                Log.d(tag, name);
                }
            });

    由 id 取得图片并显示

    由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错

      Observable.create(new Observable.OnSubscribe<Bitmap>() {
                @Override
                public void call(Subscriber<? super Bitmap> subscriber) {
                    Drawable drawable = getTheme().getDrawable(drawableRes));
                    subscriber.onNext(drawable);
                    subscriber.onCompleted();
    
                }
            }).subscribe(new Subscriber<Bitmap>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Bitmap s) {
    
                }
            });

    正如上面两个例子这样,创建出 Observable 和 Subscriber ,再用 subscribe() 将它们串起来,一次 RxJava 的基本使用就完成了。非常简单。

    这里写图片描述

    在 RxJava 的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。而要实现异步,则需要用到 RxJava 的另一个概念: Scheduler 。

    五、线程控制 —— Scheduler (一)

    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    Scheduler 的 API (一)

    在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

    Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

    Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

    Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

    另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    文字叙述总归难理解,上代码:

    Observable.just(1, 2, 3, 4)
        .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
        .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer number) {
                Log.d(tag, "number:" + number);
            }
        });

    上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

    而前面提到的由图片 id 取得图片并显示的例子,如果也加上这两句:

    int drawableRes = ...;
    ImageView imageView = ...;
    Observable.create(new OnSubscribe<Drawable>() {
        @Override
        public void call(Subscriber<? super Drawable> subscriber) {
            Drawable drawable = getTheme().getDrawable(drawableRes));
            subscriber.onNext(drawable);
            subscriber.onCompleted();
        }
    })
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Observer<Drawable>() {
        @Override
        public void onNext(Drawable drawable) {
            imageView.setImageDrawable(drawable);
        }
    
        @Override
        public void onCompleted() {
        }
    
        @Override
        public void onError(Throwable e) {
            Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
        }
    });

    那么,加载图片将会发生在 IO 线程,而设置图片则被设定在了主线程。这就意味着,即使加载图片耗费了几十甚至几百毫秒的时间,也不会造成丝毫界面的卡顿。

    Scheduler 的原理 (一)

    RxJava 的 Scheduler API 很方便,也很神奇(加了一句话就把线程切换了,怎么做到的?而且 subscribe() 不是最外层直接调用的方法吗,它竟然也能被指定线程?)。然而 Scheduler 的原理需要放在后面讲,因为它的原理是以下一节《变换》的原理作为基础的。

    好吧这一节其实我屁也没说,只是为了让你安心,让你知道我不是忘了讲原理,而是把它放在了更合适的地方。

    六、变换

    终于要到牛逼的地方了,不管你激动不激动,反正我是激动了。

    RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因。所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 API。

    Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
    

    这里出现了一个叫做 Func1 的类。它和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于, Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

    可以看到,map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回,而在经过 map() 方法后,事件的参数类型也由 String 转为了 Bitmap。这种直接变换对象并返回的,是最常见的也最容易理解的变换。不过 RxJava 的变换远不止这样,它不仅可以针对事件对象,还可以针对整个事件队列,这使得 RxJava 变得非常灵活。我列举几个常用的变换:

    map(): 事件对象的直接变换,具体功能上面已经介绍过。它是 RxJava 最常用的变换。 map() 的示意图

    这里写图片描述

    flatMap(): 这是一个很有用但非常难理解的变换,因此我决定花多些篇幅来介绍它。 首先假设这么一种需求:假设有一个数据结构『学生』,现在需要打印出一组学生的名字。实现方式很简单:

    Student[] students = ...;
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            Log.d(tag, name);
        }
        ...
    };
    Observable.from(students)
        .map(new Func1<Student, String>() {
            @Override
            public String call(Student student) {
                return student.getName();
            }
        })
        .subscribe(subscriber);
    

    很简单。那么再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程。)首先可以这样实现:

    Student[] students = ...;
    Subscriber<Student> subscriber = new Subscriber<Student>() {
        @Override
        public void onNext(Student student) {
            List<Course> courses = student.getCourses();
            for (int i = 0; i < courses.size(); i++) {
                Course course = courses.get(i);
                Log.d(tag, course.getName());
            }
        }
        ...
    };
    Observable.from(students)
        .subscribe(subscriber);
    

    依然很简单。那么如果我不想在 Subscriber 中使用 for 循环,而是希望 Subscriber 中直接传入单个的 Course 对象呢(这对于代码复用很重要)?用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?

    这个时候,就需要用 flatMap() 了:

    Student[] students = ...;
    Subscriber<Course> subscriber = new Subscriber<Course>() {
        @Override
        public void onNext(Course course) {
            Log.d(tag, course.getName());
        }
        ...
    };
    Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);
    

    从上面的代码可以看出, flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。但需要注意,和 map() 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中。 flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat。

    flatMap() 示意图:

    这里写图片描述

    扩展:由于可以在嵌套的 Observable 中添加异步代码, flatMap() 也常用于嵌套的异步操作,例如嵌套的网络请求。示例代码(Retrofit + RxJava):

    networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
        .flatMap(new Func1<String, Observable<Messages>>() {
            @Override
            public Observable<Messages> call(String token) {
                // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
                return networkClient.messages();
            }
        })
        .subscribe(new Action1<Messages>() {
            @Override
            public void call(Messages messages) {
                // 处理显示消息列表
                showMessages(messages);
            }
        });
    

    传统的嵌套请求需要使用嵌套的 Callback 来实现。而通过 flatMap() ,可以把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

    throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器: RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。

    此外, RxJava 还提供很多便捷的方法来实现事件序列的变换,这里就不一一举例了。

    变换的原理:lift()

    这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
    

    这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)——

    subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。

    当含有 lift() 时:

    1.lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;

    2.而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;

    3.当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;

    4.而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

    这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。

    精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。

    如果你更喜欢具象思维,可以看图:

    这里写图片描述

    或者可以看动图

    这里写图片描述

    两次和多次的 lift() 同理,如下图:

    这里写图片描述

    举一个具体的 Operator 的实现。下面这是一个将事件中的 Integer 对象转换成 String 的例子,仅供参考:

    observable.lift(new Observable.Operator<String, Integer>() {
        @Override
        public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
            // 将事件序列中的 Integer 对象转换为 String 对象
            return new Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    subscriber.onNext("" + integer);
                }
    
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }
    
                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }
            };
        }
    });
    

    讲述 lift() 的原理只是为了让你更好地了解 RxJava ,从而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map() flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

    compose: 对 Observable 整体的变换

    除了 lift() 之外, Observable 还有一个变换方法叫做 compose(Transformer)。它和 lift() 的区别在于, lift() 是针对事件项和事件序列的,而 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。你可以这么写:

    observable1
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber1);
    observable2
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber2);
    observable3
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber3);
    observable4
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber1);

    你觉得这样太不软件工程了,于是你改成了这样:

    private Observable liftAll(Observable observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
    ...
    liftAll(observable1).subscribe(subscriber1);
    liftAll(observable2).subscribe(subscriber2);
    liftAll(observable3).subscribe(subscriber3);
    liftAll(observable4).subscribe(subscriber4);

    可读性、可维护性都提高了。可是 Observable 被一个方法包起来,这种方式对于 Observale 的灵活性似乎还是增添了那么点限制。怎么办?这个时候,就应该用 compose() 来解决了:

    public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
        @Override
        public Observable<String> call(Observable<Integer> observable) {
            return observable
                .lift1()
                .lift2()
                .lift3()
                .lift4();
        }
    }
    ...
    Transformer liftAll = new LiftAllTransformer();
    observable1.compose(liftAll).subscribe(subscriber1);
    observable2.compose(liftAll).subscribe(subscriber2);
    observable3.compose(liftAll).subscribe(subscriber3);
    observable4.compose(liftAll).subscribe(subscriber4);

    像上面这样,使用 compose() 方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必被包在方法的里面了。

    compose() 的原理比较简单,不附图喽。

    线程控制:Scheduler (二)

    除了灵活的变换,RxJava 另一个牛逼的地方,就是线程的自由控制。

    1) Scheduler 的 API (二)

    前面讲到了,可以利用 subscribeOn() 结合 observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。可是在了解了 map() flatMap() 等变换方法后,有些好事的(其实就是当初刚接触 RxJava 时的我)就问了:能不能多切换几次线程?

    答案是:能。因为 observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 并不是(严格说应该为『不一定是』,但这里不妨理解为『不是』)subscribe() 参数中的 Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber 。换句话说,observeOn() 指定的是它之后的操作所在的线程。因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可。上代码:

    Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .map(mapOperator) // 新线程,由 observeOn() 指定
        .observeOn(Schedulers.io())
        .map(mapOperator2) // IO 线程,由 observeOn() 指定
        .observeOn(AndroidSchedulers.mainThread) 
        .subscribe(subscriber);  // Android 主线程,由 observeOn() 指定

    如上,通过 observeOn() 的多次调用,程序实现了线程的多次切换。

    不过,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

    又有好事的(其实还是当初的我)问了:如果我非要调用多次 subscribeOn() 呢?会有什么效果?

    这个问题先放着,我们还是从 RxJava 线程控制的原理说起吧。

    2) Scheduler 的原理(二)

    其实, subscribeOn() 和 observeOn() 的内部实现,也是用的 lift()。具体看图(不同颜色的箭头表示不同的线程):

    subscribeOn() 原理图:

    这里写图片描述
    observeOn() 原理图:
    这里写图片描述

    从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

    最后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整):

    延伸:doOnSubscribe()

    然而,虽然超过一个的 subscribeOn() 对事件处理的流程没有影响,但在流程之前却是可以利用的。

    在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化。然而 onStart() 由于在 subscribe() 发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe() 被调用时的线程。这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测 subscribe() 将会在什么线程执行。

    而与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

    示例代码:

    Observable.create(onSubscribe)
        .subscribeOn(Schedulers.io())
        .doOnSubscribe(new Action0() {
            @Override
            public void call() {
                progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
            }
        })
        .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subscriber);

    如上,在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

    RxJava 的适用场景和使用方式

    1. 与 Retrofit 的结合

    Retrofit 是 Square 的一个著名的网络请求库。没有用过 Retrofit 的可以选择跳过这一小节也没关系,我举的每种场景都只是个例子,而且例子之间并无前后关联,只是个抛砖引玉的作用,所以你跳过这里看别的场景也可以的。

    Retrofit 除了提供了传统的 Callback 形式的 API,还有 RxJava 版本的 Observable 形式 API。下面我用对比的方式来介绍 Retrofit 的 RxJava 版 API 和传统版本的区别。

    以获取一个 User 对象的接口作为例子。使用Retrofit 的传统 API,你可以用这样的方式来定义请求:

    @GET("/user")
    public void getUser(@Query("userId") String userId, Callback<User> callback);

    在程序的构建过程中, Retrofit 会把自动把方法实现并生成代码,然后开发者就可以利用下面的方法来获取特定用户并处理响应:

    getUser(userId, new Callback<User>() {
        @Override
        public void success(User user) {
            userView.setUser(user);
        }
    
        @Override
        public void failure(RetrofitError error) {
            // Error handling
            ...
        }
    };

    而使用 RxJava 形式的 API,定义同样的请求是这样的:

    @GET("/user")
    public Observable<User> getUser(@Query("userId") String userId);

    使用的时候是这样的:

    getUser(userId)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>() {
            @Override
            public void onNext(User user) {
                userView.setUser(user);
            }
    
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable error) {
                // Error handling
                ...
            }
        });

    看到区别了吗?

    当 RxJava 形式的时候,Retrofit 把请求封装进 Observable ,在请求结束后调用 onNext() 或在请求失败后调用 onError()。

    对比来看, Callback 形式和 Observable 形式长得不太一样,但本质都差不多,而且在细节上 Observable 形式似乎还比 Callback 形式要差点。那 Retrofit 为什么还要提供 RxJava 的支持呢?

    因为它好用啊!从这个例子看不出来是因为这只是最简单的情况。而一旦情景复杂起来, Callback 形式马上就会开始让人头疼。比如:

    假设这么一种情况:你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。使用 Callback 方式大概可以这么写:

    getUser(userId, new Callback<User>() {
        @Override
        public void success(User user) {
            processUser(user); // 尝试修正 User 数据
            userView.setUser(user);
        }
    
        @Override
        public void failure(RetrofitError error) {
            // Error handling
            ...
        }
    };

    有问题吗?

    很简便,但不要这样做。为什么?因为这样做会影响性能。数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。所以为了提升性能,这段代码可以优化一下

    getUser(userId, new Callback<User>() {
        @Override
        public void success(User user) {
            new Thread() {
                @Override
                public void run() {
                    processUser(user); // 尝试修正 User 数据
                    runOnUiThread(new Runnable() { // 切回 UI 线程
                        @Override
                        public void run() {
                            userView.setUser(user);
                        }
                    });
                }).start();
        }
    
        @Override
        public void failure(RetrofitError error) {
            // Error handling
            ...
        }
    };

    性能问题解决,但……这代码实在是太乱了,迷之缩进啊!杂乱的代码往往不仅仅是美观问题,因为代码越乱往往就越难读懂,而如果项目中充斥着杂乱的代码,无疑会降低代码的可读性,造成团队开发效率的降低和出错率的升高。

    这时候,如果用 RxJava 的形式,就好办多了。 RxJava 形式的代码是这样的:

    getUser(userId)
        .doOnNext(new Action1<User>() {
            @Override
            public void call(User user) {
                processUser(user);
            })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>() {
            @Override
            public void onNext(User user) {
                userView.setUser(user);
            }
    
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable error) {
                // Error handling
                ...
            }
        });

    后台代码和前台代码全都写在一条链中,明显清晰了很多。

    再举一个例子:假设 /user 接口并不能直接访问,而需要填入一个在线获取的 token ,代码应该怎么写?

    Callback 方式,可以使用嵌套的 Callback

    @GET("/token")
    public void getToken(Callback<String> callback);
    
    @GET("/user")
    public void getUser(@Query("token") String token, @Query("userId") String userId, Callback<User> callback);
    

    getToken(new Callback() {
    @Override
    public void success(String token) {
    getUser(token, userId, new Callback() {
    @Override
    public void success(User user) {
    userView.setUser(user);
    }

            @Override
            public void failure(RetrofitError error) {
                // Error handling
                ...
            }
        };
    }
    
    @Override
    public void failure(RetrofitError error) {
        // Error handling
        ...
    }
    

    });

    ###倒是没有什么性能问题,可是迷之缩进毁一生,你懂我也懂,做过大项目的人应该更懂。
    ###而使用 RxJava 的话,代码是这样的:

    @GET(“/token”)
    public Observable getToken();

    @GET(“/user”)
    public Observable getUser(@Query(“token”) String token, @Query(“userId”) String userId);

    
    getToken()
        .flatMap(new Func1<String, Observable<User>>() {
            @Override
            public Observable<User> onNext(String token) {
                return getUser(token, userId);
            })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<User>() {
            @Override
            public void onNext(User user) {
                userView.setUser(user);
            }
    
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable error) {
                // Error handling
                ...
            }
        });

    各种异步操作

    前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。

    RxBus

    RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用 Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto ,目前为止没有不良反应。

    七、最后

    对于 Android 开发者来说, RxJava 是一个很难上手的库,因为它对于 Android 开发者来说有太多陌生的概念了。可是它真的很牛逼。因此,我写了这篇《RXJava最全学习教程》,希望能给始终搞不明白什么是 RxJava 的人一些入门的指引,或者能让正在使用 RxJava 但仍然心存疑惑的人看到一些更深入的解析。无论如何,只要能给各位同为 Android 工程师的你们提供一些帮助,这篇文章的目的就达到了。

    展开全文
  • 适合所有初中级工程师,从RxJava的每个知识点进行讲解,并在每个知识点中都带有文字说明和代码例子说明。通过这次系统化的学习,您将可以完整的学习到RxJava家族的所有特性和常用的操作符,掌握操作符的使用,掌握...
  • 介绍RxJava(RxAndroid)基础入门知识,帮助入门上手RxJava(RxAndroid)库。

    前言

      RxAndroid是RxJava在Android上的一个扩展,大牛JakeWharton的项目。据说和Retorfit、OkHttp组合起来使用,效果不是一般的好。而且用它似乎可以完全替代eventBus和OTTO,这么牛的东西当然要研究研究了 ,看看它到底有多厉害。

    正文

    相关资源

      RxJava的GitHub地址:https://github.com/ReactiveX/RxJava
      RxAndroid的GitHub地址:https://github.com/ReactiveX/RxAndroid
      中文文档:https://mcxiaoke.gitbooks.io/rxdocs/content/
      一篇写的比较好的入门RxJava的文章的地址:http://gank.io/post/560e15be2dca930e00da1083

    1.RxJava是干嘛的

      Rx(Reactive Extensions)是一个库,用来处理事件和异步任务,在很多语言上都有实现,RxJava是Rx在Java上的实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。
      Android的童鞋都知道,处理异步事件,现有的AsyncTask、Handler,不错的第三方事件总线EventBus、OTTO等等都可以处理。并且大部分童鞋应该都很熟练了。而且经我目前的学习来看,RxJava这个库,上手确实有门槛,不是拿来就能用。但是作为一个猿,那些可能出现的优秀的框架技术,及时的跟进和学习是必要的,从中汲取营养才能帮助自己成长。况且有童鞋已经表示,它完全可以替代EventBus和OTTO,来看看吧。

    2.RxJava的优势

      最概括的两个字:简洁。而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁。
      简单的demo看不出来,真正投入项目使用了应该就有体会了。它提供的各种功能强悍的操作符真的很强大。

    3.基本使用流程

      这里只介绍Android Studio的接入方式,如果你还在用Eclipse的话,我建议你换了。
      配置buile.gradle:(以下为当前最新版本,如有更新请到上述GitHub链接查看更新)

    dependencies {
      compile 'io.reactivex:rxandroid:1.2.1'
      compile 'io.reactivex:rxjava:1.1.6'
      }

      配置完之后就可以使用RxJava的API了。介绍两个个关键的类:
      (1)Observable (2)Subscriber 即:被观察者(Observable)和观察者(Subscriber),其实我觉得叫发布者和订阅者更好理解一些,但大家都叫被观察者和观察者。
      主干的使用过程就是1.创建被观察者。2.创建观察者。3.将二者建立联系。完毕。然后被观察中发出信息触发观察者的动作,执行相应的方法,就这样。你先别急着吐槽它很平庸。它的强大在于这个过程中提供的各种操作变换的技巧会让你可以简洁的处理相当繁琐的代码逻辑。
    先看一个简单的demo:

            //创建一个被观察者(发布者)
            Observable observable = Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1001);
                    subscriber.onNext(1002);
                    subscriber.onNext(1003);
                    subscriber.onCompleted();
                }
            });
    
            //创建一个观察者
            Subscriber<Integer> subscriber = new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted.. ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "subscriber onError.. " + e.getMessage());
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext.. integer:" + integer);
                }
            };
    
            //注册观察者(这个方法名看起来有点怪,还不如写成regisiterSubscriber(..)或者干脆addSubscriber(..))
            //注册后就会开始调用call()中的观察者执行的方法 onNext() onCompleted()等
            observable.subscribe(subscriber);

      上面的例子中,当Observable发射数据时,会依次调用Subscriber的onNext()方法,将发射的数据作为参数传给onNext(),如果出错,则会调用Subscriber的onError()方法,完成所有数据发射后,调用onCompleted()方法,整个过程完毕。
      但是,subcribe()方法默认在当前线程被调用。所以,这样使用的话,被观察者和观察者的所有的动作都是在同一个线程完成的,没卵用…
      但是当然肯定不会就这个程度了,RxJava有两个方法可以很方便的指定观察者和被观察者代码运行的线程,RxAndroid还有一个扩展,可以指定在UI线程运行。你懂的!
    如下:

    //设置观察者和发布者代码所要运行的线程后注册观察者
    observable.subscribeOn(Schedulers.immediate())//在当前线程执行subscribe()方法
    .observeOn(AndroidSchedulers.mainThread())//在UI线程执行观察者的方法
    .subscribe(subscriber);

      通过Scheduler作为参数来指定代码运行的线程,非常方便,好用到不行…其他常用的参数还有:
      Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。 
      Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
      Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免建不必要的线程。
      Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
      另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    4.Observable创建方式

      以上介绍了主干使用流程,从这里我们往细一点再看。前文说了,RxJava的强大之处在于它的各种操作符。在创建Observable对象的方式上,同样有很多方便的操作符的实现,上面是通过Observable.create()方法创建的observable对象,这里介绍其他几个常用的方法。
      
      通过from创建Observable:

         //Teacher为一个数据Bean,包含姓名,年龄,住址三个字段
         List<Teacher> teachers = new ArrayList<>();
            for (int i = 0; i < 4; i++) {
                teachers.add(new Teacher("name" + i, i, "place" + i));
            }
            //from方法支持继承了Interable接口的参数,所以常用的数据结构(Map、List..)都可以转换
            Observable fromObservale = Observable.from(teachers);
            fromObservale.subscribe(new Subscriber<Teacher>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "from(teachers)  onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "from(teachers)  " + e.getMessage());
                }
    
                @Override
                public void onNext(Teacher teacher) {
                    //依次接收到teachers中的对象
                    Log.d(TAG, "from(teachers)  onNext:" + teacher.toString());
                }
            });

      用from方法创建Observable,可以传入一个数组,或者一个继承了Iterable的类的对象作为参数,也就是说,java中常用的数据结构如List、Map等都可以直接作为参数传入from()方法用以构建Observable。这样,当Observable发射数据时,它将会依次把序列中的元素依次发射出来。
      
      通过just创建Observable:  

    //Just类似于From,但是From会将数组或Iterable的元素具取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。
    //Just接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable
    Observable justObservable = Observable.just(1, "someThing", false, 3.256f, new Teacher("Jhon", 25, "NewYork"));
    justObservable.subscribe(new Subscriber() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "just(...)  onCompleted");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "just(...)  onError:" + e.getMessage());
        }
    
        @Override
        public void onNext(Object o) {
            Log.d(TAG, "just(...)  onNext:" + o.toString());
        }
    });

      just直接接收object作为参数,原样发射出来,也是非常方便的。
      
    通过timer创建Observable:  

    //timer()创建一个Observable,它在一个给定的延迟后发射一个特殊的值 设定执行方法在UI线程执行
    //延时两秒后发射值
    //实测 延时2s后发送了一个0
    Observable timerObservable = Observable.timer(2, TimeUnit.SECONDS, AndroidSchedulers.mainThread());
       timerObservable.subscribe(
               new Subscriber() {
                   @Override
                   public void onCompleted() {
                       Log.i(TAG, "timer(...)  onCompleted");
                       refreshStr("timer(...)  onCompleted\n");
                   }
    
                   @Override
                   public void onError(Throwable e) {
                       Log.e(TAG, "timer(...)  onError:" + e.getMessage());
                       refreshStr("timer(...)  onError:" + e.getMessage());
                   }
    
                   @Override
                   public void onNext(Object o) {
                       Log.d(TAG, "timer(...)  onNext:" + o.toString());
                       refreshStr("timerObservable 延时两秒触发 发送值:" + o.toString());
                   }
               }
       );

      timer有定时的作用,延时发送一个值0。
      
    通过range创建Observable(这里叠加使用一个repeat方法):  

    //range 发射从n到m的整数序列 可以指定Scheduler设置执行方法运行的线程
    //repeat方法可以指定重复触发的次数
    Observable rangeObservable = Observable.range(3, 7).repeat(2);
    rangeObservable.subscribe(
    //在不写观察者的情况下,可以使用Action1和Action0这两个接口来实现不完整定义的回调; 参见:ActionSubscriber
    //Action1<T>可以代替实现onNext(); Action1<Throwable>可以代替实现onError(); Action0可以代替实现onConplete()
            new Action1() {
                @Override
                public void call(Object o) {
                    Log.e(TAG, "range(3, 7).repeat(2)  onNext:"+o.toString());
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.e(TAG, "range(3, 7).repeat(2)  "+throwable.getMessage());
                }
            },
            new Action0() {
                @Override
                public void call() {
                    Log.i(TAG, "range(3, 7).repeat(2)  onCompleted");
                }
            });

      range发射从n到m的整数序列,repeat可以指定重复次数,以上发射的次序为:3,4,5,6,7,3,4,5,6,7。这里用到的Action0和Action1是两个可以替代Subscriber的接口,具体可以参见相关文档和源码实现,这里不深入介绍。
      其他还有Interval、Defer、Start等方法就不一一介绍了,本文主要是帮助初次接触的童鞋入门,RxJava的操作符非常丰富,这里很难一一说明,更多的内容要还需要大家自己去熟悉和探究。

    5.变换操作

      除了多样的Observable创建方式,RxJava还有一个神奇的操作就是变换。通过自己定义的方法,你可以将输入的值变换成另一种类型再输出(比如输入url,输出bitmap),单一变换、批量变换、甚至实现双重变换,嵌套两重异步操作!并且代码格式一如既往的干净平整。是不是很牛?
      
      使用map()方法做转换:

    Runnable run = new Runnable() {
         @Override
         public void run() {
             //将文件路径转换为bitmap发出 观察者直接收到bitmap进行处理
             Observable observable = Observable.just(imgFilePath);
             observable.map(new Func1<String, Bitmap>() {
                 @Override
                 public Bitmap call(String imgFilePath) {
                     return getBitmapFromAssets(imgFilePath);
                 }
             }).subscribeOn(Schedulers.immediate())//当前线程(子线程)发布
                     .observeOn(AndroidSchedulers.mainThread())//UI线程执行(更新图片)
                     .subscribe(new Subscriber<Bitmap>() {
                         @Override
                         public void onCompleted() {
                             Log.i(TAG, "observable.map(..)  onCompleted");
                         }
    
                         @Override
                         public void onError(Throwable e) {
                             Log.i(TAG, "observable.map(..)  onError" + e.getMessage());
                         }
    
                         @Override
                         public void onNext(Bitmap bitmap) {
                             //显示图片
                             iv.setImageBitmap(bitmap);
                         }
                     });
         }
     };
     new Thread(run).start();

      map()方法是最基本的变换操作,这里只变换了一个数据,将文件路径解析为Bitmap显示出来。你当然也可以多传入几个参数或者用from操作符传入一个数组或者集合等,批量操作,并且同时指定代码运行的线程。而且这些所有的操作都可以在一条链式代码中全部完成,易读易维护。你是不是已经有一点体会到它的威力了?
      
      flatMap()实现双重变换
      flatMap()将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。即:第一次转换时,它依次将输入的数据转换成一个Observable,然后将这些Observable发射的数据集中到一个Observable里依次发射出来。觉得莫名其妙?来看一个实际例子:

    Subscriber subscriber = new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            Log.i(TAG,"Observable.just(array1,array2).flatMap  onCompleted\n\n");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.e(TAG,"Observable.just(array1,array2).flatMap   onError  "+e.getMessage());
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG,"Observable.just(array1,array2).flatMap  integer = "+integer);
        }
    };
    
    //flatMap可以实现一个双重转换,在它的回调方法中会返回一个observable对象,但它并不会直接发射这个对象
    //而是将这个observable对象要发射的值 集中到一个新的observable对象中依次发射
    //如本例,第一层Observable依次发射两个数组,经过flatmap转换之后,变成变成两个依次发射数组元素的observable
    // 最后在subscriber中接收到的直接是整型数,等于将两个数组"铺开"了,直接发射整数,这就是大概地"flat"的含义吧
    // flatMap方法可以很灵活的使用,实现双重变换,满足很多不同情况下的需求,比如处理嵌套的异步代码等,非常棒!
    Integer[] array1 = {1, 2, 3, 4}, array2 = {5, 6, 7, 8};
    Observable.just(array1,array2).flatMap(new Func1<Integer[], Observable<?>>() {
        @Override
        public Observable<?> call(Integer[] ints) {
            Observable observable = Observable.from(ints);
            return observable;
        }
    }).subscribe(subscriber);

      这里flatMap()方法将最初传入的两个数组在第一次变换时,通过from操作符变换成两个Observable,然后在将这两个Observable发射的数据全部集中到一个新的Observable中集中发射,等于将两个数组”铺开”了,依次发射出来它们的元素。具体转换的方法由你指定,使用的方式是比较灵活的。
      比如有的商城类应用的需求:先要拿到某类别的一个产品列表,然后列表中有具体产品展示图片的url,需要你拿到产品列表信息后依次去请求图片,成功后更新到UI页面上,使用flatMap,你肯定知道怎么写了吧,是不是比CallBack跳来跳去的舒服一些?

      scan()变换
      scan操作符对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
      当看到这里的时候,我已经由衷的在感叹,这些操作符实在太TM丰富了,然后对它的强大已经开始有所体会和感悟了。这种产生斐波那契数列的操作都给封装进去了,而且函数由你自定义,你能用它做成什么,可能在灵感到来之前你自己都想不到。
      demo代码:

    //scan 会将输入的第一个元素当作参数做一个函数运算(函数由你实现,规定需要两个参数,此时另一个默认没有),然后发射结果
    // 同时,运算结果会被当作函数的与第二个参数与第二个元素再进行函数运算,完成后发射结果
    // 然后将这个结果与第三个元素作为函数的参数再次运算...直到最后一个元素
    Observable.just(1, 2, 3, 4).scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            //integer是第一个元素或上一次计算的结果,integer2是下一轮运算中新的序列中元素
            Log.d(TAG, "scan call   integer:" + integer + "   integer2:" + integer2);
            return integer + integer2;
        }
    }).subscribe(new Subscriber<Integer>() {
        @Override
        public void onCompleted() {
            Log.i(TAG, "Observable.just(1,2,3,4).scan   onCompleted..");
            initViewState();
        }
    
        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "Observable.just(1,2,3,4).scan  onError  " + e.getMessage());
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "Observable.just(1,2,3,4).scan  onNext()..  integer = " + integer);
            /**
             * 第一次为1,然后是3(1+2),6(3+3),10(6+4)
             */
        }
    });

      注释和上面的说明都很清晰了,就不再赘述。同样,关于转换操作,也还有很多其他的操作符,如wiindow() buffer() 等已实现的方法,具体参见文档吧。

    6.过滤、结合操作

      在文档的分类中,还有两片基础API是过滤和结合的操作符,例如:Filter、Skip、Take、Merage、Zip等等,本来打算一起列举的,但是想想其实如果熟悉了上面的内容,这两块相关的API上手其实也很容易了。如果入门目的已经达到,再讲这个显得有点啰嗦。所以略去,如果以后有心得,在开篇另讲那些操作符的使用,RxJava的知识,主要还是要靠自己多熟悉,多研究。

    尾声

      本文主要希望帮助初次接触RxJava的童鞋入门,讲的一些基础知识,RxJava太大,内容太丰富,入门之后需要下的功夫也不少,希望大家都能day day up!
      最后,如发现内容有误,请斧正!
      非常感谢!

    展开全文
  • RXJAVA的使用总结

    2018-09-13 15:11:47
    前些日子在项目中引入了RXJAVA,也算是初步入门了RXJAVA的使用。使用起来感觉还是挺方便的,唯一需要注意的就是线程的切换以及调用的方法是否是在该线程应该使用的。如UI相关操作一定要在主线程中。 一、什么是...

    前些日子在项目中引入了RXJAVA,也算是初步入门了RXJAVA的使用。使用起来感觉还是挺方便的,唯一需要注意的就是线程的切换以及调用的方法是否是在该线程应该使用的。如UI相关操作一定要在主线程中。

    一、什么是RXJAVA

    RXJAVA是一个库,用来支持我们需求里需要异步操作的地方。它比起handler等异步操作的实现方式来说,显得更为简洁。把整个操作整合成一条流水线,从上游到下游都能够看的清。

    二、RXJAVA的原理

    RXJAVA的实现,是一种扩展式的观察者模式。

    RXJAVA中有四种概念。observable(被观察者),observer(观察者),subscribe(订阅),事件。Observable和Observer通过subscribe来实现订阅关系。与传统的观察者模式不同,除了onNext事件外,rxjava还提供了onCompleted和onError。当不再有onNext事件发送时,将以onCompleted事件作为结束。当处理过程中出现异常时,会触发onError,同时队列自动终止,不允许再有事件发出。onCompleted和onError在一个序列中有且只有一个,二者互斥,只能出现一个。

    说了这么多,还是要看源码,看一看源码中,到底是如何实现的。由于本人的项目中使用的是rxjava2,所以源码就按照rxjava2来解析了。

    作为事件的发送方,我们需要一个被观察者来发送事件,被观察者是Observale类,使用它的create方法来创建一个实例。在create方法中,只有一个参数,ObservableOnSubscribe<T> ,可以看到这个参数接收泛型,实际上这个T就是我们发送事件时候所要传递的内容。

    跟进之后发现ObservableOnSubscribe是一个接口,里面只有一个需要实现的方法,就是subscribe方法,这个方法也是只有一个参数,ObservableEmitter<T>,继续跟进,看到Observable继承自Emitter<T>,增加了setDisposable等方法,而最后来到Emitter,看到它里面有我们熟悉的onNext,onCompleted以及onError方法。

    在结合一个创建的常规操作。

    至此,我想大家应该都能看明白了。在create方法调用时,我们需要定义subscribe方法被调用时应该做些什么。而这些内容又是由emitter来实现的,如发送哪些事件。而这个subscribe方法看名字大概猜得出是在订阅时被触发。让我们往后看是不是这样。

    被观察者Observable还有一个方法,是subscribe。这个方法只接收一个参数,Observer<? super T>,不过这个方法里面相对复杂,我们一步一步看。

    首先,先检验传入的观察者是否为空,为空则报错。之后用try尝试对observer进行转换。我们知道rxjava中有一些操作符,就是对observer进行转换操作的,这里调用了RxJavaPlugins里的onSubscribe方法获取到一个转换后的observer。之后又对这个转换后的observer进行了为空校验。最后subscribeActual方法,传入参数是这个转换后的observer。但是跟进后发现subscribeActual是一个抽象方法,那么我们就得去寻找一个它是在哪里实现的。

    找了半天,发现刚才只分析了create的参数,但是没有分析其方法。所以还得回到create方法里。

    我们看到create方法最后一步return,返回了一个RxJavaPlugins里的onAssembly方法,里面的参数是一个ObserverableCreate方法。看名字应该能猜到这是实例化Observable的地方了。让我们追进去看看。

    onAssemble方法会把传入的参数进行func处理,之后还是返回传入的参数,也就是最终获取的就是ObservableCreate方法生成的内容。

    跟进ObservableCreate方法后感觉一切就明朗了。先放一张图。

    有木有!刚才我们所希望找到的subscribeActual终于出现了。这个ObservableCteate继承自Observable,并且实现了它的抽象方法subscribeActual。让我们捋一捋逻辑。Observerable调用了create方法最终调用了ObservableCreate方法来获取一个实例,这个实例实现了抽象方法subscribeActual,在这个实例调用subscribe方法时,会调用subscribeActual方法。也就是订阅的那一刻,开始发送事件了。

    发送事件的内容,就在我们创建的ObservableOnSubscribe的subscribe方法里。现在还剩下一点,发送的事件,是如何做到接收的。这就要看看subscribeActual里的前两行代码了。

    先看看CreateEmitter。

    CreateEmitter继承自ObservableEmitter,并且构造方法接收一个参数,观察者observer。之后把这个观察者自己持有。之后在自己的onNext方法里调用观察者的onNext。看到这是不是差不多都明白了。emitter调用onNext事件时,会调用其内部持有的观察者的onNext事件。也就完成了上游发送事件,下游接收并处理事件的任务。接下来还有一点,就是observer的onSubscribe方法,看看这里是干什么的。

    跟进代码,我们看到onSubscribe方法传进来一个参数Disposable,这个东西实际上是用于拦截的。我们看到CreateEmitter实现了Disposable,并且observer的onSubscribe的方法传进去的就是一个CreateEmitter。而Disposable需要实现两个方法。一个是dispose。一个是isDisposed。这两个方法一个是注册拦截,一个是获取拦截状态。

    可以看到这两个方法底层都调用了DisposableHelper类,这是一个查看拦截状态的帮助类。当调用dispose时,就会把这个CreateEmitter注册进去,之后看到调用CreateEmitter的onNext等方法时都会调用isDisposed方法来判断这个发射器是否已经被注册拦截状态了,如果已经被注册,则不再执行后续操作。

    好了,到这里,一套发送事件接收事件的流程就梳理完了。让我们简单总结一下每个部分的功能。

    被观察者的创建,Observerable调用create方法,传入参数是发送事件的内容。create方法最终调用ObservableCreate方法获取一个实例,实际上是调用了ObservableCteate的构造方法。而ObservableCteate实现了Observable里的抽象方法subscribeActual,这个方法是在Observable的subscribe方法执行时调用,也就是订阅时调用。在subscribeActual方法里,调用CreateEmitter方法生成一个发射器,这个发射器以观察者作为参数,实现了Dispoasble接口和ObservableEmitter接口。CreateEmitter持有观察者,并在自己的onNext等事件里调用观察者的onNext等事件,从而实现发送接收处理。在调用观察者的onNext等事件前,会先判断拦截状态。如果拦截则不做任何操作。我们可以在观察者的onSubscribe方法里获取到发射器(发射器本身实现了拦截接口Disposable),可以持有它,并在后续onNext等事件里决定是否开启拦截,以阻止后续操作。

    三、RXJAVA如何实现轻松切换线程的

    我们使用RXJAVA的一个比较重要的原因就是它切换线程及其方便。那么还是从源码里看一看,RXJAVA中的切换线程操作是如何实现的。切换线程这部分其实要讲的还挺多的,源码部分内容也不少,后期写一篇专门的文章来总结。本篇主要是总结自己项目中的使用方式。

    四、自己如何使用RXJAVA的(网络请求封装)

    终于来到重点了。在理解了RXJAVA轻松切换线程的情况下,我们可以在上面大做文章了。网络请求的处理是我们开发过程中需要切换线程比较频繁的地方。通常耗时操作网络请求都会放在子线程中处理,而根据返回结果对UI进行的操作要放在主线程中进行。原始的方法就是在主线程中开启一个子线程,发起网络请求。之后在网络请求回调函数中在post到主线程。也有借助handler来实现的。通常这些方式实现之后,在读代码找关系逻辑的时候真的很绕,需要跳来跳去的。但是有了RXJAVA之后一切就不一样了,把请求和响应全在一条线上搞定。

    首先还是来回顾一下RXJAVA的工作流程。Observable创建,添加发送事件内容,选择上游线程,选择下游线程,以Observer为参数订阅,Observer决定响应处理。

    在这个流程中,其实我们大部分的操作已经可以实现了,选择上游线程就是新建一个子线程,选择下游线程就是回归到主线程,Observer的响应事件就是在主线程对UI操作的事件。现在唯一的问题就是上游事件如何发送以及发送的时机。

    其实在这里最关键的封装就是把网络请求的回调方法放到上游事件发送里,把请求的响应结果作为信号发送,让下游处理。(如果处理的内容比较多,操作复杂,可以处理完再发送。我们的原则是在UI线程里做较少的操作。)那么这一步是如何实现的呢。

    建造一个请求帮助类。这个帮助类提供一个发送请求方法,这个发送请求方法接收一个参数,这个参数是网络请求响应的监听接口。在上游调用请求方法,同时new一个监听接口作为参数传递给请求方法。这个监听接口实现的方法,其内容可以调用上游的发射器,将想要发送的内容在上游发送出去。

    简单点说,就是让网络请求接口的响应函数,在上游发送事件。这个操作是在上游子线程完成的。

    经过这样的封装,我们就实现了异步网络请求结果通知主线程操作。

    后续的处理都和平常一样了,值得注意的是由于现在都放在一串代码中执行了,所以很容易忽略两边的操作是处在不同线程的。建议在命名方法时,标注出此方法是在哪种线程中使用的,这样代码的逻辑更加清晰。

    展开全文
  • ReactiveX详解 RxJava2基础 RxAndroid ReactiveX(Reactive Extensions),一般简写为Rx,是一个使用可观察数据流进行异步编程的编程接口。由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供...

    简介

    什么是ReactiveX

    ReactiveX: An API for asynchronous programming with observable streams.

    ReactiveX(Reactive Extensions),一般简写为Rx,是一个使用可观察数据流进行异步编程的编程接口。由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便地编写异步和基于事件的程序,现在已经有了RxJava、RxJS、Rx.NET、RxScala、RxClojure、RxSwift等基本所有主流语言的实现,受到越来越多的开发者的欢迎,开源社区也普遍在使用。
    ReactiveX不仅仅是一个API,它是一种思想、一种编程突破,它影响了许多其它的API、框架甚至编程语言。

    为什么要使用ReactiveX

    无论你开发app还是后台应用,你总会时不时地编写一些异步或基于事件的代码,但你会发现很难完美地处理工作线程和主线程的切换、异常的处理、线程的取消、线程同步等等问题,而且在多个线程协调处理业务逻辑时代码结构变得异常的复杂而且还容易出错。
    使用Rx,你可以:

    函数式编程
    对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
    精简代码
    Rx的操作符经常可以将复杂的难题简化为很少的几行代码
    更好地处理异步错误
    传统的try/catch对于异步计算过程中的错误无能为力,但Rx提供了很好的错误处理机制
    轻松处理并发
    Rx的Observable和Scheduler让开发者可以摆脱底层的线程同步和各种并发问题

    也就是说,ReactiveX的Observable模型让你对异步事件流的处理就像平时对数据集(如数组)进行简单、可组合的处理一样,让你从混乱的回调中解脱出来,写出更高可读性、更不容易产生Bug的代码。
    Rx结合了观察者模式、迭代器模式和函数式编程的优秀思想。Rx扩展观察者模式以支持数据/事件序列,添加了一些操作符以使你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO 。

    Observable是可组合的
    传统的并发处理中,Java Futures技术对于简单的无嵌套异步操作是简单高效的,如benjchristensen/FuturesA.java。但是一旦涉及到嵌套那么代码复杂度将有一个非常可观的增加,如benjchristensen/FuturesB.java,一个带条件的异步任务需要等待另一个异步任务拿到的数据才能工作,而这就可能会产生一些其他的异步任务被阻塞的问题。
    使用Future很难正确地组合多个带条件的异步流(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,如果你对并发编程非常精通还是可以做到的,但是整个代码逻辑也会变得异常复杂且很容易出错,或者过早地阻塞在Future.get(),这样的话异步执行的优势就完全没有了。或许你会考虑使用回调来解决Future.get()过早阻塞的问题,但是对于嵌套的异步任务来说,使用回调同样会使代码变得混乱,如benjchristensen/CallbackB.java。Rx的Observable从另一方面讲就是为了组合异步数据流准备的,使用RX你可以非常方便地组合多个Observable。
    Observable是灵活的
    Rx的Observable不但可以像Futures一样发射单个值,还可以发射数据序列甚至无穷数据流。也就是说无论数据是怎样的都可以抽象为一个Observable。
    Observable拥有Iterable所有灵活、优雅的特点。Iterable是同步的单向pull,Observable是异步的双向push,即Iterable是需要手动遍历获取数据的,Observable是主动发射数据的;Iterable通过T next()方法获取数据,Observable通过onNext(T)发射数据;Iterable遇到错误会抛出异常(throws Exception),Observable遇到错误会调用onError(Exception)方法;Iterable可以利用!hasNext()方法判断是否完成,Observable通过调用onCompleted()方法表示完成。
    Observable是不固执的
    Rx对于一些特定并发或异步源是没有偏见的,Observable可以用任何方式实现,如线程池、事件循环、非阻塞IO、Actor模式,或者任何满足你的需求、偏好或擅长技术的实现。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码都会将所有与Observable的交互当做是异步的。

    RxJava/RxAndroid

    RxJava是JVM上的ReactiveX实现 – 一个在 Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。
    RxJava目前版本是2.1.0,2.x版本相对于1.x版本有了很大的改动,甚至可以说是重写,而1.x版本截止到2017年6月1号不会增加新的操作符,仅仅修复Bug,截止到2018年3月31号完全停止开发维护。所以1.x版本的RxJava就不再关注了。
    RxAndroid是Android上的ReactiveX实现–Android上的RxJava bindings。
    RxAndroid添加尽可能少的类到RxJava以确保在Android应用中写响应式组件能够简单省事,尤其是它提供的调度器可以很好地调度主线程和任意Lopper线程。

    术语

    信号(Signal): 作名词时表示onSubscribe, onNext, onComplete, onError, request(n)cancel方法的触发信号,作动词时表示触发一个信号。
    请求(Demand): 作名词时表示由Publisher尚未交付(履行)的Subscriber请求的元素的总数。作动词时表示请求更多的元素的行为。
    同步(Synchronous): 在调用线程中执行。
    正常返回(Return normally): 永远只向调用者返回一个声明类型的值, 向Subscriber发失败信号的唯一合法方法是通过onError方法。
    背压(Backpressure): Observable发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息的场景。

    添加依赖

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'

    几个比较重要的类

    Publisher
    /**
     * 一个可能无限数量的序列元素的提供者,根据从其Subscriber收到的请求发送它们。
     * `Publisher`可以在不同的时间点动态地服务多个通过`subscribe(Subscriber)`方法订阅的Subscriber。
     *
     * @param <T> 信号元素类型.
     */
    public interface Publisher<T> {
    
        /**
         * 请求Publisher开始数据流.
         * 这是一个可以被多次调用的"工厂方法", 但每次都会开始一个新的Subscription.
         * 每个Subscription将只会为一个Subscriber工作.
         * 一个Subscriber应该只能向一个Publisher订阅一次.
         * 如果Publisher拒绝了订阅尝试或者订阅失败了,它将通过Subscriber#onError发error信号
         *
         * @param s 将消费来自Publisher的信号的Subscriber
         */
        public void subscribe(Subscriber<? super T> s);
    }

    调用Publisher.subscribe(Subscriber)Subscriber的响应可能是的这样的方法调用序列:

    onSubscribe onNext* (onError | onComplete)?

    也就是说,如果Subscription没被cancel的话,onSubscribe信号将总会被触发,之后根据Subscriber的请求可能有多个onNext信号,之后如果出错的话会有一个onError信号,如果没有元素的话会有一个onComplete信号。
    ReactiveX中最重要的概念就是流(stream),任何东西都可以看作是流,最常见的就是网络数据流、UI事件流,产生stream的通常叫作被观察者(Observable)/生产者,而监听、处理stream的通常叫做观察者(Observer)/消费者,两者的关联通常叫作订阅(Subscribe)。
    stream
    上面说的Publisher就是产生事件的事件源,而RxJava根据不同使用情况提供了几个事件源的类,其中Flowable就是Publisher的实现类:

    • io.reactivex.Flowable: 0..N个流,支持Reactive-Streams和背压(backpressure)
    • io.reactivex.Observable: 0..N个流,不支持背压(backpressure)
    • io.reactivex.Single: 一个只包含一个item或error的流
    • io.reactivex.Completable: 一个不包含任何item只包含completion或error信号的流
    • io.reactivex.Maybe: 一个只包含一个maybe value或者error的流

    首先来看一下Flowable

    import io.reactivex.functions.Consumer;
    
    Flowable.just("Hello world")
      .subscribe(new Consumer<String>() {
          @Override public void accept(String s) {
              System.out.println(s);
          }
      });

    Flowable是为了更好地处理背压问题而新设计的类以弥补Observable的不足,也就是说,Observable不支持背压处理,一旦未及时处理的事件数累积到一定程度就会产生MissingBackpressureException或者OutOfMemoryError
    如果发射事件的Observable(上游)和接收处理事件的Observer(下游)工作在不同线程,就可能会出现发射事件的速度与处理事件的速度不一样的情况,如果发射的太快,就会出现事件积累的情况,而事件的积累会导致大量的资源浪费甚至OOM。而解决这一情况最直接暴力的方式就是控制上游的发射数量或发射速度,以给下游足够的事件处理事件,但是控制发射数量可能会导致部分事件被丢弃,控制发射速度可能影响性能。所以Flowable采用了更优雅的方式来解决这一问题,下游可以把自己“处理事件的能力”告诉上游,上游根据这个来决定要不要继续发射事件,也就是说下游可以根据自身情况主动通知上游发送事件,这也就完美解决了上下游流速不均衡的问题。
    Flowable提供了工厂方法create(FlowableOnSubscribe<T> source, BackpressureStrategy mode),可以通过第二个参数指定背压策略,MISSING表明由下游处理事件溢出问题,一般用于自定义参数的onBackpressureXXX操作符场景,ERROR表明如果下游跟不上上游的流速就抛出MissingBackpressureExceptionBUFFER表明缓冲所有的onNext值直到下游消费,DROP表明如果下游跟不上就丢弃最近的onNext值,LATEST表明如果下游跟不上就只保留最近的onNext值,覆盖之前的值。Flowable默认的bufferSize是128,由系统参数rx2.buffer-size决定。

    Observable可以看作是Flowable的阉割版,只是不支持背压逻辑,其它的逻辑、方法与Flowable是基本一样的。ObservableObservableSource接口的实现,而ObservableSource表示一个基本的、不支持背压的Observable源,由Observer消费,ObservableSource只有一个方法void subscribe(Observer<? super T> observer)用来订阅给定的Observer。也就是说Subscriber订阅FlowableObserver订阅Observable
    Observable可以通过非常多的工厂方法和操作符构建,如通过just()操作符构建:

    Observable.just("one", "two", "three", "four", "five")
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<String>() {
                @Override
                public void onNext(String value) {
                    Log.d(TAG, "onNext(" + value + ")");
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError()", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete()");
                }
            });

    那什么时候使用Observable,什么时候使用Flowable呢?
    何时使用Observable:

    • 最多1000个元素的数据流,即随着时间推移元素数量依然足够少以至于应用几乎没机会出现OOME。
    • 处理诸如鼠标移动或触摸事件之类的GUI事件:这些事件很少会合理地背压,也并不频繁。你可以使用Observable处理频率小于1000 Hz的事件,但最好考虑使用采样/去抖动。
    • 流本质上是同步的,但是您的平台不支持Java Streams,或者你忽略其中的功能。 使用Observable具有比Flowable更低的开销(你也可以考虑为Iterable流优化的IxJava 支持Java 6+)。

    何时使用Flowable:

    • 处理以某种方式生成的10k+元素,处理链可以告诉源限制元素生成的数量。
    • 从磁盘读取(解析)文件本质上是阻塞式和基于pull的,你可以很好地控制背压,例如从指定的请求量中读取多少行)。
    • 通过JDBC从数据库读取也是阻塞式和基于pull的,你可以根据每个下游请求调用ResultSet.next()来控制。
    • 网络(流)IO,其中网络帮助或使用的协议支持请求一些逻辑量。
    • 一些阻塞式和/或基于pull的数据源,最终会被一个非阻塞响应式的API/driver使用。

    SingleObservable类似但只能发射一个onSuccessonError给它的消费者io.reactivex.SingleObserver<T>

    public interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable e);
    }

    响应流的事件模式为:onSubscribe (onSuccess | onError)?

    Completable用来表示一个延迟计算且不关心任何数据只关心完成和异常,只能发射一个onCompleteonError给它的消费者o.reactivex.CompletableObserver

    public interface CompletableObserver {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable e);
    }

    响应流的事件模式为:onSubscribe (onComplete | onError)?

    Maybe从概念上讲是SingleCompletable的结合,提供了一种捕获一些响应源发射0或1个item或一个error的发射模式的手段,用来表示一个延迟计算。MaybeMaybeSource作为基本接口,以MaybeObserver<T>作为信号接收接口。由于最多只能发射一个元素Maybe就没打算支持背压处理,这就意味着,在onSubscribe(Disposable)被调用后可能接着调用一个其它onXXX方法,与Flowable不同的是,如果只有一个值信号,只有onSuccess被调用而不是onComplete,使用这种新的基本响应类型与其他类型几乎是一样的,因为它提供了一个适用于0或1个item序列的Flowable操作符子集。

    Maybe.just(1)
    .map(v -> v + 1)
    .filter(v -> v == 1)
    .defaultIfEmpty(2)
    .test()
    .assertResult(2);

    响应流的事件模式为:onSubscribe (onSuccess | onError | onComplete)?

    Subscriber
    /**
     * 将Subscriber的一个实例传给Publisher#subscribe(Subscriber)方法后,将会收到一次onSubscribe(Subscription)方法调用
     * 在Subscription#request(long)被调用之前不会受到进一步通知
     * 在发送请求信号后:
     * 调用一到多次onNext(Object)方法,最多调用Subscription#request(long)中定义的次数
     * 调用一次onError(Throwable)或onComplete()方法表明终止状态,之后不会发送进一步事件
     * 只要Subscriber实例还能够处理,就可以通过Subscription#request(long)发请求信号
     *
     * @param <T> 信号元素类型
     */
    public interface Subscriber<T> {
        /**
         * 在调用Publisher#subscribe(Subscriber)之后调用
         * 直到Subscription#request(long)被调用才开始数据流
         * 每当需要更多数据时,这个Subscriber实例都有责任调用Subscription#request(long)
         * Publisher只有在Subscription#request(long)被调用后才会发送通知
         *
         * @param s 可以通过Subscription#request(long)方法请求数据的Subscription
         */
        public void onSubscribe(Subscription s);
    
        /**
         * Publisher发送的数据通知,以响应Subscription#request(long)的请求
         *
         * @param t the element signaled
         */
        public void onNext(T t);
    
        /**
         * 失败的结束状态
         * 即使再次调用Subscription#request(long)也不会再发送进一步事件
         *
         * @param t the throwable signaled
         */
        public void onError(Throwable t);
    
        /**
         * 成功的结束状态
         * 即使再次调用Subscription#request(long)也不会再发送进一步事件
         */
        public void onComplete();
    }

    为了在构建stream消费者时有更少的内部状态,Rxjava2为Flowable(和Observable)分别定义了DefaultSubscriberResourceSubscriberDisposableSubscriber(以及他们的XObserver变体),以提供资源跟踪支持,可以在外部通过dispose()方法cancelled(取消)或disposed(丢弃):

    ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
        @Override
        public void onStart() {
            request(Long.MAX_VALUE);
        }
    
        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }
    
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    };
    
    Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);
    
    subscriber.dispose();

    为了能更好的控制订阅关系,每个基本响应类都提供了E subscribeWith(E subscriber)方法以返回订阅的subscriber/observer:

    CompositeDisposable composite2 = new CompositeDisposable();
    
    composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));

    在管理请求时有一点需要注意,在Subscriber.onSubscribeResourceSubscriber.onStart方法中调用request(n)可能会马上触发onNext方法(在onSubscribe/onStart方法返回前):

    Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("OnSubscribe start");
            s.request(Long.MAX_VALUE);
            System.out.println("OnSubscribe end");
        }
    
        @Override
        public void onNext(Integer v) {
            System.out.println(v);
        }
    
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    });

    将会打印:

    OnSubscribe start
    1
    2
    3
    Done
    OnSubscribe end

    也就是说,如果在onSubscribe/onStart方法中调用request之后再做一些初始化工作,那么onNext方法可能就看不到初始化的影响。为了避免出现这种情况,要确保onSubscribe/onStart中完成所有的初始化工作再调用request方法。

    Subscription
    /**
     * Subscription表示Subscriber订阅Publisher的一对一生命周期
     * 它只能由一个Subscriber使用一次
     * 它用于数据请求和取消请求(允许资源清理)
     *
     */
    public interface Subscription {
        /**
         * Publisher将不会发送任何事件直到通过该方法发送请求信号
         * 可以随时、频繁地调用,但未完成积累的请求决不能超过Long.MAX_VALUE
         * 未完成积累的请求数为Long.MAX_VALUE可能会被Publisher视为"有效无限"
         * Publisher可以发送的任何请求的信息,只有信号请求可以被安全处理
         * 如果流结束了Publisher发送数量可以少于请求的数量,但是随后必须发射Subscriber#onError(Throwable)或Subscriber#onComplete()
         *
         * @param n 向上游Publisher请求的元素数量(严格正数)
         */
        public void request(long n);
    
        /**
         * 请求Publisher停止发射数据并且清理资源
         * 由于这个请求是异步的,在调用cancel方法后,数据可能仍然被发送以满足之前的信号请求
         */
        public void cancel();
    }

    在Android中,对异步任务生命周期的管理尤为重要,而RxJava提供的最重要的接口就是Disposable

    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    由于ResourceSubscriberDisposableSubscriber(以及他们的XObserver变体)都实现了Disposable接口,对于单个资源请求我们可以直接调用它的dispose()方法丢弃,对于多个请求,我们就要借助CompositeDisposable来管理了,CompositeDisposable是多个Disposable的容器,提供了复杂度为O(1)的添加删除操作:

    public class MainActivity extends Activity {
        private static final String TAG = "RxAndroidSamples";
    
        private final CompositeDisposable disposables = new CompositeDisposable();
    
        @Override protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.main_activity);
            findViewById(R.id.button_run_scheduler).setOnClickListener(new View.OnClickListener() {
                @Override public void onClick(View v) {
                    onRunSchedulerExampleButtonClicked();
                }
            });
        }
    
        @Override protected void onDestroy() {
            super.onDestroy();
            disposables.clear();
        }
    
        void onRunSchedulerExampleButtonClicked() {
            disposables.add(sampleObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override public void onComplete() {
                        Log.d(TAG, "onComplete()");
                    }
    
                    @Override public void onError(Throwable e) {
                        Log.e(TAG, "onError()", e);
                    }
    
                    @Override public void onNext(String string) {
                        Log.d(TAG, "onNext(" + string + ")");
                    }
                }));
        }
    
        static Observable<String> sampleObservable() {
            return Observable.defer(new Callable<ObservableSource<? extends String>>() {
              @Override public ObservableSource<? extends String> call() throws Exception {
                    // Do some long running operation
                    SystemClock.sleep(5000);
                    return Observable.just("one", "two", "three", "four", "five");
                }
            });
        }
    }

    注意,CompositeDisposableclear()方法和dispose()方法类似,clear()可以多次被调用来丢弃容器中所有的Disposable,但dispose()被调用一次后就会失效。
    对于订阅生命周期的封装库最常见的是trello/RxLifecycle,不过它需要LifecycleProvider<T>才能工作,大部分情况下你只能选择继承它的RxActivityRxFragment。另一个很有意思的封装是zhihu/RxLifecycle,它利用TakeUtil操作符和一个HeadlessFragment来控制Observable的生命周期。

    Processor
    /**
     * Processor表示一个处理过程—Subscriber和Publisher都必须遵守两者的契约
     *
     * @param <T> Subscriber的信号元素类型
     * @param <R> Publisher的信号元素类型
     */
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

    References

    展开全文
  • 前言 刚参加工作的时候接触到了项目中的Rxjava,当时一点基础没有,学习了好长...但实际上,学会Rxjava对技术的提升还是很有帮助的,所以我会为大家讲解Rxjava的相关知识,以及Rxjava 1 到Rxjava 2的变化,帮助大...
  • Rxjava

    2020-07-27 23:06:49
    Rxjava是NetFlix出品的Java框架, 是“使用可观察序列组成的一个异步地、基于事件的响应式编程框架”,典型实例如下: Observable.create(new ObservableOnSubscribe<String>() { @Override public void ...
  • rxjava是什么

    2018-08-22 16:48:27
    该文章只是介绍了一些基本概念 reactiveX  近几年,反应式编程很火,那么reactivex是什么呢,一句话概括:reactiveX 是一个跨语言的标准、规范。 由来:reactivex是最开始是一个在微软的计算机科学家发明的,是...
  • 真爱,请置顶或星标 ...原文:juejin.im/post/5cd04b6e51882540e53fdfa2 ...距离上一次更新也有一段时间了,其实这篇文章我早就想写,碍于一直没来...来总结一下我RxJava遇到的坑,或者说我为什么不在推荐使用RxJa...
  • RxJava详解

    2019-06-08 20:03:00
    RxJava是ReactiveX推出的一个开源库,它是Reactive Extensions的Java VM实现,可以很方便的在Java中实现响应式编程。解决了Java中繁琐的异步切换、Callback hell等问题,使逻辑变得更加简洁。 1、操作符 RxJava提供...
  • 响应式函数编程框架RxJava入门,RxAndroid使用
  • 原文链接在第一篇中,我介绍了RxJava的基础知识。第二篇中,我向你展示了操作符的强大。但是你可能仍然没被说服。这篇里面,我讲向你展示RxJava的其他的一些好处,相信这篇足够让你去使用Rxjava.错误处理到目前为止...
  • 第一次接触RxJava是在前不久,一个新Android项目的启动,在评估时选择了RxJavaRxJava是一个基于事件订阅的异步执行的一个类库。听起来有点复杂,其实是要你使用过一次,就会大概明白它是怎么回事了!为是什么一个...
  • RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕。我读源码时,确实有点似懂非懂的感觉。网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的。既然用...
  • 关键词:合并 Observable前言在RxJava中, 同时处理多个Observables是很常见的一种操作。下面我们简单分析下几个组合类的操作符。Merge在异步的世界经常会创建这样的场景,我们有多个来源但是只想有一个结果:多输入...
  • 适合所有初中级工程师,从RxJava的每个知识点进行讲解,并在每个知识点中都带有文字说明和代码例子说明。通过这次系统化的学习,您将可以完整的学习到RxJava家族的所有特性和常用的操作符,掌握操作符的使用,掌握...
  • 1. 为什么写这篇文章RxJava这些年越来越流行,而上月末(2016.10.29)发布了2.0正式版,但网上大部分关于RxJava的教程都是1.x的。关于2.0的教程基本是介绍1.x和2.x的区别,对于RxJava的老用户来说,自然看看和1.x的...
  • Rxjava+ReTrofit+okHttp背景: 学习Rxjava+Retrofit+okhttp已经一段时间了,发现确实很强大,但是使用起来稍微有点麻烦,在工作中重复的代码太多,所以决定对http请求基于retrofit封装,最终效果还是比较满意,10行...
1 2 3 4 5 ... 20
收藏数 35,089
精华内容 14,035
关键字:

rxjava