rxjava2_rxjava2使用 - CSDN
精华内容
参与话题
  • RxJava2-完整攻略

    千次阅读 2019-05-29 17:39:58
    RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。 RxJava 有以下三个基本的元素: 被观察者(Observable) 观察者...

    https://juejin.im/post/5b17560e6fb9a01e2862246f

    0. 简介

    RxJava 其实就是提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。

    RxJava 有以下三个基本的元素:

    1. 被观察者(Observable)
    2. 观察者(Observer)
    3. 订阅(subscribe)

    下面来说说以上三者是如何协作的:

    首先在 gradle 文件中添加依赖:

    implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
    复制代码
    1. 创建被观察者:
    Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    });
    复制代码
    1. 创建观察者:
    Observer observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    };
    复制代码
    1. 订阅
    observable.subscribe(observer);
    复制代码

    这里其实也可以使用链式调用:

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    });
    复制代码

    被观察者发送的事件有以下几种,总结如下表:

    事件种类 作用
    onNext() 发送该事件时,观察者会回调 onNext() 方法
    onError() 发送该事件时,观察者会回调 onError() 方法,当发送该事件之后,其他事件将不会继续发送
    onComplete() 发送该事件时,观察者会回调 onComplete() 方法,当发送该事件之后,其他事件将不会继续发送

    其实可以把 RxJava 比喻成一个做果汁,家里有很多种水果(要发送的原始数据),你想榨点水果汁喝一下,这时候你就要想究竟要喝什么水果汁呢?如果你想喝牛油果雪梨柠檬汁,那你就要把这三种水果混在一起榨汁(使用各种操作符变换你想发送给观察者的数据),榨完后,你就可以喝上你想要的果汁了(把处理好的数据发送给观察者)。

    总结如下图:

     

     

     

    下面就来讲解 RxJava 各种常见的操作符。

    1. 创建操作符

    以下就是讲解创建被观察者的各种操作符。

    1.1 create()

    方法预览:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
    复制代码

    有什么用:

    创建一个被观察者

    怎么用:

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("Hello Observer");
            e.onComplete();
        }
    });
    复制代码

    上面的代码非常简单,创建 ObservableOnSubscribe 并重写其 subscribe 方法,就可以通过 ObservableEmitter 发射器向观察者发送事件。

    以下创建一个观察者,来验证这个被观察者是否成功创建。

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d("chan","=============onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d("chan","=============onComplete ");
        }
    };
            
    observable.subscribe(observer);
            
    复制代码

    打印结果:

    05-20 16:16:50.654 22935-22935/com.example.louder.rxjavademo D/chan: =============onNext Hello Observer
    =============onComplete
    复制代码

    1.2 just()

    方法预览:

    public static <T> Observable<T> just(T item) 
    ......
    public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10)
    复制代码

    有什么用?

    创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

    怎么用?

    Observable.just(1, 2, 3)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    
    复制代码

    上面的代码直接使用链式调用,代码也非常简单,这里就不细说了,看看打印结果:

    05-20 16:27:26.938 23281-23281/? D/chan: =================onSubscribe
    =================onNext 1
    =================onNext 2
    =================onNext 3
    =================onComplete 
    复制代码

    1.3 From 操作符

    1.3.1 fromArray()

    方法预览:

    public static <T> Observable<T> fromArray(T... items)
    复制代码

    有什么用?

    这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

    怎么用?

    Integer array[] = {1, 2, 3, 4};
    Observable.fromArray(array)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    复制代码

    代码和 just() 基本上一样,直接看打印结果:

    05-20 16:35:23.797 23574-23574/com.example.louder.rxjavademo D/chan: =================onSubscribe
    =================onNext 1
    =================onNext 2
    =================onNext 3
    =================onNext 4
    =================onComplete 
    复制代码

    1.3.2 fromCallable()

    方法预览:

    public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
    复制代码

    有什么用?

    这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

    怎么用?

    Observable.fromCallable(new Callable < Integer > () {
    
        @Override
        public Integer call() throws Exception {
            return 1;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "================accept " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 13:01:43.009 6890-6890/? D/chan: ================accept 1
    复制代码

    1.3.3 fromFuture()

    方法预览:

    public static <T> Observable<T> fromFuture(Future<? extends T> future)
    复制代码

    有什么用?

    参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

    怎么用?

    FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
        @Override
        public String call() throws Exception {
            Log.d(TAG, "CallableDemo is Running");
            return "返回结果";
        }
    });
    
    Observable.fromFuture(futureTask)
        .doOnSubscribe(new Consumer < Disposable > () {
        @Override
        public void accept(Disposable disposable) throws Exception {
            futureTask.run();
        }
    })
    .subscribe(new Consumer < String > () {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "================accept " + s);
        }
    });
    复制代码

    doOnSubscribe() 的作用就是只有订阅时才会发送事件,具体会在下面讲解。

    打印结果:

    05-26 13:54:00.470 14429-14429/com.example.rxjavademo D/chan: CallableDemo is Running
    ================accept 返回结果
    复制代码

    1.3.4 fromIterable()

    方法预览:

    public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
    复制代码

    有什么用?

    直接发送一个 List 集合数据给观察者

    怎么用?

    List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);
    list.add(2);
    list.add(3);
    Observable.fromIterable(list)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    复制代码

    打印结果如下:

    05-20 16:43:28.874 23965-23965/? D/chan: =================onSubscribe
    =================onNext 0
    =================onNext 1
    =================onNext 2
    =================onNext 3
    =================onComplete 
    
    复制代码

    1.4 defer()

    方法预览:

    public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
    复制代码

    有什么用?

    这个方法的作用就是直到被观察者被订阅后才会创建被观察者。

    怎么用?

    // i 要定义为成员变量
    Integer i = 100;
            
    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    
    i = 200;
    
    Observer observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    };
    
    observable.subscribe(observer);
    
    i = 300;
    
    observable.subscribe(observer);
    复制代码

    打印结果如下:

    05-20 20:05:01.443 26622-26622/? D/chan: ================onNext 200
    ================onNext 300
    复制代码

    因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。

    1.5 timer()

    方法预览:

    public static Observable<Long> timer(long delay, TimeUnit unit) 
    ......
    复制代码

    有什么用?

    当到指定时间后就会发送一个 0L 的值给观察者。

    怎么用?

    Observable.timer(2, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "===============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-20 20:27:48.004 27204-27259/com.example.louder.rxjavademo D/chan: ===============onNext 0
    复制代码

    1.6 interval()

    方法预览:

    public static Observable<Long> interval(long period, TimeUnit unit)
    public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
    ......
    复制代码

    有什么用?

    每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

    怎么用?

    Observable.interval(4, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-20 20:48:10.321 28723-28723/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
    05-20 20:48:14.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 0
    05-20 20:48:18.324 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 1
    05-20 20:48:22.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 2
    05-20 20:48:26.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 3
    05-20 20:48:30.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 4
    05-20 20:48:34.323 28723-28746/com.example.louder.rxjavademo D/chan: ==============onNext 5
    复制代码

    从时间就可以看出每隔4秒就会发出一次数字递增1的事件。这里说下 interval() 第三个方法的 initialDelay 参数,这个参数的意思就是 onSubscribe 回调之后,再次回调 onNext 的间隔时间。

    1.7 intervalRange()

    方法预览:

    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
    复制代码

    有什么用?

    可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。

    怎么用?

    Observable.intervalRange(2, 5, 2, 1, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 00:03:01.672 2504-2504/com.example.louder.rxjavademo D/chan: ==============onSubscribe 
    05-21 00:03:03.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 2
    05-21 00:03:04.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 3
    05-21 00:03:05.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 4
    05-21 00:03:06.673 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 5
    05-21 00:03:07.674 2504-2537/com.example.louder.rxjavademo D/chan: ==============onNext 6
    复制代码

    可以看出收到5次 onNext 事件,并且是从 2 开始的。

    1.8 range()

    方法预览:

    public static Observable<Integer> range(final int start, final int count)
    复制代码

    有什么用?

    同时发送一定范围的事件序列。

    怎么用?

    Observable.range(2, 5)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Integer aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 00:09:17.202 2921-2921/? D/chan: ==============onSubscribe 
    ==============onNext 2
    ==============onNext 3
    ==============onNext 4
    ==============onNext 5
    ==============onNext 6
    复制代码

    1.9 rangeLong()

    方法预览:

    public static Observable<Long> rangeLong(long start, long count)
    复制代码

    有什么用?

    作用与 range() 一样,只是数据类型为 Long

    怎么用?

    用法与 range() 一样,这里就不再赘述了。

    1.10 empty() & never() & error()

    方法预览:

    public static <T> Observable<T> empty()
    public static <T> Observable<T> never()
    public static <T> Observable<T> error(final Throwable exception)
    复制代码

    有什么用?

    1. empty() : 直接发送 onComplete() 事件
    2. never():不发送任何事件
    3. error():发送 onError() 事件

    怎么用?

    Observable.empty()
    .subscribe(new Observer < Object > () {
    
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe");
        }
    
        @Override
        public void onNext(Object o) {
            Log.d(TAG, "==================onNext");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError " + e);
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete");
        }
    });
    复制代码

    打印结果:

    05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onComplete
    复制代码

    换成 never() 的打印结果:

    05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
    复制代码

    换成 error() 的打印结果:

    05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onError java.lang.NullPointerException
    复制代码

    2. 转换操作符

    2.1 map()

    方法预览:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
    复制代码

    有什么用?

    map 可以将被观察者发送的数据类型转变成其他的类型

    怎么用?

    以下代码将 Integer 类型的数据转换成 String。

    Observable.just(1, 2, 3)
    .map(new Function < Integer, String > () {
        @Override
        public String apply(Integer integer) throws Exception {
            return "I'm " + integer;
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "===================onSubscribe");
        }
    
        @Override
        public void onNext(String s) {
            Log.e(TAG, "===================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 09:16:03.490 5700-5700/com.example.rxjavademo E/chan: ===================onSubscribe
    ===================onNext I'm 1
    ===================onNext I'm 2
    ===================onNext I'm 3
    复制代码

    2.2 flatMap()

    方法预览:

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    ......
    复制代码

    有什么用?

    这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。

    怎么用?

    flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个例子来说明 flatMap() 的用法。

    假设一个有一个 Person 类,这个类的定义如下:

    public class Person {
    
        private String name;
        private List<Plan> planList = new ArrayList<>();
    
        public Person(String name, List<Plan> planList) {
            this.name = name;
            this.planList = planList;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public List<Plan> getPlanList() {
            return planList;
        }
    
        public void setPlanList(List<Plan> planList) {
            this.planList = planList;
        }
    
    }
    复制代码

    Person 类有一个 name 和 planList 两个变量,分别代表的是人名和计划清单。

    Plan 类的定义如下:

    public class Plan {
    
        private String time;
        private String content;
        private List<String> actionList = new ArrayList<>();
    
        public Plan(String time, String content) {
            this.time = time;
            this.content = content;
        }
    
        public String getTime() {
            return time;
        }
    
        public void setTime(String time) {
            this.time = time;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
        public List<String> getActionList() {
            return actionList;
        }
    
        public void setActionList(List<String> actionList) {
            this.actionList = actionList;
        }
    }
    复制代码

    现在有一个需求就是要将 Person 集合中的每个元素中的 Plan 的 action 打印出来。 首先用 map() 来实现这个需求看看:

    Observable.fromIterable(personList)
    .map(new Function < Person, List < Plan >> () {
        @Override
        public List < Plan > apply(Person person) throws Exception {
            return person.getPlanList();
        }
    })
    .subscribe(new Observer < List < Plan >> () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(List < Plan > plans) {
            for (Plan plan: plans) {
                List < String > planActionList = plan.getActionList();
                for (String action: planActionList) {
                    Log.d(TAG, "==================action " + action);
                }
            }
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    可以看到 onNext() 用了嵌套 for 循环来实现,如果代码逻辑复杂起来的话,可能需要多重循环才可以实现。

    现在看下使用 flatMap() 实现:

    Observable.fromIterable(personList)
    .flatMap(new Function < Person, ObservableSource < Plan >> () {
        @Override
        public ObservableSource < Plan > apply(Person person) {
            return Observable.fromIterable(person.getPlanList());
        }
    })
    .flatMap(new Function < Plan, ObservableSource < String >> () {
        @Override
        public ObservableSource < String > apply(Plan plan) throws Exception {
            return Observable.fromIterable(plan.getActionList());
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "==================action: " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    从代码可以看出,只需要两个 flatMap() 就可以完成需求,并且代码逻辑非常清晰。

    2.3 concatMap()

    方法预览:

    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
    复制代码

    有什么用?

    concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。

    怎么用?

    还是使用上面 flatMap() 的例子来讲解,首先来试下 flatMap() 来验证发送的事件是否是无序的,代码如下:

    Observable.fromIterable(personList)
    .flatMap(new Function < Person, ObservableSource < Plan >> () {
        @Override
        public ObservableSource < Plan > apply(Person person) {
            if ("chan".equals(person.getName())) {
                return Observable.fromIterable(person.getPlanList()).delay(10, TimeUnit.MILLISECONDS);
            }
            return Observable.fromIterable(person.getPlanList());
        }
    })
    .subscribe(new Observer < Plan > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Plan plan) {
            Log.d(TAG, "==================plan " + plan.getContent());
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    为了更好的验证 flatMap 是无序的,使用了一个 delay() 方法来延迟,直接看打印结果:

    05-21 13:57:14.031 21616-21616/com.example.rxjavademo D/chan: ==================plan chan 上课
    ==================plan chan 写作业
    ==================plan chan 打篮球
    05-21 13:57:14.041 21616-21641/com.example.rxjavademo D/chan: ==================plan Zede 开会
    ==================plan Zede 写代码
    ==================plan Zede 写文章
    复制代码

    可以看到本来 Zede 的事件发送顺序是排在 chan 事件之前,但是经过延迟后, 这两个事件序列发送顺序互换了。

    现在来验证下 concatMap() 是否是有序的,使用上面同样的代码,只是把 flatMap() 换成 concatMap(),打印结果如下:

    05-21 13:58:42.917 21799-21823/com.example.rxjavademo D/chan: ==================plan Zede 开会
    ==================plan Zede 写代码
    ==================plan Zede 写文章
    ==================plan chan 上课
    ==================plan chan 写作业
    ==================plan chan 打篮球
    复制代码

    这就代表 concatMap() 转换后发送的事件序列是有序的了。

    2.4 buffer()

    方法预览:

    public final Observable<List<T>> buffer(int count, int skip)
    ......
    复制代码

    有什么用?

    从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。

    怎么用?

    buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。这样说可能还是有点抽象,直接看代码:

    Observable.just(1, 2, 3, 4, 5)
    .buffer(2, 1)
    .subscribe(new Observer < List < Integer >> () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(List < Integer > integers) {
            Log.d(TAG, "================缓冲区大小: " + integers.size());
            for (Integer i: integers) {
                Log.d(TAG, "================元素: " + i);
            }
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: ================缓冲区大小: 2
    ================元素: 1
    ================元素: 2
    ================缓冲区大小: 2
    ================元素: 2
    ================元素: 3
    ================缓冲区大小: 2
    ================元素: 3
    ================元素: 4
    ================缓冲区大小: 2
    ================元素: 4
    ================元素: 5
    ================缓冲区大小: 1
    ================元素: 5
    复制代码

    从结果可以看出,每次发送事件,指针都会往后移动一个元素再取值,直到指针移动到没有元素的时候就会停止取值。

    2.5 groupBy()

    方法预览:

    public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
    复制代码

    有什么用?

    将发送的数据进行分组,每个分组都会返回一个被观察者。

    怎么用?

    Observable.just(5, 2, 3, 4, 1, 6, 8, 9, 7, 10)
    .groupBy(new Function < Integer, Integer > () {
        @Override
        public Integer apply(Integer integer) throws Exception {
            return integer % 3;
        }
    })
    .subscribe(new Observer < GroupedObservable < Integer, Integer >> () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "====================onSubscribe ");
        }
    
        @Override
        public void onNext(GroupedObservable < Integer, Integer > integerIntegerGroupedObservable) {
            Log.d(TAG, "====================onNext ");
            integerIntegerGroupedObservable.subscribe(new Observer < Integer > () {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "====================GroupedObservable onSubscribe ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "====================GroupedObservable onNext  groupName: " + integerIntegerGroupedObservable.getKey() + " value: " + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "====================GroupedObservable onError ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "====================GroupedObservable onComplete ");
                }
            });
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "====================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "====================onComplete ");
        }
    });
    复制代码

    在 groupBy() 方法返回的参数是分组的名字,每返回一个值,那就代表会创建一个组,以上的代码就是将1~10的数据分成3组,来看看打印结果:

    05-26 14:38:02.062 21451-21451/com.example.rxjavademo D/chan: ====================onSubscribe 
    05-26 14:38:02.063 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
    ====================GroupedObservable onSubscribe     ====================GroupedObservable onNext  groupName: 2 value: 5
    ====================GroupedObservable onNext  groupName: 2 value: 2
    ====================onNext 
    ====================GroupedObservable onSubscribe 
    ====================GroupedObservable onNext  groupName: 0 value: 3
    05-26 14:38:02.064 21451-21451/com.example.rxjavademo D/chan: ====================onNext 
    ====================GroupedObservable onSubscribe 
    ====================GroupedObservable onNext  groupName: 1 value: 4
    ====================GroupedObservable onNext  groupName: 1 value: 1
    ====================GroupedObservable onNext  groupName: 0 value: 6
    ====================GroupedObservable onNext  groupName: 2 value: 8
    ====================GroupedObservable onNext  groupName: 0 value: 9
    ====================GroupedObservable onNext  groupName: 1 value: 7
    ====================GroupedObservable onNext  groupName: 1 value: 10
    05-26 14:38:02.065 21451-21451/com.example.rxjavademo D/chan: ====================GroupedObservable onComplete 
    ====================GroupedObservable onComplete 
    ====================GroupedObservable onComplete 
    ====================onComplete 
    复制代码

    可以看到返回的结果中是有3个组的。

    2.6 scan()

    方法预览:

    public final Observable<T> scan(BiFunction<T, T, T> accumulator)
    复制代码

    有什么用?

    将数据以一定的逻辑聚合起来。

    怎么用?

    Observable.just(1, 2, 3, 4, 5)
    .scan(new BiFunction < Integer, Integer, Integer > () {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.d(TAG, "====================apply ");
            Log.d(TAG, "====================integer " + integer);
            Log.d(TAG, "====================integer2 " + integer2);
            return integer + integer2;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================accept " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
    ====================apply 
    ====================integer 1
    ====================integer2 2
    ====================accept 3
    ====================apply 
    05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
    ====================integer2 3
    ====================accept 6
    ====================apply 
    ====================integer 6
    ====================integer2 4
    ====================accept 10
    ====================apply 
    ====================integer 10
    ====================integer2 5
    ====================accept 15
    复制代码

    2.7 window()

    方法预览:

    public final Observable<Observable<T>> window(long count)
    ......
    复制代码

    有什么用?

    发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。

    怎么用?

    Observable.just(1, 2, 3, 4, 5)
    .window(2)
    .subscribe(new Observer < Observable < Integer >> () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=====================onSubscribe ");
        }
    
        @Override
        public void onNext(Observable < Integer > integerObservable) {
            integerObservable.subscribe(new Observer < Integer > () {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "=====================integerObservable onSubscribe ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "=====================integerObservable onNext " + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "=====================integerObservable onError ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "=====================integerObservable onComplete ");
                }
            });
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=====================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=====================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-26 15:02:20.654 25838-25838/com.example.rxjavademo D/chan: =====================onSubscribe 
    05-26 15:02:20.655 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onSubscribe 
    05-26 15:02:20.656 25838-25838/com.example.rxjavademo D/chan: =====================integerObservable onNext 1
    =====================integerObservable onNext 2
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 3
    =====================integerObservable onNext 4
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 5
    =====================integerObservable onComplete 
    =====================onComplete 
    复制代码

    从结果可以发现,window() 将 1~5 的事件分成了3组。

    3. 组合操作符

    3.1 concat()

    方法预览:

    public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
    ......
    复制代码

    有什么用?

    可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。

    怎么用?

    Observable.concat(Observable.just(1, 2),
    Observable.just(3, 4),
    Observable.just(5, 6),
    Observable.just(7, 8))
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印如下:

    05-21 15:40:26.738 7477-7477/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    05-21 15:40:26.739 7477-7477/com.example.rxjavademo D/chan: ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    复制代码

    3.2 concatArray()

    方法预览:

    public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
    复制代码

    有什么用?

    与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。

    怎么用?

    Observable.concatArray(Observable.just(1, 2),
    Observable.just(3, 4),
    Observable.just(5, 6),
    Observable.just(7, 8),
    Observable.just(9, 10))
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 15:47:18.581 9129-9129/com.example.rxjavademo D/chan: ================onNext 1
    ================onNext 2
    ================onNext 3
    ================onNext 4
    ================onNext 5
    ================onNext 6
    ================onNext 7
    ================onNext 8
    ================onNext 9
    ================onNext 10
    复制代码

    3.3 merge()

    方法预览:

     public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
    ......
    复制代码

    有什么用?

    这个方法月 concat() 作用基本一样,知识 concat() 是串行发送事件,而 merge() 并行发送事件。

    怎么用?

    现在来演示 concat() 和 merge() 的区别。

    Observable.merge(
    Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
        @Override
        public String apply(Long aLong) throws Exception {
            return "A" + aLong;
        }
    }),
    Observable.interval(1, TimeUnit.SECONDS).map(new Function < Long, String > () {
        @Override
        public String apply(Long aLong) throws Exception {
            return "B" + aLong;
        }
    }))
        .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "=====================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果如下:

    05-21 16:10:31.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B0
    05-21 16:10:31.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:10:32.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:10:32.126 12801-12850/com.example.rxjavademo D/chan: =====================onNext B1
    05-21 16:10:33.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:10:33.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B2
    05-21 16:10:34.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:10:34.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B3
    05-21 16:10:35.124 12801-12849/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:10:35.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B4
    05-21 16:10:36.125 12801-12849/com.example.rxjavademo D/chan: =====================onNext A5
    05-21 16:10:36.125 12801-12850/com.example.rxjavademo D/chan: =====================onNext B5
    ......
    复制代码

    从结果可以看出,A 和 B 的事件序列都可以发出,将以上的代码换成 concat() 看看打印结果:

    05-21 16:17:52.352 14597-14621/com.example.rxjavademo D/chan: =====================onNext A0
    05-21 16:17:53.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A1
    05-21 16:17:54.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A2
    05-21 16:17:55.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A3
    05-21 16:17:56.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A4
    05-21 16:17:57.351 14597-14621/com.example.rxjavademo D/chan: =====================onNext A5
    ......
    复制代码

    从结果可以知道,只有等到第一个被观察者发送完事件之后,第二个被观察者才会发送事件。

    mergeArray() 与 merge() 的作用是一样的,只是它可以发送4个以上的被观察者,这里就不再赘述了。

    3.4 concatArrayDelayError() & mergeArrayDelayError()

    方法预览:

    public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
    public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
    复制代码

    有什么用?

    在 concatArray() 和 mergeArray() 两个方法当中,如果其中有一个被观察者发送了一个 Error 事件,那么就会停止发送事件,如果你想 onError() 事件延迟到所有被观察者都发送完事件后再执行的话,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()

    怎么用?

    首先使用 concatArray() 来验证一下发送 onError() 事件是否会中断其他被观察者发送事件,代码如下:

    Observable.concatArray(
    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }), Observable.just(2, 3, 4))
        .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果:

    05-21 16:38:59.725 17985-17985/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onError 
    复制代码

    从结果可以知道,确实中断了,现在换用 concatArrayDelayError(),代码如下:

    Observable.concatArrayDelayError(
    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onError(new NumberFormatException());
        }
    }), Observable.just(2, 3, 4))
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    复制代码

    打印结果如下:

    05-21 16:40:59.329 18199-18199/com.example.rxjavademo D/chan: ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    ===================onNext 4
    ===================onError 
    复制代码

    从结果可以看到,onError 事件是在所有被观察者发送完事件才发送的。mergeArrayDelayError() 也是有同样的作用,这里不再赘述。

    3.5 zip()

    方法预览:

    public static <T1, T2, R> Observable<R> zip(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper)
    ......
    复制代码

    有什么用?

    会将多个被观察者合并,根据各个被观察者发送事件的顺序一个个结合起来,最终发送的事件数量会与源 Observable 中最少事件的数量一样。

    怎么用?

    Observable.zip(Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
        .map(new Function<Long, String>() {
            @Override
            public String apply(Long aLong) throws Exception {
                String s1 = "A" + aLong;
                Log.d(TAG, "===================A 发送的事件 " + s1);
                return s1;
            }}),
            Observable.intervalRange(1, 6, 1, 1, TimeUnit.SECONDS)
                .map(new Function<Long, String>() {
                @Override
                public String apply(Long aLong) throws Exception {
                    String s2 = "B" + aLong;
                    Log.d(TAG, "===================B 发送的事件 " + s2);
                    return s2;
                }
            }),
            new BiFunction<String, String, String>() {
                @Override
                public String apply(String s, String s2) throws Exception {
                    String res = s + s2;
                    return res;
                }
            })
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "===================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
    });
    复制代码

    上面代码中有两个 Observable,第一个发送事件的数量为5个,第二个发送事件的数量为6个。现在来看下打印结果:

    05-22 09:10:39.952 5338-5338/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-22 09:10:40.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 09:10:40.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    ===================onNext A1B1
    05-22 09:10:41.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 09:10:41.954 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    ===================onNext A2B2
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    05-22 09:10:42.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 09:10:42.953 5338-5362/com.example.rxjavademo D/chan: ===================onNext A3B3
    05-22 09:10:43.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 09:10:43.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 09:10:43.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A4B4
    05-22 09:10:44.953 5338-5362/com.example.rxjavademo D/chan: ===================A 发送的事件 A5
    05-22 09:10:44.953 5338-5363/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 09:10:44.954 5338-5363/com.example.rxjavademo D/chan: ===================onNext A5B5
    ===================onComplete 
    复制代码

    可以发现最终接收到的事件数量是5,那么为什么第二个 Observable 没有发送第6个事件呢?因为在这之前第一个 Observable 已经发送了 onComplete 事件,所以第二个 Observable 不会再发送事件。

    3.6 combineLatest() & combineLatestDelayError()

    方法预览:

    public static <T1, T2, R> Observable<R> combineLatest(ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> combiner)
    ....... 
    复制代码

    有什么用?

    combineLatest() 的作用与 zip() 类似,但是 combineLatest() 发送事件的序列是与发送的时间线有关的,当 combineLatest() 中所有的 Observable 都发送了事件,只要其中有一个 Observable 发送事件,这个事件就会和其他 Observable 最近发送的事件结合起来发送,这样可能还是比较抽象,看看以下例子代码。

    怎么用?

    Observable.combineLatest(
    Observable.intervalRange(1, 4, 1, 1, TimeUnit.SECONDS)
        .map(new Function < Long, String > () {@Override
        public String apply(Long aLong) throws Exception {
            String s1 = "A" + aLong;
            Log.d(TAG, "===================A 发送的事件 " + s1);
            return s1;
        }
    }),
    Observable.intervalRange(1, 5, 2, 2, TimeUnit.SECONDS)
        .map(new Function < Long, String > () {@Override
        public String apply(Long aLong) throws Exception {
            String s2 = "B" + aLong;
            Log.d(TAG, "===================B 发送的事件 " + s2);
            return s2;
        }
    }),
    new BiFunction < String, String, String > () {@Override
        public String apply(String s, String s2) throws Exception {
            String res = s + s2;
            return res;
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "===================最终接收到的事件 " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
    });
    复制代码

    分析上面的代码,Observable A 会每隔1秒就发送一次事件,Observable B 会隔2秒发送一次事件。来看看打印结果:

    05-22 11:41:20.859 15104-15104/? D/chan: ===================onSubscribe 
    05-22 11:41:21.859 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A1
    05-22 11:41:22.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A2
    05-22 11:41:22.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B1
    05-22 11:41:22.862 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A2B1
    05-22 11:41:23.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A3
    ===================最终接收到的事件 A3B1
    05-22 11:41:24.860 15104-15128/com.example.rxjavademo D/chan: ===================A 发送的事件 A4
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B2
    05-22 11:41:24.861 15104-15128/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B1
    05-22 11:41:24.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B2
    05-22 11:41:26.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B3
    05-22 11:41:26.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B3
    05-22 11:41:28.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B4
    05-22 11:41:28.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B4
    05-22 11:41:30.860 15104-15129/com.example.rxjavademo D/chan: ===================B 发送的事件 B5
    05-22 11:41:30.861 15104-15129/com.example.rxjavademo D/chan: ===================最终接收到的事件 A4B5
    ===================onComplete 
    复制代码

    分析上述结果可以知道,当发送 A1 事件之后,因为 B 并没有发送任何事件,所以根本不会发生结合。当 B 发送了 B1 事件之后,就会与 A 最近发送的事件 A2 结合成 A2B1,这样只有后面一有被观察者发送事件,这个事件就会与其他被观察者最近发送的事件结合起来了。

    因为 combineLatestDelayError() 就是多了延迟发送 onError() 功能,这里就不再赘述了。

    3.7 reduce()

    方法预览:

    public final Maybe<T> reduce(BiFunction<T, T, T> reducer)
    复制代码

    有什么用?

    与 scan() 操作符的作用也是将发送数据以一定逻辑聚合起来,这两个的区别在于 scan() 每处理一次数据就会将事件发送给观察者,而 reduce() 会将所有数据聚合在一起才会发送事件给观察者。

    怎么用?

    Observable.just(0, 1, 2, 3)
    .reduce(new BiFunction < Integer, Integer, Integer > () {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            int res = integer + integer2;
            Log.d(TAG, "====================integer " + integer);
            Log.d(TAG, "====================integer2 " + integer2);
            Log.d(TAG, "====================res " + res);
            return res;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "==================accept " + integer);
        }
    });
    复制代码

    打印结果:

    05-22 14:21:46.042 17775-17775/? D/chan: ====================integer 0
    ====================integer2 1
    ====================res 1
    ====================integer 1
    ====================integer2 2
    ====================res 3
    ====================integer 3
    ====================integer2 3
    ====================res 6
    ==================accept 6
    复制代码

    从结果可以看到,其实就是前2个数据聚合之后,然后再与后1个数据进行聚合,一直到没有数据为止。

    3.8 collect()

    方法预览:

    public final <U> Single<U> collect(Callable<? extends U> initialValueSupplier, BiConsumer<? super U, ? super T> collector)
    复制代码

    有什么用?

    将数据收集到数据结构当中。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .collect(new Callable < ArrayList < Integer >> () {
        @Override
        public ArrayList < Integer > call() throws Exception {
            return new ArrayList < > ();
        }
    },
    new BiConsumer < ArrayList < Integer > , Integer > () {
        @Override
        public void accept(ArrayList < Integer > integers, Integer integer) throws Exception {
            integers.add(integer);
        }
    })
    .subscribe(new Consumer < ArrayList < Integer >> () {
        @Override
        public void accept(ArrayList < Integer > integers) throws Exception {
            Log.d(TAG, "===============accept " + integers);
        }
    });
    复制代码

    打印结果:

    05-22 16:47:18.257 31361-31361/com.example.rxjavademo D/chan: ===============accept [1, 2, 3, 4]
    复制代码

    3.9 startWith() & startWithArray()

    方法预览:

    public final Observable<T> startWith(T item)
    public final Observable<T> startWithArray(T... items)
    复制代码

    有什么用?

    在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。

    怎么用?

    Observable.just(5, 6, 7)
    .startWithArray(2, 3, 4)
    .startWith(1)
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "================accept " + integer);
        }
    });
    复制代码

    打印结果:

    05-22 17:08:21.282 4505-4505/com.example.rxjavademo D/chan: ================accept 1
    ================accept 2
    ================accept 3
    ================accept 4
    ================accept 5
    ================accept 6
    ================accept 7
    复制代码

    3.10 count()

    方法预览:

    public final Single<Long> count()
    复制代码

    有什么用?

    返回被观察者发送事件的数量。

    怎么用?

    Observable.just(1, 2, 3)
    .count()
    .subscribe(new Consumer < Long > () {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "=======================aLong " + aLong);
        }
    });
    复制代码

    打印结果:

    05-22 20:41:25.025 14126-14126/? D/chan: =======================aLong 3
    复制代码

    4. 功能操作符

    4.1 delay()

    方法预览:

    public final Observable<T> delay(long delay, TimeUnit unit)
    复制代码

    有什么用?

    延迟一段事件发送事件。

    怎么用?

    Observable.just(1, 2, 3)
    .delay(2, TimeUnit.SECONDS)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=======================onSubscribe");
        }
    });
    复制代码

    这里延迟了两秒才发送事件,来看看打印结果:

    05-22 20:53:43.618 16880-16880/com.example.rxjavademo D/chan: =======================onSubscribe
    05-22 20:53:45.620 16880-16906/com.example.rxjavademo D/chan: =======================onNext 1
    05-22 20:53:45.621 16880-16906/com.example.rxjavademo D/chan: =======================onNext 2
    =======================onNext 3
    =======================onSubscribe
    复制代码

    从打印结果可以看出 onSubscribe 回调2秒之后 onNext 才会回调。

    4.2 doOnEach()

    方法预览:

    public final Observable<T> doOnEach(final Consumer<? super Notification<T>> onNotification)
    复制代码

    有什么用?

    Observable 每发送一件事件之前都会先回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            //      e.onError(new NumberFormatException());
            e.onComplete();
        }
    })
    .doOnEach(new Consumer < Notification < Integer >> () {
        @Override
        public void accept(Notification < Integer > integerNotification) throws Exception {
            Log.d(TAG, "==================doOnEach " + integerNotification.getValue());
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:07:05.547 19867-19867/? D/chan: ==================onSubscribe 
    ==================doOnEach 1
    ==================onNext 1
    ==================doOnEach 2
    ==================onNext 2
    ==================doOnEach 3
    ==================onNext 3
    ==================doOnEach null
    ==================onComplete 
    复制代码

    从结果就可以看出每发送一个事件之前都会回调 doOnEach 方法,并且可以取出 onNext() 发送的值。

    4.3 doOnNext()

    方法预览:

    public final Observable<T> doOnNext(Consumer<? super T> onNext)
    复制代码

    有什么用?

    Observable 每发送 onNext() 之前都会先回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doOnNext(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "==================doOnNext " + integer);
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:09:36.769 20020-20020/com.example.rxjavademo D/chan: ==================onSubscribe 
    ==================doOnNext 1
    ==================onNext 1
    ==================doOnNext 2
    ==================onNext 2
    ==================doOnNext 3
    ==================onNext 3
    ==================onComplete 
    复制代码

    4.4 doAfterNext()

    方法预览:

    public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
    复制代码

    有什么用?

    Observable 每发送 onNext() 之后都会回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doAfterNext(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "==================doAfterNext " + integer);
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    
    复制代码

    打印结果:

    05-23 09:15:49.215 20432-20432/com.example.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================doAfterNext 1
    ==================onNext 2
    ==================doAfterNext 2
    ==================onNext 3
    ==================doAfterNext 3
    ==================onComplete 
    复制代码

    4.5 doOnComplete()

    方法预览:

    public final Observable<T> doOnComplete(Action onComplete)
    复制代码

    有什么用?

    Observable 每发送 onComplete() 之前都会回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnComplete ");
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:32:18.031 20751-20751/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================doOnComplete 
    ==================onComplete 
    复制代码

    4.6 doOnError()

    方法预览:

    public final Observable<T> doOnError(Consumer<? super Throwable> onError)
    复制代码

    有什么用?

    Observable 每发送 onError() 之前都会回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
        }
    })
    .doOnError(new Consumer < Throwable > () {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.d(TAG, "==================doOnError " + throwable);
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:35:04.150 21051-21051/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================doOnError java.lang.NullPointerException
    ==================onError 
    
    复制代码

    4.7 doOnSubscribe()

    方法预览:

    public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
    复制代码

    有什么用?

    Observable 每发送 onSubscribe() 之前都会回调这个方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doOnSubscribe(new Consumer < Disposable > () {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.d(TAG, "==================doOnSubscribe ");
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:39:25.778 21245-21245/? D/chan: ==================doOnSubscribe 
    ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onComplete 
    复制代码

    4.8 doOnDispose()

    方法预览:

    public final Observable<T> doOnDispose(Action onDispose)
    复制代码

    有什么用?

    当调用 Disposable 的 dispose() 之后回调该方法。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnDispose ");
        }
    })
    .subscribe(new Observer < Integer > () {
        private Disposable d;
        
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
            this.d = d;
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
            d.dispose();
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 09:55:48.122 22023-22023/com.example.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================doOnDispose 
    复制代码

    4.9 doOnLifecycle()

    方法预览:

    public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
    复制代码

    有什么用?

    在回调 onSubscribe 之前回调该方法的第一个参数的回调方法,可以使用该回调方法决定是否取消订阅。

    怎么用?

    doOnLifecycle() 第二个参数的回调方法的作用与 doOnDispose() 是一样的,现在用下面的例子来讲解:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doOnLifecycle(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.d(TAG, "==================doOnLifecycle accept");
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnLifecycle Action");
        }
    })
    .doOnDispose(
        new Action() {
            @Override
            public void run() throws Exception {
                Log.d(TAG, "==================doOnDispose Action");
            }
    })
    .subscribe(new Observer<Integer>() {
        private Disposable d;
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
            this.d = d;
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
            d.dispose();
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
        
    });
    复制代码

    打印结果:

    05-23 10:20:36.345 23922-23922/? D/chan: ==================doOnLifecycle accept
    ==================onSubscribe 
    ==================onNext 1
    ==================doOnDispose Action
    ==================doOnLifecycle Action
    复制代码

    可以看到当在 onNext() 方法进行取消订阅操作后,doOnDispose() 和 doOnLifecycle() 都会被回调。

    如果使用 doOnLifecycle 进行取消订阅,来看看打印结果:

    05-23 10:32:20.014 24652-24652/com.example.rxjavademo D/chan: ==================doOnLifecycle accept
    ==================onSubscribe 
    复制代码

    可以发现 doOnDispose Action 和 doOnLifecycle Action 都没有被回调。

    4.10 doOnTerminate() & doAfterTerminate()

    方法预览:

    public final Observable<T> doOnTerminate(final Action onTerminate)
    public final Observable<T> doAfterTerminate(Action onFinally)
    复制代码

    有什么用?

    doOnTerminate 是在 onError 或者 onComplete 发送之前回调,而 doAfterTerminate 则是 onError 或者 onComplete 发送之后回调。

    怎么用?

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
    //      e.onError(new NullPointerException());
            e.onComplete();
        }
    })
    .doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnTerminate ");
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
        
    });
    复制代码

    打印结果:

    05-23 10:00:39.503 22398-22398/com.example.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    05-23 10:00:39.504 22398-22398/com.example.rxjavademo D/chan: ==================onNext 3
    ==================doOnTerminate 
    ==================onComplete 
    复制代码

    doAfterTerminate 也是差不多,这里就不再赘述。

    4.11 doFinally()

    方法预览:

    public final Observable<T> doFinally(Action onFinally)
    复制代码

    有什么用?

    在所有事件发送完毕之后回调该方法。

    怎么用?

    这里可能你会有个问题,那就是 doFinally() 和 doAfterTerminate() 到底有什么区别?区别就是在于取消订阅,如果取消订阅之后 doAfterTerminate() 就不会被回调,而 doFinally() 无论怎么样都会被回调,且都会在事件序列的最后。

    现在用以下例子说明下:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .doFinally(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doFinally ");
        }
    })
    .doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doOnDispose ");
        }
    })
    .doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            Log.d(TAG, "==================doAfterTerminate ");
        }
    })
    .subscribe(new Observer<Integer>() {
        private Disposable d;
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
            this.d = d;
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
            d.dispose();
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 10:10:10.469 23196-23196/? D/chan: ==================onSubscribe 
    05-23 10:10:10.470 23196-23196/? D/chan: ==================onNext 1
    ==================doOnDispose 
    ==================doFinally 
    复制代码

    可以看到如果调用了 dispose() 方法,doAfterTerminate() 不会被回调。

    现在试试把 dispose() 注释掉看看,看看打印结果:

    05-23 10:13:34.537 23439-23439/com.example.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onComplete 
    ==================doAfterTerminate 
    ==================doFinally 
    复制代码

    doAfterTerminate() 已经成功回调,doFinally() 还是会在事件序列的最后。

    4.12 onErrorReturn()

    方法预览:

    public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier)
    复制代码

    有什么用?

    当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。

    怎么用?

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
        }
    })
    .onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            Log.d(TAG, "==================onErrorReturn " + throwable);
            return 404;
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 18:35:18.175 19239-19239/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onErrorReturn java.lang.NullPointerException
    ==================onNext 404
    ==================onComplete 
    复制代码

    4.13 onErrorResumeNext()

    方法预览:

    public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction)
    复制代码

    有什么用?

    当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。

    怎么用?

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new NullPointerException());
        }
    })
    .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
            Log.d(TAG, "==================onErrorResumeNext " + throwable);
            return Observable.just(4, 5, 6);
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 18:43:10.910 26469-26469/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onErrorResumeNext java.lang.NullPointerException
    ==================onNext 4
    ==================onNext 5
    ==================onNext 6
    ==================onComplete 
    复制代码

    4.14 onExceptionResumeNext()

    方法预览:

    public final Observable<T> onExceptionResumeNext(final ObservableSource<? extends T> next)
    复制代码

    有什么用?

    与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。

    怎么用?

    先来试试 onExceptionResumeNext() 是否能捕捉 Error。

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new Error("404"));
        }
    })
    .onExceptionResumeNext(new Observable<Integer>() {
        @Override
        protected void subscribeActual(Observer<? super Integer> observer) {
            observer.onNext(333);
            observer.onComplete();
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 22:23:08.873 1062-1062/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    05-23 22:23:08.874 1062-1062/com.example.louder.rxjavademo D/chan: ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onError 
    复制代码

    从打印结果可以知道,观察者收到 onError() 事件,证明 onErrorResumeNext() 不能捕捉 Error 事件。

    将被观察者的 e.onError(new Error("404")) 改为 e.onError(new Exception("404")),现在看看是否能捕捉 Exception 事件:

    05-23 22:32:14.563 10487-10487/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onNext 333
    ==================onComplete 
    复制代码

    从打印结果可以知道,这个方法成功捕获 Exception 事件。

    4.15 retry()

    方法预览:

    public final Observable<T> retry(long times)
    ......
    复制代码

    有什么用?

    如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。

    怎么用?

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new Exception("404"));
        }
    })
    .retry(2)
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 22:46:18.537 22239-22239/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    05-23 22:46:18.538 22239-22239/com.example.louder.rxjavademo D/chan: ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onError 
    复制代码

    4.16 retryUntil()

    方法预览:

    public final Observable<T> retryUntil(final BooleanSupplier stop)
    复制代码

    有什么用?

    出现错误事件之后,可以通过此方法判断是否继续发送事件。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onError(new Exception("404"));
        }
    })
    .retryUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            if (i == 6) {
                return true;
            }
            return false;
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-23 22:57:32.905 23063-23063/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    05-23 22:57:32.906 23063-23063/com.example.louder.rxjavademo D/chan: ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onError 
    
    复制代码

    4.17 retryWhen()

    方法预览:

    public final void safeSubscribe(Observer<? super T> s)
    复制代码

    有什么用?

    当被观察者接收到异常或者错误事件时会回调该方法,这个方法会返回一个新的被观察者。如果返回的被观察者发送 Error 事件则之前的被观察者不会继续发送事件,如果发送正常事件则之前的被观察者会继续不断重试发送事件。

    怎么用?

    Observable.create(new ObservableOnSubscribe < String > () {
        @Override
        public void subscribe(ObservableEmitter < String > e) throws Exception {
            e.onNext("chan");
            e.onNext("ze");
            e.onNext("de");
            e.onError(new Exception("404"));
            e.onNext("haha");
        }
    })
    .retryWhen(new Function < Observable < Throwable > , ObservableSource <? >> () {
        @Override
        public ObservableSource <? > apply(Observable < Throwable > throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function < Throwable, ObservableSource <? >> () {
                @Override
                public ObservableSource <? > apply(Throwable throwable) throws Exception {
                    if(!throwable.toString().equals("java.lang.Exception: 404")) {
                        return Observable.just("可以忽略的异常");
                    } else {
                        return Observable.error(new Throwable("终止啦"));
                    }
                }
            });
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "==================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError " + e.toString());
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 09:13:25.622 28372-28372/com.example.rxjavademo D/chan: ==================onSubscribe 
    05-24 09:13:25.623 28372-28372/com.example.rxjavademo D/chan: ==================onNext chan
    ==================onNext ze
    ==================onNext de
    05-24 09:13:25.624 28372-28372/com.example.rxjavademo D/chan: ==================onError java.lang.Throwable: 终止啦
    复制代码

    将 onError(new Exception("404")) 改为 onError(new Exception("303")) 看看打印结果:

    ==================onNext chan
    05-24 09:54:08.653 29694-29694/? D/chan: ==================onNext ze
    ==================onNext de
    ==================onNext chan
    ==================onNext ze
    ==================onNext de
    ==================onNext chan
    ==================onNext ze
    ==================onNext de
    ==================onNext chan
    ==================onNext ze
    ==================onNext de
    ==================onNext chan
    ==================onNext ze
    ==================onNext de
    ==================onNext chan
    ......
    复制代码

    从结果可以看出,会不断重复发送消息。

    4.18 repeat()

    方法预览:

    public final Observable<T> repeat(long times)
    ......
    复制代码

    有什么用?

    重复发送被观察者的事件,times 为发送次数。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .repeat(2)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onSubscribe 
    ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    05-24 11:33:29.565 8544-8544/com.example.rxjavademo D/chan: ===================onComplete 
    复制代码

    从结果可以看出,该事件发送了两次。

    4.19 repeatWhen()

    方法预览:

    public final Observable<T> repeatWhen(final Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
    复制代码

    有什么用?

    这个方法可以会返回一个新的被观察者设定一定逻辑来决定是否重复发送事件。

    怎么用?

    这里分三种情况,如果新的被观察者返回 onComplete 或者 onError 事件,则旧的被观察者不会继续发送事件。如果被观察者返回其他事件,则会重复发送事件。

    现在试验发送 onComplete 事件,代码如下:

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .repeatWhen(new Function < Observable < Object > , ObservableSource <? >> () {
        @Override
        public ObservableSource <? > apply(Observable < Object > objectObservable) throws Exception {
            return Observable.empty();
        //  return Observable.error(new Exception("404"));
        //  return Observable.just(4); null;
        }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 11:44:33.486 9379-9379/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-24 11:44:33.487 9379-9379/com.example.rxjavademo D/chan: ===================onComplete 
    复制代码

    下面直接看看发送 onError 事件和其他事件的打印结果。

    发送 onError 打印结果:

    05-24 11:46:29.507 9561-9561/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-24 11:46:29.508 9561-9561/com.example.rxjavademo D/chan: ===================onError 
    复制代码

    发送其他事件的打印结果:

    05-24 11:48:35.844 9752-9752/com.example.rxjavademo D/chan: ===================onSubscribe 
    ===================onNext 1
    ===================onNext 2
    ===================onNext 3
    ===================onComplete 
    复制代码

    4.20 subscribeOn()

    方法预览:

    public final Observable<T> subscribeOn(Scheduler scheduler)
    复制代码

    有什么用?

    指定被观察者的线程,要注意的时,如果多次调用此方法,只有第一次有效。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    //.subscribeOn(Schedulers.newThread())
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    });
    复制代码

    现在不调用 subscribeOn() 方法,来看看打印结果:

    05-26 10:40:42.246 21466-21466/? D/chan: ======================onSubscribe
    05-26 10:40:42.247 21466-21466/? D/chan: =========================currentThread name: main
    ======================onNext 1
    ======================onNext 2
    ======================onNext 3
    ======================onComplete
    复制代码

    可以看到打印被观察者的线程名字是主线程。

    接着调用 subscribeOn(Schedulers.newThread()) 来看看打印结果:

    05-26 10:43:26.964 22530-22530/com.example.rxjavademo D/chan: ======================onSubscribe
    05-26 10:43:26.966 22530-22569/com.example.rxjavademo D/chan: =========================currentThread name: RxNewThreadScheduler-1
    05-26 10:43:26.967 22530-22569/com.example.rxjavademo D/chan: ======================onNext 1
    ======================onNext 2
    ======================onNext 3
    ======================onComplete
    复制代码

    可以看到打印结果被观察者是在一条新的线程。

    现在看看多次调用会不会有效,代码如下:

    Observable.create(new ObservableOnSubscribe < Integer > () {
    
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            Log.d(TAG, "=========================currentThread name: " + Thread.currentThread().getName());
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
            e.onComplete();
        }
    })
    .subscribeOn(Schedulers.computation())
    .subscribeOn(Schedulers.newThread())
    .subscribe(new Observer < Integer > () {@Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "======================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    });
    复制代码

    打印结果:

    05-26 10:47:20.925 23590-23590/com.example.rxjavademo D/chan: ======================onSubscribe
    05-26 10:47:20.930 23590-23629/com.example.rxjavademo D/chan: =========================currentThread name: RxComputationThreadPool-1
    ======================onNext 1
    ======================onNext 2
    ======================onNext 3
    ======================onComplete
    复制代码

    可以看到第二次调动的 subscribeOn(Schedulers.newThread()) 并没有效果。

    4.21 observeOn()

    方法预览:

    public final Observable<T> observeOn(Scheduler scheduler)
    复制代码

    有什么用?

    指定观察者的线程,每指定一次就会生效一次。

    怎么用?

    Observable.just(1, 2, 3)
    .observeOn(Schedulers.newThread())
    .flatMap(new Function < Integer, ObservableSource < String >> () {
        @Override
        public ObservableSource < String > apply(Integer integer) throws Exception {
            Log.d(TAG, "======================flatMap Thread name " + Thread.currentThread().getName());
            return Observable.just("chan" + integer);
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "======================onSubscribe");
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "======================onNext Thread name " + Thread.currentThread().getName());
            Log.d(TAG, "======================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "======================onError");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "======================onComplete");
        }
    });
    复制代码

    打印结果:

    05-26 10:58:04.593 25717-25717/com.example.rxjavademo D/chan: ======================onSubscribe
    05-26 10:58:04.594 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
    05-26 10:58:04.595 25717-25753/com.example.rxjavademo D/chan: ======================flatMap Thread name RxNewThreadScheduler-1
    ======================flatMap Thread name RxNewThreadScheduler-1
    05-26 10:58:04.617 25717-25717/com.example.rxjavademo D/chan: ======================onNext Thread name main
    ======================onNext chan1
    ======================onNext Thread name main
    ======================onNext chan2
    ======================onNext Thread name main
    ======================onNext chan3
    05-26 10:58:04.618 25717-25717/com.example.rxjavademo D/chan: ======================onComplete
    复制代码

    从打印结果可以知道,observeOn 成功切换了线程。

    下表总结了 RxJava 中的调度器:

    调度器 作用
    Schedulers.computation( ) 用于使用计算任务,如事件循环和回调处理
    Schedulers.immediate( ) 当前线程
    Schedulers.io( ) 用于 IO 密集型任务,如果异步阻塞 IO 操作。
    Schedulers.newThread( ) 创建一个新的线程
    AndroidSchedulers.mainThread() Android 的 UI 线程,用于操作 UI。

    5. 过滤操作符

    5.1 filter()

    方法预览:

    public final Observable<T> filter(Predicate<? super T> predicate)
    复制代码

    有什么用?

    通过一定逻辑来过滤被观察者发送的事件,如果返回 true 则会发送事件,否则不会发送。

    怎么用?

     Observable.just(1, 2, 3)
        .filter(new Predicate < Integer > () {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer < 2;
            }
    })
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    以上代码只有小于2的事件才会发送,来看看打印结果:

    05-24 22:57:32.562 12776-12776/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onComplete 
    复制代码

    5.2 ofType()

    方法预览:

    public final <U> Observable<U> ofType(final Class<U> clazz)
    复制代码

    有什么用?

    可以过滤不符合该类型事件

    怎么用?

    Observable.just(1, 2, 3, "chan", "zhide")
    .ofType(Integer.class)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 23:04:24.752 13229-13229/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    05-24 23:04:24.753 13229-13229/? D/chan: ==================onComplete 
    复制代码

    5.3 skip()

    方法预览:

    public final Observable<T> skip(long count)
    .......
    复制代码

    有什么用?

    跳过正序某些事件,count 代表跳过事件的数量

    怎么用?

    Observable.just(1, 2, 3)
    .skip(2)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 23:13:50.448 13831-13831/? D/chan: ==================onSubscribe 
    05-24 23:13:50.449 13831-13831/? D/chan: ==================onNext 3
    ==================onComplete 
    复制代码

    skipLast() 作用也是跳过某些事件,不过它是用来跳过正序的后面的事件,这里就不再讲解了。

    5.4 distinct()

    方法预览:

    public final Observable<T> distinct() 
    复制代码

    有什么用?

    过滤事件序列中的重复事件。

    怎么用?

    Observable.just(1, 2, 3, 3, 2, 1)
    .distinct()
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 23:19:44.334 14206-14206/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onComplete 
    复制代码

    5.5 distinctUntilChanged()

    方法预览:

    public final Observable<T> distinctUntilChanged()
    复制代码

    有什么用?

    过滤掉连续重复的事件

    怎么用?

    Observable.just(1, 2, 3, 3, 2, 1)
    .distinctUntilChanged()
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 23:22:35.985 14424-14424/com.example.louder.rxjavademo D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onNext 2
    ==================onNext 1
    ==================onComplete 
    复制代码

    因为事件序列中连续出现两次3,所以第二次3并不会发出。

    5.6 take()

    方法预览:

    public final Observable<T> take(long count)
    ......
    复制代码

    有什么用?

    控制观察者接收的事件的数量。

    怎么用?

    Observable.just(1, 2, 3, 4, 5)
    .take(3)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            i += integer;
            Log.d(TAG, "==================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-24 23:28:32.899 14704-14704/? D/chan: ==================onSubscribe 
    ==================onNext 1
    ==================onNext 2
    ==================onNext 3
    ==================onComplete 
    复制代码

    takeLast() 的作用就是控制观察者只能接受事件序列的后面几件事情,这里就不再讲解了,大家可以自己试试。

    5.7 debounce()

    方法预览:

    public final Observable<T> debounce(long timeout, TimeUnit unit)
    ......
    复制代码

    有什么用?

    如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
    
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onNext(1);
            Thread.sleep(900);
            e.onNext(2);
        }
    })
    .debounce(1, TimeUnit.SECONDS)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "===================onSubscribe ");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "===================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "===================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "===================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-25 20:39:10.512 17441-17441/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-25 20:39:12.413 17441-17478/com.example.rxjavademo D/chan: ===================onNext 2
    复制代码

    可以看到事件1并没有发送出去,现在将间隔时间改为1000,看看打印结果:

    05-25 20:42:10.874 18196-18196/com.example.rxjavademo D/chan: ===================onSubscribe 
    05-25 20:42:11.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 1
    05-25 20:42:12.875 18196-18245/com.example.rxjavademo D/chan: ===================onNext 2
    复制代码

    throttleWithTimeout() 与此方法的作用一样,这里就不再赘述了。

    5.8 firstElement() && lastElement()

    方法预览:

    public final Maybe<T> firstElement()
    public final Maybe<T> lastElement()
    复制代码

    有什么用?

    firstElement() 取事件序列的第一个元素,lastElement() 取事件序列的最后一个元素。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .firstElement()
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================firstElement " + integer);
        }
    });
    
    Observable.just(1, 2, 3, 4)
    .lastElement()
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================lastElement " + integer);
        }
    });
    复制代码

    打印结果:

    05-25 20:47:22.189 19909-19909/? D/chan: ====================firstElement 1
    ====================lastElement 4
    复制代码

    5.9 elementAt() & elementAtOrError()

    方法预览:

    public final Maybe<T> elementAt(long index)
    public final Single<T> elementAtOrError(long index)
    复制代码

    有什么用?

    elementAt() 可以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。这种情况下,你想发出异常信息的话就用 elementAtOrError() 。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .elementAt(0)
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================accept " + integer);
        }
    });
    复制代码

    打印结果:

    05-25 20:56:22.266 23346-23346/com.example.rxjavademo D/chan: ====================accept 1
    复制代码

    将 elementAt() 的值改为5,这时是没有打印结果的,因为没有满足条件的元素。

    替换 elementAt() 为 elementAtOrError(),代码如下:

    Observable.just(1, 2, 3, 4)
    .elementAtOrError(5)
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================accept " + integer);
        }
    });
    复制代码

    打印结果:

    io.reactivex.exceptions.OnErrorNotImplementedException
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 704)
    at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java: 701)
    at io.reactivex.internal.observers.ConsumerSingleObserver.onError(ConsumerSingleObserver.java: 47)
    at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117)
    at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110)
    at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36)
    at io.reactivex.Observable.subscribe(Observable.java: 10903)
    at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37)
    at io.reactivex.Single.subscribe(Single.java: 2707)
    at io.reactivex.Single.subscribe(Single.java: 2693)
    at io.reactivex.Single.subscribe(Single.java: 2664)
    at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103)
    at android.app.Activity.performCreate(Activity.java: 6942)
    at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126)
    at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880)
    at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988)
    at android.app.ActivityThread. - wrap14(ActivityThread.java)
    at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631)
    at android.os.Handler.dispatchMessage(Handler.java: 102)
    at android.os.Looper.loop(Looper.java: 154)
    at android.app.ActivityThread.main(ActivityThread.java: 6682)
    at java.lang.reflect.Method.invoke(Native Method)
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520)
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410)
    Caused by: java.util.NoSuchElementException
    at io.reactivex.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java: 117) 
    at io.reactivex.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java: 110) 
    at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java: 36) 
    at io.reactivex.Observable.subscribe(Observable.java: 10903) 
    at io.reactivex.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java: 37) 
    at io.reactivex.Single.subscribe(Single.java: 2707) 
    at io.reactivex.Single.subscribe(Single.java: 2693) 
    at io.reactivex.Single.subscribe(Single.java: 2664) 
    at com.example.rxjavademo.MainActivity.onCreate(MainActivity.java: 103) 
    at android.app.Activity.performCreate(Activity.java: 6942) 
    at android.app.Instrumentation.callActivityOnCreate(Instrumentation.java: 1126) 
    at android.app.ActivityThread.performLaunchActivity(ActivityThread.java: 2880) 
    at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java: 2988) 
    at android.app.ActivityThread. - wrap14(ActivityThread.java) 
    at android.app.ActivityThread$H.handleMessage(ActivityThread.java: 1631) 
    at android.os.Handler.dispatchMessage(Handler.java: 102) 
    at android.os.Looper.loop(Looper.java: 154) 
    at android.app.ActivityThread.main(ActivityThread.java: 6682) 
    at java.lang.reflect.Method.invoke(Native Method) 
    at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java: 1520) 
    at com.android.internal.os.ZygoteInit.main(ZygoteInit.java: 1410) 
    复制代码

    这时候会抛出 NoSuchElementException 异常。

    6. 条件操作符

    6.1 all()

    方法预览:

    public final Observable<T> ambWith(ObservableSource<? extends T> other)
    复制代码

    有什么用?

    判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .all(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 5;
        }
    })
    .subscribe(new Consumer < Boolean > () {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "==================aBoolean " + aBoolean);
        }
    });
    复制代码

    打印结果:

    05-26 09:39:51.644 1482-1482/com.example.rxjavademo D/chan: ==================aBoolean true
    复制代码

    6.2 takeWhile()

    方法预览:

    public final Observable<T> takeWhile(Predicate<? super T> predicate)
    复制代码

    有什么用?

    可以设置条件,当某个数据满足条件时就会发送该数据,反之则不发送。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .takeWhile(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 3;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "========================integer " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 09:43:14.634 3648-3648/com.example.rxjavademo D/chan: ========================integer 1
    ========================integer 2
    复制代码

    6.3 skipWhile()

    方法预览:

    public final Observable<T> skipWhile(Predicate<? super T> predicate)
    复制代码

    有什么用?

    可以设置条件,当某个数据满足条件时不发送该数据,反之则发送。

    怎么用?

    Observable.just(1, 2, 3, 4)
    .skipWhile(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer < 3;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "========================integer " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 09:47:32.653 4861-4861/com.example.rxjavademo D/chan: ========================integer 3
    ========================integer 4
    复制代码

    6.4 takeUntil()

    方法预览:

    public final Observable<T> takeUntil(Predicate<? super T> stopPredicate
    复制代码

    有什么用?

    可以设置条件,当事件满足此条件时,下一次的事件就不会被发送了。

    怎么用?

    Observable.just(1, 2, 3, 4, 5, 6)
    .takeUntil(new Predicate < Integer > () {
        @Override
        public boolean test(Integer integer) throws Exception {
            return integer > 3;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "========================integer " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 09:55:12.918 7933-7933/com.example.rxjavademo D/chan: ========================integer 1
    ========================integer 2
    05-26 09:55:12.919 7933-7933/com.example.rxjavademo D/chan: ========================integer 3
    ========================integer 4
    复制代码

    6.5 skipUntil()

    方法预览:

    public final <U> Observable<T> skipUntil(ObservableSource<U> other)
    复制代码

    有什么用?

    当 skipUntil() 中的 Observable 发送事件了,原来的 Observable 才会发送事件给观察者。

    怎么用?

    Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
    .skipUntil(Observable.intervalRange(6, 5, 3, 1, TimeUnit.SECONDS))
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "========================onSubscribe ");
        }
    
        @Override
        public void onNext(Long along) {
            Log.d(TAG, "========================onNext " + along);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "========================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "========================onComplete ");
        }
    });
    复制代码

    打印结果:

    05-26 10:08:50.574 13023-13023/com.example.rxjavademo D/chan: ========================onSubscribe 
    05-26 10:08:53.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 4
    05-26 10:08:54.576 13023-13054/com.example.rxjavademo D/chan: ========================onNext 5
    ========================onComplete 
    复制代码

    从结果可以看出,skipUntil() 里的 Observable 并不会发送事件给观察者。

    6.6 sequenceEqual()

    方法预览:

    public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2)
    ......
    复制代码

    有什么用?

    判断两个 Observable 发送的事件是否相同。

    怎么用?

    Observable.sequenceEqual(Observable.just(1, 2, 3),
    Observable.just(1, 2, 3))
    .subscribe(new Consumer < Boolean > () {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "========================onNext " + aBoolean);
        }
    });
    复制代码

    打印结果:

    05-26 10:11:45.975 14157-14157/? D/chan: ========================onNext true
    复制代码

    6.7 contains()

    方法预览:

    public final Single<Boolean> contains(final Object element)
    复制代码

    有什么用?

    判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。

    怎么用?

    Observable.just(1, 2, 3)
    .contains(3)
    .subscribe(new Consumer < Boolean > () {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "========================onNext " + aBoolean);
        }
    });
    复制代码

    打印结果:

    05-26 10:14:23.522 15085-15085/com.example.rxjavademo D/chan: ========================onNext true
    复制代码

    6.8 isEmpty()

    方法预览:

    public final Single<Boolean> isEmpty()
    复制代码

    有什么用?

    判断事件序列是否为空。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
    
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onComplete();
        }
    })
    .isEmpty()
    .subscribe(new Consumer < Boolean > () {
        @Override
        public void accept(Boolean aBoolean) throws Exception {
            Log.d(TAG, "========================onNext " + aBoolean);
        }
    });
    复制代码

    打印结果:

    05-26 10:17:16.725 16109-16109/com.example.rxjavademo D/chan: ========================onNext true
    复制代码

    6.9 amb()

    方法预览:

    public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
    复制代码

    有什么用?

    amb() 要传入一个 Observable 集合,但是只会发送最先发送事件的 Observable 中的事件,其余 Observable 将会被丢弃。

    怎么用?

    ArrayList < Observable < Long >> list = new ArrayList < > ();
    
    list.add(Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS));
    list.add(Observable.intervalRange(6, 5, 0, 1, TimeUnit.SECONDS));
    
    Observable.amb(list)
    .subscribe(new Consumer < Long > () {
        @Override
        public void accept(Long aLong) throws Exception {
            Log.d(TAG, "========================aLong " + aLong);
        }
    });
    复制代码

    打印结果:

    05-26 10:21:29.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 6
    05-26 10:21:30.580 17185-17219/com.example.rxjavademo D/chan: ========================aLong 7
    05-26 10:21:31.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 8
    05-26 10:21:32.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 9
    05-26 10:21:33.579 17185-17219/com.example.rxjavademo D/chan: ========================aLong 10
    复制代码

    6.10 defaultIfEmpty()

    方法预览:

    public final Observable<T> defaultIfEmpty(T defaultItem)
    复制代码

    有什么用?

    如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。

    怎么用?

    Observable.create(new ObservableOnSubscribe < Integer > () {
    
        @Override
        public void subscribe(ObservableEmitter < Integer > e) throws Exception {
            e.onComplete();
        }
    })
    .defaultIfEmpty(666)
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "========================onNext " + integer);
        }
    });
    复制代码

    打印结果:

    05-26 10:26:56.376 19249-19249/com.example.rxjavademo D/chan: ========================onNext 666
    复制代码

    RxJava 常见的使用方式都已经介绍的差不多,相信大家如果都掌握这些操作符的用法的话,那么使用 RxJava 将不会再是难题了。

    展开全文
  • RxJava2从入门到精通-初级篇

    千人学习 2020-02-24 11:19:38
    适合所有初中级工程师,从RxJava的每个知识点进行讲解,并在每个知识点中都带有文字说明和代码例子说明。通过这次系统化的学习,您将可以完整的学习到RxJava家族的所有特性和常用的操作符,掌握操作符的使用,掌握...
  • RxJava2详解(一)--基础

    万次阅读 2017-06-20 16:33:42
    ReactiveX详解 RxJava2基础 RxAndroid ReactiveX(Reactive Extensions),一般简写为Rx,是一个使用可观察数据流进行异步编程的编程接口。由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供...

    简介

    什么是ReactiveX

    ReactiveX: An API for asynchronous programming with observable streams.

    ReactiveX(Reactive Extensions),一般简写为Rx,是一个使用可观察数据流进行异步编程的编程接口。由微软的架构师Erik Meijer领导的团队开发,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便地编写异步和基于事件的程序,现在已经有了RxJava、RxJS、Rx.NET、RxScala、RxClojure、RxSwift等基本所有主流语言的实现,受到越来越多的开发者的欢迎,开源社区也普遍在使用。
    ReactiveX不仅仅是一个API,它是一种思想、一种编程突破,它影响了许多其它的API、框架甚至编程语言。

    为什么要使用ReactiveX

    无论你开发app还是后台应用,你总会时不时地编写一些异步或基于事件的代码,但你会发现很难完美地处理工作线程和主线程的切换、异常的处理、线程的取消、线程同步等等问题,而且在多个线程协调处理业务逻辑时代码结构变得异常的复杂而且还容易出错。
    使用Rx,你可以:

    函数式编程
    对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
    精简代码
    Rx的操作符经常可以将复杂的难题简化为很少的几行代码
    更好地处理异步错误
    传统的try/catch对于异步计算过程中的错误无能为力,但Rx提供了很好的错误处理机制
    轻松处理并发
    Rx的Observable和Scheduler让开发者可以摆脱底层的线程同步和各种并发问题

    也就是说,ReactiveX的Observable模型让你对异步事件流的处理就像平时对数据集(如数组)进行简单、可组合的处理一样,让你从混乱的回调中解脱出来,写出更高可读性、更不容易产生Bug的代码。
    Rx结合了观察者模式、迭代器模式和函数式编程的优秀思想。Rx扩展观察者模式以支持数据/事件序列,添加了一些操作符以使你可以声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO 。

    Observable是可组合的
    传统的并发处理中,Java Futures技术对于简单的无嵌套异步操作是简单高效的,如benjchristensen/FuturesA.java。但是一旦涉及到嵌套那么代码复杂度将有一个非常可观的增加,如benjchristensen/FuturesB.java,一个带条件的异步任务需要等待另一个异步任务拿到的数据才能工作,而这就可能会产生一些其他的异步任务被阻塞的问题。
    使用Future很难正确地组合多个带条件的异步流(考虑到运行时各种潜在的问题,甚至可以说是不可能的),当然,如果你对并发编程非常精通还是可以做到的,但是整个代码逻辑也会变得异常复杂且很容易出错,或者过早地阻塞在Future.get(),这样的话异步执行的优势就完全没有了。或许你会考虑使用回调来解决Future.get()过早阻塞的问题,但是对于嵌套的异步任务来说,使用回调同样会使代码变得混乱,如benjchristensen/CallbackB.java。Rx的Observable从另一方面讲就是为了组合异步数据流准备的,使用RX你可以非常方便地组合多个Observable。
    Observable是灵活的
    Rx的Observable不但可以像Futures一样发射单个值,还可以发射数据序列甚至无穷数据流。也就是说无论数据是怎样的都可以抽象为一个Observable。
    Observable拥有Iterable所有灵活、优雅的特点。Iterable是同步的单向pull,Observable是异步的双向push,即Iterable是需要手动遍历获取数据的,Observable是主动发射数据的;Iterable通过T next()方法获取数据,Observable通过onNext(T)发射数据;Iterable遇到错误会抛出异常(throws Exception),Observable遇到错误会调用onError(Exception)方法;Iterable可以利用!hasNext()方法判断是否完成,Observable通过调用onCompleted()方法表示完成。
    Observable是不固执的
    Rx对于一些特定并发或异步源是没有偏见的,Observable可以用任何方式实现,如线程池、事件循环、非阻塞IO、Actor模式,或者任何满足你的需求、偏好或擅长技术的实现。无论你选择怎样实现它,无论底层实现是阻塞的还是非阻塞的,客户端代码都会将所有与Observable的交互当做是异步的。

    RxJava/RxAndroid

    RxJava是JVM上的ReactiveX实现 – 一个在 Java VM上使用可观测的序列来组成异步的、基于事件的程序的库。
    RxJava目前版本是2.1.0,2.x版本相对于1.x版本有了很大的改动,甚至可以说是重写,而1.x版本截止到2017年6月1号不会增加新的操作符,仅仅修复Bug,截止到2018年3月31号完全停止开发维护。所以1.x版本的RxJava就不再关注了。
    RxAndroid是Android上的ReactiveX实现–Android上的RxJava bindings。
    RxAndroid添加尽可能少的类到RxJava以确保在Android应用中写响应式组件能够简单省事,尤其是它提供的调度器可以很好地调度主线程和任意Lopper线程。

    术语

    信号(Signal): 作名词时表示onSubscribe, onNext, onComplete, onError, request(n)cancel方法的触发信号,作动词时表示触发一个信号。
    请求(Demand): 作名词时表示由Publisher尚未交付(履行)的Subscriber请求的元素的总数。作动词时表示请求更多的元素的行为。
    同步(Synchronous): 在调用线程中执行。
    正常返回(Return normally): 永远只向调用者返回一个声明类型的值, 向Subscriber发失败信号的唯一合法方法是通过onError方法。
    背压(Backpressure): Observable发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息的场景。

    添加依赖

    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.1.0'

    几个比较重要的类

    Publisher
    /**
     * 一个可能无限数量的序列元素的提供者,根据从其Subscriber收到的请求发送它们。
     * `Publisher`可以在不同的时间点动态地服务多个通过`subscribe(Subscriber)`方法订阅的Subscriber。
     *
     * @param <T> 信号元素类型.
     */
    public interface Publisher<T> {
    
        /**
         * 请求Publisher开始数据流.
         * 这是一个可以被多次调用的"工厂方法", 但每次都会开始一个新的Subscription.
         * 每个Subscription将只会为一个Subscriber工作.
         * 一个Subscriber应该只能向一个Publisher订阅一次.
         * 如果Publisher拒绝了订阅尝试或者订阅失败了,它将通过Subscriber#onError发error信号
         *
         * @param s 将消费来自Publisher的信号的Subscriber
         */
        public void subscribe(Subscriber<? super T> s);
    }

    调用Publisher.subscribe(Subscriber)Subscriber的响应可能是的这样的方法调用序列:

    onSubscribe onNext* (onError | onComplete)?

    也就是说,如果Subscription没被cancel的话,onSubscribe信号将总会被触发,之后根据Subscriber的请求可能有多个onNext信号,之后如果出错的话会有一个onError信号,如果没有元素的话会有一个onComplete信号。
    ReactiveX中最重要的概念就是流(stream),任何东西都可以看作是流,最常见的就是网络数据流、UI事件流,产生stream的通常叫作被观察者(Observable)/生产者,而监听、处理stream的通常叫做观察者(Observer)/消费者,两者的关联通常叫作订阅(Subscribe)。
    stream
    上面说的Publisher就是产生事件的事件源,而RxJava根据不同使用情况提供了几个事件源的类,其中Flowable就是Publisher的实现类:

    • io.reactivex.Flowable: 0..N个流,支持Reactive-Streams和背压(backpressure)
    • io.reactivex.Observable: 0..N个流,不支持背压(backpressure)
    • io.reactivex.Single: 一个只包含一个item或error的流
    • io.reactivex.Completable: 一个不包含任何item只包含completion或error信号的流
    • io.reactivex.Maybe: 一个只包含一个maybe value或者error的流

    首先来看一下Flowable

    import io.reactivex.functions.Consumer;
    
    Flowable.just("Hello world")
      .subscribe(new Consumer<String>() {
          @Override public void accept(String s) {
              System.out.println(s);
          }
      });

    Flowable是为了更好地处理背压问题而新设计的类以弥补Observable的不足,也就是说,Observable不支持背压处理,一旦未及时处理的事件数累积到一定程度就会产生MissingBackpressureException或者OutOfMemoryError
    如果发射事件的Observable(上游)和接收处理事件的Observer(下游)工作在不同线程,就可能会出现发射事件的速度与处理事件的速度不一样的情况,如果发射的太快,就会出现事件积累的情况,而事件的积累会导致大量的资源浪费甚至OOM。而解决这一情况最直接暴力的方式就是控制上游的发射数量或发射速度,以给下游足够的事件处理事件,但是控制发射数量可能会导致部分事件被丢弃,控制发射速度可能影响性能。所以Flowable采用了更优雅的方式来解决这一问题,下游可以把自己“处理事件的能力”告诉上游,上游根据这个来决定要不要继续发射事件,也就是说下游可以根据自身情况主动通知上游发送事件,这也就完美解决了上下游流速不均衡的问题。
    Flowable提供了工厂方法create(FlowableOnSubscribe<T> source, BackpressureStrategy mode),可以通过第二个参数指定背压策略,MISSING表明由下游处理事件溢出问题,一般用于自定义参数的onBackpressureXXX操作符场景,ERROR表明如果下游跟不上上游的流速就抛出MissingBackpressureExceptionBUFFER表明缓冲所有的onNext值直到下游消费,DROP表明如果下游跟不上就丢弃最近的onNext值,LATEST表明如果下游跟不上就只保留最近的onNext值,覆盖之前的值。Flowable默认的bufferSize是128,由系统参数rx2.buffer-size决定。

    Observable可以看作是Flowable的阉割版,只是不支持背压逻辑,其它的逻辑、方法与Flowable是基本一样的。ObservableObservableSource接口的实现,而ObservableSource表示一个基本的、不支持背压的Observable源,由Observer消费,ObservableSource只有一个方法void subscribe(Observer<? super T> observer)用来订阅给定的Observer。也就是说Subscriber订阅FlowableObserver订阅Observable
    Observable可以通过非常多的工厂方法和操作符构建,如通过just()操作符构建:

    Observable.just("one", "two", "three", "four", "five")
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<String>() {
                @Override
                public void onNext(String value) {
                    Log.d(TAG, "onNext(" + value + ")");
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError()", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete()");
                }
            });

    那什么时候使用Observable,什么时候使用Flowable呢?
    何时使用Observable:

    • 最多1000个元素的数据流,即随着时间推移元素数量依然足够少以至于应用几乎没机会出现OOME。
    • 处理诸如鼠标移动或触摸事件之类的GUI事件:这些事件很少会合理地背压,也并不频繁。你可以使用Observable处理频率小于1000 Hz的事件,但最好考虑使用采样/去抖动。
    • 流本质上是同步的,但是您的平台不支持Java Streams,或者你忽略其中的功能。 使用Observable具有比Flowable更低的开销(你也可以考虑为Iterable流优化的IxJava 支持Java 6+)。

    何时使用Flowable:

    • 处理以某种方式生成的10k+元素,处理链可以告诉源限制元素生成的数量。
    • 从磁盘读取(解析)文件本质上是阻塞式和基于pull的,你可以很好地控制背压,例如从指定的请求量中读取多少行)。
    • 通过JDBC从数据库读取也是阻塞式和基于pull的,你可以根据每个下游请求调用ResultSet.next()来控制。
    • 网络(流)IO,其中网络帮助或使用的协议支持请求一些逻辑量。
    • 一些阻塞式和/或基于pull的数据源,最终会被一个非阻塞响应式的API/driver使用。

    SingleObservable类似但只能发射一个onSuccessonError给它的消费者io.reactivex.SingleObserver<T>

    public interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable e);
    }

    响应流的事件模式为:onSubscribe (onSuccess | onError)?

    Completable用来表示一个延迟计算且不关心任何数据只关心完成和异常,只能发射一个onCompleteonError给它的消费者o.reactivex.CompletableObserver

    public interface CompletableObserver {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable e);
    }

    响应流的事件模式为:onSubscribe (onComplete | onError)?

    Maybe从概念上讲是SingleCompletable的结合,提供了一种捕获一些响应源发射0或1个item或一个error的发射模式的手段,用来表示一个延迟计算。MaybeMaybeSource作为基本接口,以MaybeObserver<T>作为信号接收接口。由于最多只能发射一个元素Maybe就没打算支持背压处理,这就意味着,在onSubscribe(Disposable)被调用后可能接着调用一个其它onXXX方法,与Flowable不同的是,如果只有一个值信号,只有onSuccess被调用而不是onComplete,使用这种新的基本响应类型与其他类型几乎是一样的,因为它提供了一个适用于0或1个item序列的Flowable操作符子集。

    Maybe.just(1)
    .map(v -> v + 1)
    .filter(v -> v == 1)
    .defaultIfEmpty(2)
    .test()
    .assertResult(2);

    响应流的事件模式为:onSubscribe (onSuccess | onError | onComplete)?

    Subscriber
    /**
     * 将Subscriber的一个实例传给Publisher#subscribe(Subscriber)方法后,将会收到一次onSubscribe(Subscription)方法调用
     * 在Subscription#request(long)被调用之前不会受到进一步通知
     * 在发送请求信号后:
     * 调用一到多次onNext(Object)方法,最多调用Subscription#request(long)中定义的次数
     * 调用一次onError(Throwable)或onComplete()方法表明终止状态,之后不会发送进一步事件
     * 只要Subscriber实例还能够处理,就可以通过Subscription#request(long)发请求信号
     *
     * @param <T> 信号元素类型
     */
    public interface Subscriber<T> {
        /**
         * 在调用Publisher#subscribe(Subscriber)之后调用
         * 直到Subscription#request(long)被调用才开始数据流
         * 每当需要更多数据时,这个Subscriber实例都有责任调用Subscription#request(long)
         * Publisher只有在Subscription#request(long)被调用后才会发送通知
         *
         * @param s 可以通过Subscription#request(long)方法请求数据的Subscription
         */
        public void onSubscribe(Subscription s);
    
        /**
         * Publisher发送的数据通知,以响应Subscription#request(long)的请求
         *
         * @param t the element signaled
         */
        public void onNext(T t);
    
        /**
         * 失败的结束状态
         * 即使再次调用Subscription#request(long)也不会再发送进一步事件
         *
         * @param t the throwable signaled
         */
        public void onError(Throwable t);
    
        /**
         * 成功的结束状态
         * 即使再次调用Subscription#request(long)也不会再发送进一步事件
         */
        public void onComplete();
    }

    为了在构建stream消费者时有更少的内部状态,Rxjava2为Flowable(和Observable)分别定义了DefaultSubscriberResourceSubscriberDisposableSubscriber(以及他们的XObserver变体),以提供资源跟踪支持,可以在外部通过dispose()方法cancelled(取消)或disposed(丢弃):

    ResourceSubscriber<Integer> subscriber = new ResourceSubscriber<Integer>() {
        @Override
        public void onStart() {
            request(Long.MAX_VALUE);
        }
    
        @Override
        public void onNext(Integer t) {
            System.out.println(t);
        }
    
        @Override
        public void onError(Throwable t) {
            t.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    };
    
    Flowable.range(1, 10).delay(1, TimeUnit.SECONDS).subscribe(subscriber);
    
    subscriber.dispose();

    为了能更好的控制订阅关系,每个基本响应类都提供了E subscribeWith(E subscriber)方法以返回订阅的subscriber/observer:

    CompositeDisposable composite2 = new CompositeDisposable();
    
    composite2.add(Flowable.range(1, 5).subscribeWith(subscriber));

    在管理请求时有一点需要注意,在Subscriber.onSubscribeResourceSubscriber.onStart方法中调用request(n)可能会马上触发onNext方法(在onSubscribe/onStart方法返回前):

    Flowable.range(1, 3).subscribe(new Subscriber<Integer>() {
    
        @Override
        public void onSubscribe(Subscription s) {
            System.out.println("OnSubscribe start");
            s.request(Long.MAX_VALUE);
            System.out.println("OnSubscribe end");
        }
    
        @Override
        public void onNext(Integer v) {
            System.out.println(v);
        }
    
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    });

    将会打印:

    OnSubscribe start
    1
    2
    3
    Done
    OnSubscribe end

    也就是说,如果在onSubscribe/onStart方法中调用request之后再做一些初始化工作,那么onNext方法可能就看不到初始化的影响。为了避免出现这种情况,要确保onSubscribe/onStart中完成所有的初始化工作再调用request方法。

    Subscription
    /**
     * Subscription表示Subscriber订阅Publisher的一对一生命周期
     * 它只能由一个Subscriber使用一次
     * 它用于数据请求和取消请求(允许资源清理)
     *
     */
    public interface Subscription {
        /**
         * Publisher将不会发送任何事件直到通过该方法发送请求信号
         * 可以随时、频繁地调用,但未完成积累的请求决不能超过Long.MAX_VALUE
         * 未完成积累的请求数为Long.MAX_VALUE可能会被Publisher视为"有效无限"
         * Publisher可以发送的任何请求的信息,只有信号请求可以被安全处理
         * 如果流结束了Publisher发送数量可以少于请求的数量,但是随后必须发射Subscriber#onError(Throwable)或Subscriber#onComplete()
         *
         * @param n 向上游Publisher请求的元素数量(严格正数)
         */
        public void request(long n);
    
        /**
         * 请求Publisher停止发射数据并且清理资源
         * 由于这个请求是异步的,在调用cancel方法后,数据可能仍然被发送以满足之前的信号请求
         */
        public void cancel();
    }

    在Android中,对异步任务生命周期的管理尤为重要,而RxJava提供的最重要的接口就是Disposable

    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    由于ResourceSubscriberDisposableSubscriber(以及他们的XObserver变体)都实现了Disposable接口,对于单个资源请求我们可以直接调用它的dispose()方法丢弃,对于多个请求,我们就要借助CompositeDisposable来管理了,CompositeDisposable是多个Disposable的容器,提供了复杂度为O(1)的添加删除操作:

    public class MainActivity extends Activity {
        private static final String TAG = "RxAndroidSamples";
    
        private final CompositeDisposable disposables = new CompositeDisposable();
    
        @Override protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.main_activity);
            findViewById(R.id.button_run_scheduler).setOnClickListener(new View.OnClickListener() {
                @Override public void onClick(View v) {
                    onRunSchedulerExampleButtonClicked();
                }
            });
        }
    
        @Override protected void onDestroy() {
            super.onDestroy();
            disposables.clear();
        }
    
        void onRunSchedulerExampleButtonClicked() {
            disposables.add(sampleObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override public void onComplete() {
                        Log.d(TAG, "onComplete()");
                    }
    
                    @Override public void onError(Throwable e) {
                        Log.e(TAG, "onError()", e);
                    }
    
                    @Override public void onNext(String string) {
                        Log.d(TAG, "onNext(" + string + ")");
                    }
                }));
        }
    
        static Observable<String> sampleObservable() {
            return Observable.defer(new Callable<ObservableSource<? extends String>>() {
              @Override public ObservableSource<? extends String> call() throws Exception {
                    // Do some long running operation
                    SystemClock.sleep(5000);
                    return Observable.just("one", "two", "three", "four", "five");
                }
            });
        }
    }

    注意,CompositeDisposableclear()方法和dispose()方法类似,clear()可以多次被调用来丢弃容器中所有的Disposable,但dispose()被调用一次后就会失效。
    对于订阅生命周期的封装库最常见的是trello/RxLifecycle,不过它需要LifecycleProvider<T>才能工作,大部分情况下你只能选择继承它的RxActivityRxFragment。另一个很有意思的封装是zhihu/RxLifecycle,它利用TakeUtil操作符和一个HeadlessFragment来控制Observable的生命周期。

    Processor
    /**
     * Processor表示一个处理过程—Subscriber和Publisher都必须遵守两者的契约
     *
     * @param <T> Subscriber的信号元素类型
     * @param <R> Publisher的信号元素类型
     */
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }

    References

    展开全文
  • RxJava2 原理浅析

    2020-07-20 16:28:30
    RxJava2 原理浅析背景分析总结 背景 最近在看Rxjava的相关资料, 上网搜索资料发现大部分的文章都是教授如何使用RxJava的,但是这样只知其然,不知其所以然,总是感觉学的不彻底,用着不踏实,所以就想找一些能揭示其...

    RxJava2 原理浅析

    背景

    最近在看Rxjava的相关资料, 上网搜索资料发现大部分的文章都是教授如何使用RxJava的,但是这样只知其然,不知其所以然,总是感觉学的不彻底,用着不踏实,所以就想找一些能揭示其原理的文章参考下。 But, 网上(至少是国内的网上)对于RxJava是如何实现响应式编程模式介绍文章还是比较难以找到得,不过还好,还是有大神介绍了一些原理性的文章。但是,这篇文章介绍的是基于Rxjava1的原理,目前使用的大部分已经是RxJava2了,不过,疏归同途,基本原理是一样,只不过RxJava2的实现有些差异罢了。本文通过一个例子,分析代码,梳理代码的流程,展现一下RxJava2的实现原理,权当自行学习的一个记录,也供有兴趣的同学参考。当然,RxJava2有很多使用场景和操作,这里只跟踪解析一个比较基本的例子,其他复杂的操作和使用场景,基于分析完的例子的原理可以自行扩展,参考源代码进一步分析。

    分析

    来看下这段代码例子

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                }
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(@NonNull Integer integer) throws Exception {
                    return "This is result " + integer;
                }
            }).map(new Function<String, String>() {
                @Override
                public String apply(@NonNull String integer) throws Exception {
                    return "after  " + integer + " convert";
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(@NonNull String s) throws Exception {
                    mRxOperatorsText.append("accept : " + s +"\n");
                    Log.e(TAG, "accept : " + s +"\n" );
                }
            });
    

    RxJava是基于观察者模式的这个大家都了解,这里我觉得RxJava这里还有一种模式的思想就是职责链模式,RxJava的响应式模式就是通过职责链的思想把对应的操作一步步链接起来的。
    这里Obserable的一个操作,就对应了新建了一个的Observable类,请看代码一步步分析,以下是create操作的代码实现:

        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    这里create操作符create了一个新的ObservableCreate类,这里的ObservableCreate类继承自Observable类,主要的成员变量为

    ObservableOnSubscribe<T> source;
    

    这里就把新建的Observable类与原始的obervable也就是用户的原始数据来源(这里是emit相关的类)建立了联系:

        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    

    如图:
    图1
    第二步,我们来看下map操作:

        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

    map操作的用途不用讲了,大家看过RxJava的教程应该都知道,这里我们主要来看下map操作是如何实现的。
    这里的操作主要做了两件事:

    1. create了一个新的Obdervable类ObservableMap(为了说明 我们叫它ObservableMap1,下同)
    2. 然后将原来的Observable类传入,再转入Function(用户定义的映射操作)
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    

    图2

    然后同样的map操作,现在的关系变成了如下:

    图3
    通过这些操作,我们现在就把所需要的信息都记录了下来,通过source变量形成一条信息链,为接下来的subscribe操作提供回调的线索。

    接下来就是subscribe了,这里就会顺着刚才建立的关系链,一步步回调把原始的数据经过一步步的转换,传入到最后的Observer类处理。

    首先来看下最初的subscribe,传入用户定义的处理类consumer (继承自Observer接口),注意这里的Observable类已经是ObservableMap2了。

    图4
    所有继承了Observable的类调用的subscrib操作,疏归同途,最后都是调用Observable的subscribe操作

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    这里subscribeActual是一个抽象方法,由其子类实现

    我们来看看ObservableMap的实现

        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    

    这里就开始了新的关系链的建立,我们先来看看MapObserver里保存的什么

            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    

    原理和Observable的扩展类一样,保存了一个actual用户的执行操作 和 一个function (这里是map操作)——之前保存的用户定义的映射操作。

    调用ObservableMap2.subscribe后的各个类的关系如下
    图5
    参考之前的建立关系的过程,这里的source2指向的是ObservableMap1, 所以接下来调用的实际上是ObservableMap1.subscribe(MapObserver2)

    OK, 那么接下来的套路跟之前一样,只不过这次到了ObservableCreate.subscribe(MapObserver1) 了

    图6
    好,然后我们再来看下ObservableCreate的subscribeActual做了什么

        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    它create了一个CreateEmitter, 然后就调用source.sbuscribe(CreateEmitter )了, 好了, 这里的source就是用户定义的数据源头了

        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    最后形成的调用链就是

    图7
    OK, 接下来的过程就和我们预期的一样了,看看数据是如何从源数据emit经过层层转换,到了最后用户定义的Observer的吧,拿一个onNext调用为例

    ObservableEmitter.onNext() -> MapObserver1.onNext()

    ObservableEmitterMapObserver1MapObserver2ObserveronNext()function()onNext()function()onNext()这里就是用户定义的Observer了ObservableEmitterMapObserver1MapObserver2Observer

    以上就是一个简单的RxJava应用场景的流程分析了

    总结

    从以上的分析可以看出,RxJava的流程主要分为两部分

    1. 是关系链的建立,通过Observable的source变量链接起来
    2. 基于建立好的关系链,建立回调关系链,回调关系链是通过Observer的actual变量链接起来的。
    展开全文
  • 目录 一、配置 二、原理 第一步:创建被观察者 第二步:创建观察者 第三步:建立订阅关系 ...下篇:RxJava2 使用详解二之...要在Android中使用RxJava2, 在app的build.gradle中添加依赖: /*导入Rxjava RxAndr...

    目录

    一、配置

    二、原理

    第一步:创建被观察者

    第二步:创建观察者

    第三步:建立订阅关系

    链式操作

    ObservableEmitter:

    Disposable:

    subscribe()重载方法

    下篇:RxJava2 使用详解二之线程调度


    一、配置

    要在Android中使用RxJava2, 在app的build.gradle中添加依赖:

    /*导入Rxjava RxAndroid 2018/8/30 最新版*/
    implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'

    一个是Rxjava的库,一个是RxJava基于Android的库。

    二、原理

    RxJava原理详解

    形象说明:用一条河的上游和下游代替被观察者和观察者, 上游和下游之间有个水闸,建立连接的情况下水闸是打开的,上游每发送一个事件,下游就能收到该事件。

    RxJava的使用都是分三步

    第一步:创建被观察者

    //创建上游 Observable(被观察者)
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });

    第二步:创建观察者

    //创建下游 Observer(观察者)
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext=" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
    

    第三步:建立订阅关系

            //建立连接
            observable.subscribe(observer);

    运行结果:

    这里写图片描述

    这里的事件发送的顺序是1,2,3, 事件接收的顺序也是1,2,3的顺序。 
    上游和下游就分别对应着RxJava中的Observable和Observer,它们之间的连接就对应着subscribe()。 
    只有当Observable(上游)和Observer(下游)建立连接之后, 上游才会开始发送事件. 也就是调用了subscribe() 方法之后才开始发送事件. 

    链式操作

    把这段代码连起来写就成了被大家所熟知的RxJava的链式操作,运行结果也是一样的:

        Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext=" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    解释一下两个关键词:ObservableEmitter和Disposable

    ObservableEmitter

    Emitter就是发射器的,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。 
    这里需要注意的是: 
    1、上游可以发送无限个onNext, 下游也可以接收无限个onNext。当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件。 
    2、当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件。上游可以不发送onComplete或onError。 
    3、发送多个onComplete是可以正常运行的, 但是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃。

    我们就来一一试验一下:

    1、发送多个onComplete

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emitter=1");
                    emitter.onNext(1);
                    Log.d(TAG, "emitter=2");
                    emitter.onNext(2);
                    Log.d(TAG, "emitter=3");
                    emitter.onNext(3);
                    Log.d(TAG, "emitter=complete=1");
                    emitter.onComplete();
                    Log.d(TAG, "emitter=complete=2");
                    emitter.onComplete();
                    Log.d(TAG, "emitter=complete=3");
                    emitter.onComplete();
                    Log.d(TAG, "emitter=4");
                    emitter.onNext(4);
                }
            }).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: " + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    运行结果:

    这里写图片描述

    可以看到,发送多个onComplete是可以的,只是接收到一个onComplete之后就不在接收事件,但是上游依旧在发送事件。

    2、发送多个onError事件

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emitter=1");
                    emitter.onNext(1);
                    Log.d(TAG, "emitter=2");
                    emitter.onNext(2);
                    Log.d(TAG, "emitter=3");
                    emitter.onNext(3);
    //                Log.d(TAG, "emitter=complete=1");
    //                emitter.onComplete();
    //                Log.d(TAG, "emitter=complete=2");
    //                emitter.onComplete();
    //                Log.d(TAG, "emitter=complete=3");
    //                emitter.onComplete();
                    Log.d(TAG, "emitter=error=1");
                    emitter.onError(new Throwable("no message"));
                    Log.d(TAG, "emitter=error=2");
                    emitter.onError(new Throwable("no message"));
                    Log.d(TAG, "emitter=error=3");
                    emitter.onError(new Throwable("no message"));
                    Log.d(TAG, "emitter=4");
                    emitter.onNext(4);
                }
            }).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: " + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError="+e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    运行结果:

    这里写图片描述

    从这个结果中可以看到,当收到第一个onError()后,打印了error信息但是程序并没有崩溃,然后第二个onError()程序就崩溃了。

    Disposable:

    这个单词的意思:是一次性的,用后就抛弃的. 那么在RxJava中怎么去理解它呢, 对应于上面的例子, 我们可以把它理解成水闸, 当调用它的dispose()方法时, 水闸关闭, 下游就收不到事件。 
    但是调用dispose()并不会让上游停止继续发送事件, 相反上游会继续发送剩余的事件.

    上游依次发送1,2,3,complete,4,在下游收到第二个事件之后, 关闭水闸, 看看运行结果:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emitter=1");
                    emitter.onNext(1);
                    Log.d(TAG, "emitter=2");
                    emitter.onNext(2);
                    Log.d(TAG, "emitter=3");
                    emitter.onNext(3);
                    Log.d(TAG, "emitter=complete");
                    emitter.onComplete();
                    Log.d(TAG, "emitter=4");
                    emitter.onNext(4);
                }
            }).subscribe(new Observer<Integer>() {
                private Disposable disposable;
                private int i;
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                    disposable = d;
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "onNext: " + value);
                    i++;
                    if (i == 2) {
                        disposable.dispose();
                        Log.d(TAG, "isDisposed : " + disposable.isDisposed());
                    }
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });

    运行结果为:

    这里写图片描述

    从运行结果我们看到, 在收到onNext(2)这个事件后, 关闭水闸, 但是上游仍然发送了onNext(3), complete, onNext(4)这几个事件, 而且上游并没有因为发送了onComplete而停止. 同时可以看到下游的onSubscribe()方法是最先调用的。 

    subscribe()重载方法

    另外, subscribe()有多个重载的方法:

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}

    第一个不带任何参数的subscribe() 表示下游不关心上游发送的是什么。 
    第二个带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件不关心, 因此我们如果只需要onNext事件可以这么写,代码如下:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emitter=1");
                    emitter.onNext(1);
                    Log.d(TAG, "emitter=2");
                    emitter.onNext(2);
                    Log.d(TAG, "emitter=3");
                    emitter.onNext(3);
                    Log.d(TAG, "emitter=complete");
                    emitter.onComplete();
                    Log.d(TAG, "emitter=4");
                    emitter.onNext(4);
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept: " + integer);
                }
            });

    运行结果如下:

    这里写图片描述

    接下来的几个方法的使用和第二个是一样的,就不多说了,最后一个方法的话,在文章前面就使用了,也不介绍了。

    展开全文
  • RxJava2 浅析

    万次阅读 2016-09-06 09:59:30
    作者: maplejaw ... 版本: compile 'io.reactivex.rxjava2:rxjava:...前段时间阅读了RxJava1.x的源码,刚好RxJava2.x也发布了RC版,为了迎接10月底的正式版,趁热打铁,本篇将对RxJava2.x进行一个简单的剖析。Observa
  • Rxjava介绍
  • RxJava2最全面、最详细的讲解(二)

    千次阅读 2019-11-15 10:30:58
    相信大家对RxJava有了一定的理解,由于篇幅过长所以重新写了一篇,如果不了解Rxjava2可以参考下RxJava2最全面、最详细的讲解(一)。下面开始继续讲解RxJava2的其他用法。(源码和其他链接在文章最后给出) 在使用...
  • RxJava2 五大重要角色介绍

    千次阅读 2019-04-09 12:05:27
    RxJava2 的5大基类及本章学习结构图 1、Flowable 1.1、Flowable简介 1.2、Flowable官方图解 1.3、Backpressure 2、Observable 2.1、Observable简介 2.2、Observable官方图解 2.3、Flowable和Observable对比 ...
  • To allow having RxJava 1.x and RxJava 2.x side-by-side, RxJava 2.x is under the maven coordinates io.reactivex.rxjava2:rxjava:2.x.y and classes are accessible below io.reactivex....
  • RxJava 混淆配置

    万次阅读 2016-05-11 15:37:27
    RxJava 混淆配置
  • Rxjava2+Retrofit2.0进行封装(总地址)

    千次阅读 2017-03-09 18:32:49
    RxJava2+Retrofit2实现网络请求和解析封装 Retrofit2与RxJava用法大全 RxJava +retrofit2实现安卓中网络操作 从零开始RxJava2.0 一 从零开始RxJava2.0 二 从零开始RxJava2.0 三 从零开始RxJava2.0 四 提高 RxJava2...
  • RxJava2 和 Retrofit2 依赖时出现冲突

    千次阅读 2017-06-09 16:16:56
    RxJava2 和 Retrofit2 依赖时出现冲突问题 RxJava2 和 Retrofit2 依赖时出现冲突问题 配置如下: compile "com.squareup.okhttp3:okhttp:3.6.0" compile "io.reactivex.rxjava2:rxjava:2.0.7" compile ...
  • retrofit2和rxjava2一起使用时的rxjava.jar版本冲突问题解决最近在升级rxjava到2.X.X版本的时候,发现出现rxjava1.x.x和rxjava2.x.x版本冲突retrofit = new Retrofit.Builder() .baseUrl("https://xxx.xxx.com/") ...
  • 使用rxjava实现倒计时

    千次阅读 2018-08-30 16:23:31
    以前的倒计时都是使用线程和handler来实现,或者一个timer,现在又有一个新...compile 'io.reactivex.rxjava2:rxjava:2.2.1' compile 'io.reactivex.rxjava2:rxandroid:2.1.0' 2.首先看下Observable的几个方法 int...
  • 关于rxjava的循环请求

    千次阅读 2017-06-08 17:51:02
    mFetIceServerTask = Observable.interval(0, 10, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .takeUntil(new Func1, Boolean>() { @Override public
  • Rxjava实现间隔时间发送数据

    千次阅读 2018-09-06 18:13:57
    三步搞定,下面是输出时间 比较明显的1s间隔
  • Android 最简单的rxjava遍历集合写法

    千次阅读 2016-12-15 17:02:36
    废话不多说,直接上代码List<User> list = new ArrayList(); for (int i = 0; i ; i++) { User user = new User(); user.id = i + ""; list.add(user); } Obse
  • Could not download rxjava.jar (io.reactivex:rxjava:1.1.0): No cached version available for offline mode 网上找了一圈,终于解决了问题,方法如下: 在File-Settings-Gradle中将Offlin...
  • RxJava延时操作

    千次阅读 2019-07-15 16:23:59
    直接去这里找就好,我只是个搬运工 https://blog.csdn.net/xiangshiweiyu_hd/article/details/83924707
1 2 3 4 5 ... 20
收藏数 35,465
精华内容 14,186
关键字:

rxjava2