精华内容
下载资源
问答
  • 使用Reactor进行反应式编程最全教程

    千次阅读 2018-11-20 13:58:09
    本文要介绍的是另外一个新的反应式编程库 Reactor。 反应式编程介绍 反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的...

    反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区中比较流行的是 RxJava 和 RxJava 2。本文要介绍的是另外一个新的反应式编程库 Reactor。

    反应式编程介绍

    反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。

    在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。

    反应式流中第一个重要概念是负压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。

    Reactor 简介

    前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

    在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,目前的版本是 3.0.5.RELEASE。

    Flux 和 Mono

    Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long>对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

    创建 Flux

    有多种不同的方式可以创建 Flux 序列。

    Flux 类的静态方法

    第一种方式是通过 Flux 类中的静态方法。

    • just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
    • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
    • empty():创建一个不包含任何元素,只发布结束消息的序列。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • never():创建一个不包含任何消息通知的序列。
    • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
    • interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
    • intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

    代码清单 1 中给出了上述这些方法的使用示例。

    清单 1. 通过 Flux 类的静态方法创建 Flux 序列

    1

    2

    3

    4

    5

    6

    Flux.just("Hello", "World").subscribe(System.out::println);

    Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

    Flux.empty().subscribe(System.out::println);

    Flux.range(1, 10).subscribe(System.out::println);

    Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

    Flux.intervalMillis(1000).subscribe(System.out::println);

    上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。

    generate()方法

    generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate()方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。

    在代码清单 2中,第一个序列的生成逻辑中通过 next()方法产生一个简单的值,然后通过 complete()方法来结束该序列。如果不调用 complete()方法,所产生的是一个无限序列。第二个序列的生成逻辑中的状态对象是一个 ArrayList 对象。实际产生的值是一个随机数。产生的随机数被添加到 ArrayList 中。当产生了 10 个数时,通过 complete()方法来结束序列。

    清单 2. 使用 generate()方法生成 Flux 序列

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    Flux.generate(sink -> {

        sink.next("Hello");

        sink.complete();

    }).subscribe(System.out::println);

     

     

    final Random random = new Random();

    Flux.generate(ArrayList::new, (list, sink) -> {

        int value = random.nextInt(100);

        list.add(value);

        sink.next(value);

        if (list.size() == 10) {

            sink.complete();

        }

        return list;

    }).subscribe(System.out::println);

    create()方法

    create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。在代码清单 3 中,在一次调用中就产生了全部的 10 个元素。

    清单 3. 使用 create()方法生成 Flux 序列

    1

    2

    3

    4

    5

    6

    Flux.create(sink -> {

        for (int i = 0; i < 10; i++) {

            sink.next(i);

        }

        sink.complete();

    }).subscribe(System.out::println);

    创建 Mono

    Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法。

    • fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
    • delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
    • ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
    • justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

    还可以通过 create()方法来使用 MonoSink 来创建 Mono。代码清单 4 中给出了创建 Mono 序列的示例。

    清单 4. 创建 Mono 序列

    1

    2

    3

    Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

    Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

    Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

    操作符

    和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。下面对其中重要的操作符进行分类介绍。

    buffer 和 bufferTimeout

    这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。

    除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。

    代码清单 5 给出了 buffer 相关操作符的使用示例。第一行语句输出的是 5 个包含 20 个元素的数组;第二行语句输出的是 2 个包含了 10 个元素的数组;第三行语句输出的是 5 个包含 2 个元素的数组。每当遇到一个偶数就会结束当前的收集;第四行语句输出的是 5 个包含 1 个元素的数组,数组里面包含的只有偶数。

    需要注意的是,在代码清单 5 中,首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。

    清单 5. buffer 相关操作符的使用示例

    1

    2

    3

    4

    Flux.range(1, 100).buffer(20).subscribe(System.out::println);

    Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

    Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

    Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

    filter

    对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。代码清单 6 中的语句输出的是 1 到 10 中的所有偶数。

    清单 6. filter 操作符使用示例

    1

    Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

    window

    window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<Flux<T>>。在代码清单 7 中,两行语句的输出结果分别是 5 个和 2 个 UnicastProcessor 字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,而 UnicastProcessor 类的 toString 方法输出的就是 UnicastProcessor 字符。

    清单 7. window 操作符使用示例

    1

    2

    Flux.range(1, 100).window(20).subscribe(System.out::println);

    Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

    zipWith

    zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

    在代码清单 8 中,两个流中包含的元素分别是 a,b 和 c,d。第一个 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2;第二个 zipWith 操作通过合并函数把元素类型变为 String。

    清单 8. zipWith 操作符使用示例

    1

    2

    3

    4

    5

    6

    Flux.just("a", "b")

            .zipWith(Flux.just("c", "d"))

            .subscribe(System.out::println);

    Flux.just("a", "b")

            .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

            .subscribe(System.out::println);

    take

    take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。

    • take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。
    • takeLast(long n):提取流中的最后 N 个元素。
    • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
    • takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
    • takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。

    在代码清单 9 中,第一行语句输出的是数字 1 到 10;第二行语句输出的是数字 991 到 1000;第三行语句输出的是数字 1 到 9;第四行语句输出的是数字 1 到 10,使得 Predicate 返回 true 的元素也是包含在内的。

    清单 9. take 系列操作符使用示例

    1

    2

    3

    4

    Flux.range(1, 1000).take(10).subscribe(System.out::println);

    Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

    Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);

    Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

    reduce 和 reduceWith

    reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

    在代码清单 10 中,第一行语句对流中的元素进行相加操作,结果为 5050;第二行语句同样也是进行相加操作,不过通过一个 Supplier 给出了初始值为 100,所以结果为 5150。

    清单 10. reduce 和 reduceWith 操作符使用示例

    1

    2

    Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

    Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

    merge 和 mergeSequential

    merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

    代码清单 11 中分别使用了 merge 和 mergeSequential 操作符。进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。

    清单 11. merge 和 mergeSequential 操作符使用示例

    1

    2

    3

    4

    5

    6

    Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

            .toStream()

            .forEach(System.out::println);

    Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

            .toStream()

            .forEach(System.out::println);

    flatMap 和 flatMapSequential

    flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。

    在代码清单 12 中,流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。

    清单 12. flatMap 操作符使用示例

    1

    2

    3

    4

    Flux.just(5, 10)

            .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

            .toStream()

            .forEach(System.out::println);

    concatMap

    concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

    代码清单 13 与代码清单 12 类似,只不过把 flatMap 换成了 concatMap,结果流中依次包含了第一个流和第二个流中的全部元素。

    清单 13. concatMap 操作符使用示例

    1

    2

    3

    4

    Flux.just(5, 10)

            .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

            .toStream()

            .forEach(System.out::println);

    combineLatest

    combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。在 代码清单 14 中,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。

    清单 14. combineLatest 操作符使用示例

    1

    2

    3

    4

    5

    Flux.combineLatest(

            Arrays::toString,

            Flux.intervalMillis(100).take(5),

            Flux.intervalMillis(50, 100).take(5)

    ).toStream().forEach(System.out::println);

    消息处理

    当需要处理 Flux 或 Mono 中的消息时,如之前的代码清单所示,可以通过 subscribe 方法来添加相应的订阅逻辑。在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。代码清单 15 中通过 subscribe()方法同时处理了正常消息和错误消息。

    清单 15. 通过 subscribe()方法处理正常和错误消息

    1

    2

    3

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .subscribe(System.out::println, System.err::println);

    正常的消息处理相对简单。当出现错误时,有多种不同的处理策略。第一种策略是通过 onErrorReturn()方法返回一个默认值。在代码清单 16 中,当出现错误时,流会产生默认值 0.

    清单 16. 出现错误时返回默认值

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .onErrorReturn(0)

            .subscribe(System.out::println);

    第二种策略是通过 switchOnError()方法来使用另外的流来产生元素。在代码清单 17 中,当出现错误时,将产生 Mono.just(0)对应的流,也就是数字 0。

    清单 17. 出现错误时使用另外的流

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .switchOnError(Mono.just(0))

            .subscribe(System.out::println);

    第三种策略是通过 onErrorResumeWith()方法来根据不同的异常类型来选择要使用的产生元素的流。在代码清单 18 中,根据异常类型来返回不同的流作为出现错误时的数据来源。因为异常的类型为 IllegalArgumentException,所产生的元素为-1。

    清单 18. 出现错误时根据异常类型来选择流

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalArgumentException()))

            .onErrorResumeWith(e -> {

                if (e instanceof IllegalStateException) {

                    return Mono.just(0);

                } else if (e instanceof IllegalArgumentException) {

                    return Mono.just(-1);

                }

                return Mono.empty();

            })

            .subscribe(System.out::println);

    当出现错误时,还可以通过 retry 操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。代码清单 19 中指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。

    清单 19. 使用 retry 操作符进行重试

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .retry(1)

            .subscribe(System.out::println);

    调度器

    前面介绍了反应式流和在其上可以进行的各种操作,通过调度器(Scheduler)可以指定这些操作执行的方式和所在的线程。有下面几种不同的调度器实现。

    • 当前线程,通过 Schedulers.immediate()方法来创建。
    • 单一的可复用的线程,通过 Schedulers.single()方法来创建。
    • 使用弹性的线程池,通过 Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。如果一个线程闲置太长时间,则会被销毁。该调度器适用于 I/O 操作相关的流的处理。
    • 使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
    • 使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
    • 从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。

    某些操作符默认就已经使用了特定类型的调度器。比如 intervalMillis()方法创建的流就使用了由 Schedulers.timer()创建的调度器。通过 publishOn()和 subscribeOn()方法可以切换执行操作的调度器。其中 publishOn()方法切换的是操作符的执行方式,而 subscribeOn()方法切换的是产生流中元素时的执行方式。

    在代码清单 20 中,使用 create()方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。接着是两对 publishOn()和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。最后通过 subscribeOn()方法来改变流产生时的执行方式。运行之后的结果是[elastic-2] [single-1] parallel-1。最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single()调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。

    清单 20. 使用调度器切换操作符执行方式

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    Flux.create(sink -> {

        sink.next(Thread.currentThread().getName());

        sink.complete();

    })

    .publishOn(Schedulers.single())

    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

    .publishOn(Schedulers.elastic())

    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

    .subscribeOn(Schedulers.parallel())

    .toStream()

    .forEach(System.out::println);

    测试

    在对使用 Reactor 的代码进行测试时,需要用到 io.projectreactor.addons:reactor-test 库。

    使用 StepVerifier

    进行测试时的一个典型的场景是对于一个序列,验证其中所包含的元素是否符合预期。StepVerifier 的作用是可以对序列中包含的元素进行逐一验证。在代码清单 21 中,需要验证的流中包含 a 和 b 两个元素。通过 StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而 verifyComplete()方法则验证流是否正常结束。类似的方法还有 verifyError()来验证流由于错误而终止。

    清单 21. 使用 StepVerifier 验证流中的元素

    1

    2

    3

    4

    StepVerifier.create(Flux.just("a", "b"))

            .expectNext("a")

            .expectNext("b")

            .verifyComplete();

    操作测试时间

    有些序列的生成是有时间要求的,比如每隔 1 分钟才产生一个新的元素。在进行测试中,不可能花费实际的时间来等待每个元素的生成。此时需要用到 StepVerifier 提供的虚拟时间功能。通过 StepVerifier.withVirtualTime()方法可以创建出使用虚拟时钟的 StepVerifier。通过 thenAwait(Duration)方法可以让虚拟时钟前进。

    在代码清单 22 中,需要验证的流中包含两个产生间隔为一天的元素,并且第一个元素的产生延迟是 4 个小时。在通过 StepVerifier.withVirtualTime()方法包装流之后,expectNoEvent()方法用来验证在 4 个小时之内没有任何消息产生,然后验证第一个元素 0 产生;接着 thenAwait()方法来让虚拟时钟前进一天,然后验证第二个元素 1 产生;最后验证流正常结束。

    清单 22. 操作测试时间

    1

    2

    3

    4

    5

    6

    7

    StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

            .expectSubscription()

            .expectNoEvent(Duration.ofHours(4))

            .expectNext(0L)

            .thenAwait(Duration.ofDays(1))

            .expectNext(1L)

            .verifyComplete();

    使用 TestPublisher

    TestPublisher 的作用在于可以控制流中元素的产生,甚至是违反反应流规范的情况。在代码清单 23 中,通过 create()方法创建一个新的 TestPublisher 对象,然后使用 next()方法来产生元素,使用 complete()方法来结束流。TestPublisher 主要用来测试开发人员自己创建的操作符。

    清单 23. 使用 TestPublisher 创建测试所用的流

    1

    2

    3

    4

    5

    6

    7

    8

    9

    final TestPublisher<String> testPublisher = TestPublisher.create();

    testPublisher.next("a");

    testPublisher.next("b");

    testPublisher.complete();

     

    StepVerifier.create(testPublisher)

            .expectNext("a")

            .expectNext("b")

            .expectComplete();

    调试

    由于反应式编程范式与传统编程范式的差异性,使用 Reactor 编写的代码在出现问题时比较难进行调试。为了更好的帮助开发人员进行调试,Reactor 提供了相应的辅助功能。

    启用调试模式

    当需要获取更多与流相关的执行信息时,可以在程序开始的地方添加代码清单 24 中的代码来启用调试模式。在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出。通过这些信息可以分析出具体是在哪个操作符的执行中出现了问题。

    清单 24. 启用调试模式

    1

    Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

    不过当调试模式启用之后,记录这些额外的信息是有代价的。一般只有在出现了错误之后,再考虑启用调试模式。但是当为了找到问题而启用了调试模式之后,之前的错误不一定能很容易重现出来。为了减少可能的开销,可以限制只对特定类型的操作符启用调试模式。

    使用检查点

    另外一种做法是通过 checkpoint 操作符来对特定的流处理链来启用调试模式。代码清单 25 中,在 map 操作符之后添加了一个名为 test 的检查点。当出现错误时,检查点名称会出现在异常堆栈信息中。对于程序中重要或者复杂的流处理链,可以在关键的位置上启用检查点来帮助定位可能存在的问题。

    清单 25. 使用 checkpoint 操作符

    1

    Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

    日志记录

    在开发和调试中的另外一项实用功能是把流相关的事件记录在日志中。这可以通过添加 log 操作符来实现。在代码清单 26 中,添加了 log 操作符并指定了日志分类的名称。

    清单 26. 使用 log 操作符记录事件

    1

    Flux.range(1, 2).log("Range").subscribe(System.out::println);

    在实际的运行时,所产生的输出如代码清单 27 所示。

    清单 27. log 操作符所产生的日志

    1

    2

    3

    4

    5

    6

    7

    8

    13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

    13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

    13:07:56.753 [main] INFO Range - | request(unbounded)

    13:07:56.754 [main] INFO Range - | onNext(1)

    1

    13:07:56.754 [main] INFO Range - | onNext(2)

    2

    13:07:56.754 [main] INFO Range - | onComplete()

    “冷”与“热”序列

    之前的代码清单中所创建的都是冷序列。冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

    在代码清单 28 中,原始的序列中包含 10 个间隔为 1 秒的元素。通过 publish()方法把一个 Flux 对象转换成 ConnectableFlux 对象。方法 autoConnect()的作用是当 ConnectableFlux 对象有一个订阅者时就开始产生消息。代码 source.subscribe()的作用是订阅该 ConnectableFlux 对象,让其开始产生数据。接着当前线程睡眠 5 秒钟,第二个订阅者此时只能获得到该序列中的后 5 个元素,因此所输出的是数字 5 到 9。

    清单 28. 热序列

    1

    2

    3

    4

    5

    6

    7

    8

    9

    final Flux<Long> source = Flux.intervalMillis(1000)

            .take(10)

            .publish()

            .autoConnect();

    source.subscribe();

    Thread.sleep(5000);

    source

            .toStream()

            .forEach(System.out::println);

    小结

    反应式编程范式对于习惯了传统编程范式的开发人员来说,既是一个需要进行思维方式转变的挑战,也是一个充满了更多可能的机会。Reactor 作为一个基于反应式流规范的新的 Java 库,可以作为反应式应用的基础。本文对 Reactor 库做了详细的介绍,包括 Flux 和 Mono 序列的创建、常用操作符的使用、调度器、错误处理以及测试和调试技巧等。

    参考资源 (resources)

     

    资源下载(Reactor例子)

    https://download.csdn.net/download/cn_hhaip/10795795

    展开全文
  • 一、网络服务和请求的特点与事件分发器的两种模式: 例如:Web服务、分布式事务 大多数都有相同的基础结构和步骤: 读请求:Read request 解码请求:Decode request 进程服务:Process service 编码回复:Encode ...

    一、网络服务和请求的特点与事件分发器的两种模式:

    例如:Web服务、分布式事务

    大多数都有相同的基础结构和步骤:

    读请求:Read request

    解码请求:Decode request

    进程服务:Process service

    编码回复:Encode reply

    回复应答:Send reply

    但是各种不同的请求不同在逻辑和每一步的开销

    例如:

    XML parsing, File transfer, Web page generation, computational services, …

    典型的服务设计如下图所示:

    在这里插入图片描述

    参照上图,可以写出典型的ServerSocket的伪代码如下:

    class Server implements Runnable {
        public static final Integer PORT=123333;//mock的
        public static final Integer MAX_INPUT=128;//mock的
        public void run() {
            try {
                ServerSocket ss = new ServerSocket(PORT);
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                // or, single-threaded, or a thread pool
            } catch (IOException ex) { /* ... */ }
        }
        static class Handler implements Runnable {
            final Socket socket;
            Handler(Socket s) { socket = s; }
            public void run() {
                try {
                    byte[] input = new byte[MAX_INPUT];
                    socket.getInputStream().read(input);
                    byte[] output = process(input);
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /* ... */ }
            }
            private byte[] process(byte[] cmd) { /* ... */ }
        }
    }
    

    分治思想:

    • 1.将进程切分成小的任务,每一小块非阻塞的去执行任务

    • 2.当每个任务被启用时执行它 一个IO事件通常如下所示:

      在这里插入图片描述

    • 3.java nio中的基本原理:

      • 非阻塞的读写
      • 分配任务绑定IO事件

    一般情况下,I/O 复用机制需要事件分发器(event dispatcher)。 事件分发器的作用,即将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊: 谁谁谁的快递到了, 快来拿吧!开发人员在开始的时候需要在分发器那里注册感兴趣的事件,并提供相应的处理者(event handler),或者是回调函数;事件分发器在适当的时候,会将请求的事件分发给这些handler或者回调函数。

    涉及到事件分发器的两种模式称为:Reactor和Proactor。 Reactor模式是基于同步I/O的而Proactor模式是和异步I/O相关的。在Reactor模式中,事件分发器等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分发器就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。

    单机版Reactor模式:

    在这里插入图片描述

    二、Java NIO

    Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。

    Java NIO的组成:

    • Channels:与文件、socket连接来支持 非阻塞读 Channel 有点象流。 数据可以从Channel读到Buffer中

    • Buffers:缓冲区

    • Selectors(选择器):这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。

    • SelectionKeys:一个用于向Selector注册Channel的token。每次Selector注册一个Channel的时候都会创建一个 SelectionKey,一个键(SelectionKey)一直有效,直到通过调用它的cancel方法、关闭它的通道或关闭它的选择器来取消它。cancel方法取消键(SelectionKey)不会立即将其从选择器中移除;相反,它被添加到选择器(SelectionKey)的取消键集中,以便在下一个选择操作中删除。可以通过调用键的isValid方法来测试键的有效性。

      选择键包含两个表示为整数值的操作集 。 操作集的每一位表示由密钥通道支持的可选择操作的类别。

      • (interest set)兴趣集确定下一次调用选择器的选择方法之一后,准备测试哪些操作类别。 兴趣集在创建密钥时用给定的值初始化; 可以稍后通过interestOps(int)方法进行更改。
      • (ready set)就续集标识了键的选择器已经检测到密钥通道已准备就绪的操作类别。 当创建密钥时,就绪集被初始化为零; 可能在选择操作期间可能会被选择器更新,但不能直接更新。

      选择键的就绪集(ready set)表示其通道对某些操作类别做好准备是一个提示,但不能保证这样的类别中的操作可以由线程执行而不会导致线程阻塞。 在完成选择操作之后,准备好的集合很可能是准确的。 外部事件和相应通道上调用的I / O操作可能会导致不准确。

      该类定义了所有已知的操作设置位,但是确切地说,给定通道支持哪些位取决于通道的类型SelectableChannel每个子类定义了一个validOps()方法,它返回一组仅识别通道支持的操作的集合。 尝试设置或测试密钥通道不支持的操作集位将导致适当的运行时异常。

      通常需要将某些特定于应用程序的数据与选择密钥相关联,例如表示较高级别协议的状态的对象,并处理就绪通知以实现该协议。 因此选择键支持单个任意对象的一个键的连接 。 可以通过attach方法附加一个对象,然后通过attachment方法检索

      多个并发线程使用选择键是安全的。 通常,读取和写入兴趣集的操作将与选择器的某些操作同步。

      正是这种同步的执行方式取决于实现:在简单的实现中,如果选择操作已经进行,则读取或写入兴趣集可能会无限期地阻止;

      在高性能的实施中,阅读或写入兴趣集可能会暂时阻止,如果有的话。 在任何情况下,选择操作将始终使用在操作开始时当前的兴趣值。

    Channel和Buffer有好几种类型。下面是JAVA NIO中的一些主要Channel的实现:

    • FileChannel
    • DatagramChannel
    • SocketChannel
    • ServerSocketChannel

    正如你所看到的,这些通道涵盖了UDP 和 TCP 网络IO,以及文件IO。

    img

    服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一,大多数IO相关组件如Netty、Redis在使用的IO模式,消息队列kafka中接收消息也是基于Reactor模式。

    我的理解:Reactor 模型中有 2 个关键组成:

    • Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
    • Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

    img

    取决于 Reactor 的数量和 Hanndler 线程数量的不同,Reactor 模型有 3 个变种:

    • 单 Reactor 单线程。
    • 单 Reactor 多线程。
    • 主从 Reactor 多线程。

    关于如何处理请求?

    很容易想到两个方案

    1.顺序处理请求:如果写成伪代码,大概是这个样子:

    while (true) {
                Request request = accept(connection);
                handle(request);
    }
    

    这个方法实现简单,但是有个致命的缺陷,那就是吞吐量太差。由于只能顺序处理每个请求,因此,每个请求都必须等待前一个请求处理完毕才能得到处理。这种方式只适用于请求发送非常不频繁的系统。

    2**.每个请求使用单独线程处理**。也就是说,我们为每个入站请求都创建一个新的线程来异步处理。我们一起来看看这个方案的伪代码。

    while (true) {
                Request = request = accept(connection);
                Thread thread = new Thread(() -> {
      handle(request);});
                thread.start();
    }
    

    这个方法反其道而行之,完全采用异步的方式。系统会为每个入站请求都创建单独的线程来处理。

    优点:它是完全异步的,每个请求的处理都不会阻塞下一个请求

    缺点:为每个请求都创建线程的做法开销极大,在某些场景下甚至会压垮整个服务。

    还是那句话,这个方法只适用于请求发送频率很低的业务场景。

    那么先来看看单线程的Reactor是如何处理请求的

    三、单 Reactor 单线程

    img

    方案说明:

    • 1.select是 I/O 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求。
    • 2.Reactor对象通过select监控客户端请求事件,收到事件后通过 Dispatch 进行分发。
    • 3.如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理。
    • 4.如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应。
    • 5.Handler 会完成 Read→业务处理→Send 的完整业务流程。

    服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,下面的NIO就属于这种模型。

    3.1Reactor模型的朴素原型

    Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型

    下面用Java NIO来简单实现一个Reactor模型(非常简单,因为Java NIO就是基于Reactor模型实现的)

    public class NIOServer {
        public static void main(String[] args) throws IOException {
            //1、获取Selector选择器
            Selector selector = Selector.open();
    
            // 2、获取通道
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            // 3.设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            // 4、绑定连接
            serverSocketChannel.bind(new InetSocketAddress(8888));
    
            // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            // 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
            while (true){
                if(selector.select() == 0){
                    continue;
                }
                // 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    
                while(selectedKeys.hasNext()){
                    // 8、获取“准备就绪”的时间
                    SelectionKey selectedKey = selectedKeys.next();
    
                    // 9、判断key是具体的什么事件
                    if (selectedKey.isAcceptable()) {
                        // 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        // 11、切换为非阻塞模式
                        socketChannel.configureBlocking(false);
                        // 12、将该通道注册到selector选择器上
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }else if(selectedKey.isReadable()){
                        // 13、获取该选择器上的“读就绪”状态的通道
                        SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
    
                        //14.读取数据
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int length=0;
                        while((length=socketChannel.read(byteBuffer))!=-1){
                            //flip():Buffer有两种模式,写模式和读模式。在写模式下调用flip()之后,Buffer从写模式变成读模式。
                            //那么limit就设置成了position当前的值(即当前写了多少数据),postion会被置为0,以表示读操作从缓存的头开始读,mark置为-1。
                            //也就是说调用flip()之后,读/写指针position指到缓冲区头部,并且设置了最多只能读出之前写入的数据长度(而不是整个缓存的容量大小)。
                            byteBuffer.flip();
                            System.out.println(new String(byteBuffer.array(), 0, length));
                            byteBuffer.clear();
                        }
                        socketChannel.close();
                    }
    
                    // 15、移除选择键
                    selectedKeys.remove();
                }
    
            }
        }
    }
    

    在上面这个简单的模型中抽象出来两个组件——Reactor和Handler两个组件:

    1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

    2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

    就能表示一个简单的Reactor模型了

    3.2什么是单线程Reactor呢?

    下面的图,来自于“Scalable IO in Java”。Reactor和Hander 处于一条线程执行

    img

    这是最简单的单Reactor单线程模型Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中

    顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

    3.3单线程Reactor的参考代码

    “Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

    Reactor线程

    class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocket;
    
        Reactor(int port) throws IOException {
            //1、获取Selector选择器
            selector = Selector.open();
            //2、获取通道
            serverSocket = ServerSocketChannel.open();
            //3、绑定连接
            serverSocket.socket().bind(
                    new InetSocketAddress(port));
            //4、设置为非阻塞
            serverSocket.configureBlocking(false);
            // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
            SelectionKey sk =
                    serverSocket.register(selector,
                            SelectionKey.OP_ACCEPT);
            //6、新建一个Acceptor分发器 关联到serverSocket
            sk.attach(new Acceptor());
        }
        public void run() { // normally in a new Thread
    
            try {
                while (!Thread.interrupted()) {
                    //select函数 阻塞式的 至少有一个I/O事件就绪,才会返回
                    selector.select();
                    //得到selectedKeys
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    //遍历selectedKeys 用Acceptor分配
                    while (it.hasNext()){
                        dispatch((SelectionKey)(it.next()));
                    }
                    //都执行完了,清空selectedKeys
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }
        void dispatch(SelectionKey k) {
            //通过选择见检索到当前附加的对象也就是Acceptor 去执行
            Runnable r = (Runnable)(k.attachment());
            if (r != null)
                r.run();
        }
        class Acceptor implements Runnable { // inner
            public void run() {
                try {
                    SocketChannel c = serverSocket.accept();
                    //创建一个handler去执行(读或写)channel
                    if (c != null)
                        new Handler(selector, c);
                }
                catch(IOException ex) { /* ... */ }
            }
        }
    }
    

    Handler执行器

    public class Handler implements Runnable {
        
        private SocketChannel channel;
    
        private SelectionKey selectionKey;
        
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
        public Handler(Selector selector, SocketChannel c) throws IOException {
            this.channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            this.selectionKey = channel.register(selector, 0);
    
            //将Handler作为callback对象
            this.selectionKey.attach(this);
    
            //第二步,注册Read就绪事件
            this.selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        private boolean inputIsComplete() {
            /* ... */
            return false;
        }
    
        private boolean outputIsComplete() {
            /* ... */
            return false;
        }
    
        private void process() {
            /* ... */
            return;
        }
    
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void read() throws IOException {
            channel.read(input);
            if (inputIsComplete()) {
                process();
    
                state = SENDING;
                // Normally also do first write now
                //第三步,接收write就绪事件
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        }
    
        private void send() throws IOException {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete()) {
                selectionKey.cancel();
            }
        }
    }
    

    3.4单线程模式的缺点

    其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

    3.5方案优缺点分析:

    • 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。
    • 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
    • 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
    • 使用场景:单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。以及客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况。

    四、多线程的单Reactor(体现在Handler执行器由多线程来执行)

    img

    方案说明:

    • 1、Reactor 对象通过select 监控客户端请求事件,收到事件后,通过dispatch进行分发
    • 2、如果建立连接请求,则Acceptor 通过accept 处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件
    • 3、如果不是连接请求,则由reactor分发调用连接对应的handler 来处理。
    • 4、handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务。
    • 5、worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler。
    • 6、handler收到响应后,通过send 将结果返回给client。

    方案优缺点分析:

    • 优点:可以充分的利用多核cpu 的处理能力。
    • 缺点:多线程数据共享和访问比较复杂reactor 处理所有的事件的监听和响应在单线程运行, 在高并发场景容易出现性能瓶颈。

    4.1、基于线程池的改进

    在多线程Reactor模式基础上,做如下改进:

    • 1、将Handler处理器的执行放入线程池,多线程进行业务处理。
    • 2、而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

    img

    4.2、改进后的完整示意图

    下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

    img

    4.3、多线程Reactor的参考代码

    “Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

    public class MThreadHandler implements Runnable {
        private SocketChannel channel;
    
        private SelectionKey selectionKey;
    
        ByteBuffer input = ByteBuffer.allocate(1024);
        ByteBuffer output = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
    
        ExecutorService pool = Executors.newFixedThreadPool(2);
        static final int PROCESSING = 3;
    
        public MThreadHandler(Selector selector, SocketChannel c) throws IOException {
            this.channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            this.selectionKey = this.channel.register(selector, 0);
    
            //将Handler作为callback对象
            this.selectionKey.attach(this);
    
            //第二步,注册Read就绪事件
            this.selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        private boolean inputIsComplete() {
            /* ... */
            return false;
        }
    
        private boolean outputIsComplete() {
            /* ... */
            return false;
        }
    
        private void process() {
            /* ... */
            return;
        }
    
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
    
        private synchronized void read() throws IOException {
            // ...
            channel.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                //使用线程pool异步执行
                pool.execute(new Processer());
            }
        }
    
        private void send() throws IOException {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete()) {
                selectionKey.cancel();
            }
        }
    
        private synchronized void processAndHandOff() {
            process();
            state = SENDING;
            // or rebind attachment
            //process完,开始等待write就绪
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    
        private class Processer implements Runnable {
            public void run() {
                processAndHandOff();
            }
        }
    }
    

    区别就在于使用了线程池来异步执行

    private synchronized void read() throws IOException {
            // ...
            channel.read(input);
            if (inputIsComplete()) {
                state = PROCESSING;
                //使用线程pool异步执行
                pool.execute(new Processer());
            }
        }
    

    五、主从 Reactor 多线程

    针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行

    img

    方案说明:

    • 1、Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件。
    • 2、当 Acceptor 处理连接事件后MainReactor 将连接分配给SubReactor
    • 3、subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理
    • 4、当有新事件发生时, subreactor 就会调用对应的handler处理。
    • 5、handler 通过read 读取数据,分发给后面的worker 线程处理。
    • 6、worker 线程池分配独立的worker 线程进行业务处理,并返回结果。
    • 7、handler 收到响应的结果后,再通过send 将结果返回给client。
    • 8、Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor。

    加粗的为与多线程单Reactor不一样的地方

    Scalable IO in Java 对 Multiple Reactors 的原理图解

    img

    对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

    public class MThreadReactor implements Runnable {
    
        //subReactors集合, 一个selector代表一个subReactor
        Selector[] selectors=new Selector[2];
        int next = 0;
        final ServerSocketChannel serverSocket;
    
        private MThreadReactor(int port) throws IOException { 
            //Reactor初始化
            selectors[0]=Selector.open();
            selectors[1]= Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverSocket.configureBlocking(false);
            
            //分步处理,第一步,接收accept事件
            SelectionKey selectionKey = serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            selectionKey.attach(new Acceptor());
        }
    
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    for (int i = 0; i <2 ; i++) {
                        selectors[i].select();
                        Set<SelectionKey> selectionKeys = selectors[i].selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            //Reactor负责dispatch收到的事件
                            dispatch((SelectionKey) (iterator.next()));
                        }
                        selectionKeys.clear();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void dispatch(SelectionKey k) {
            Runnable r = (Runnable) (k.attachment());
            //调用之前注册的callback对象
            if (r != null) {
                r.run();
            }
        }
    
    
        class Acceptor { // ...
            public synchronized void run() throws IOException {
                //主selector负责accept
                SocketChannel connection = serverSocket.accept(); 
                if (connection != null) {
                    //选个subReactor去负责接收到的connection
                    new Handler(selectors[next], connection); 
                }
                if (++next == selectors.length) {
                    next = 0;
                }
            }
        }
    }
    

    方案优缺点说明:

    • 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
    • 优点:父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
    • 缺点:编程复杂度较高。

    这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持,kafka在接收客户端请求时也是类似。

    六、Reactor编程的优点和缺点

    优点:

    • 1、响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
    • 2、编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
    • 3、可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
    • 4、可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

    缺点:

    • 1、相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
    • 2、Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
    • 3、Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。

    参考:

    https://www.jianshu.com/p/2759a2374ed4

    http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

    展开全文
  • 响应式编程的首要问题 - 不好调试 我们在分析传统代码的时候,在哪里打了断点,就能看到直观的调用堆栈,来搞清楚,谁调用了这个代码,之前对参数做了什么修改,等等。但是在响应式编程中,这个问题就很麻烦。来看...

    响应式编程的首要问题 - 不好调试

    我们在分析传统代码的时候,在哪里打了断点,就能看到直观的调用堆栈,来搞清楚,谁调用了这个代码,之前对参数做了什么修改,等等。但是在响应式编程中,这个问题就很麻烦。来看下面的例子。

    public class FluxUtil1 {
        public static Flux<Integer> test(Flux<Integer> integerFlux) {
            return FluxUtil2.test2(integerFlux.map(Object::toString));
        }
    }
    public class FluxUtil2 {
        public static Flux<Integer> test2(Flux<String> stringFlux) {
            return stringFlux.map(Integer::new);
        }
    }
    public class FluxTest {
        public static void main(String[] args) {
            Flux<Integer> integerFlux = Flux.fromIterable(List.of(1, 2, 3));
            FluxUtil1.test(integerFlux.log()).subscribe(integer -> {
                System.out.println(integer);
            });
        }
    }
    

    我们调试到 subscribe 订阅消费(这个后面会讲),我们一般会想知道我们订阅的这个东西,之前经过了怎样的处理,但是在System.out.println(integer)打断点,看到的却是:

    image

    根本看不出来是FluxUtil1FluxUtil2处理过这个Flux。简单的代码还好,复杂起来调试简直要人命。官方也意识到了这一点,所以提供了一种在操作时捕捉堆栈缓存起来的机制。

    这里我们先给出这些机制如何使用,后面我们会分析其中的实现原理。

    1. 通过打开全局 Operator 堆栈追踪

    设置reactor.trace.operatorStacktrace这个环境变量为 true,即启动参数中加入 -Dreactor.trace.operatorStacktrace=true,这样启动全局 Operator 堆栈追踪。

    这个也可以通过代码动态打开或者关闭:

    //打开
    Hooks.onOperatorDebug();
    //关闭
    Hooks.resetOnOperatorDebug();
    

    打开这个追踪之后,在每多一个 Operator,就会多出来一个 FluxOnAssembly(这个后面原理会详细说明)。通过这个 FluxOnAssembly,里面就有堆栈信息。怎么获取呢?可以通过Scannable.from(某个Flux).parents().collect(Collectors.toList())获取里面所有层的 Flux,其中包含了 FluxOnAssembly, FluxOnAssembly 就包含了堆栈信息。

    我们这里,在System.out.println(integer)打断点,加入查看Scannable.from(FluxUtil1.test(integerFlux.log())).parents().collect(Collectors.toList()),就能看到:

    image

    可以看出,每次map操作究竟发生在哪一行代码,都能看到。

    如果使用的是专业版的 IDEA,还可以配置:
    image

    然后可以在打断点 Debug 就能看到具体堆栈:
    image

    2. 通过加入 ReactorDebugAgent 实现

    添加依赖:

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-tools</artifactId>
        <version>略</version>
    </dependency>
    

    之后,可以通过这两个代码,开启

    //启用
    ReactorDebugAgent.init();
    //如果有类没有生效,例如初始化没加载,后来动态加载的类,可以调用这个重新处理启用
    ReactorDebugAgent.processExistingClasses();
    

    这样,可以动态修改线上应用开启Debug模式,例如通过 Arthas 这个工具的 ognl 调用静态方法的功能(https://alibaba.github.io/arthas/ognl.html)。

    如果使用的是专业版的 IDEA,还可以配置:
    image

    然后可以在打断点 Debug 就能看到具体堆栈:
    image

    响应式编程 - Flow 的理解

    之前说过 FLow 是 Java 9 中引入的响应式编程的抽象概念,对应的类就是:java.util.concurrent.Flow
    Flow 是一个概念类,其中定义了三个接口供实现。这三个接口分别是:Publisher, SubscriberSubscription

    //标注是一个FunctionalInterface,因为只有一个抽象方法
    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }
    
    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    
    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }
    

    Publisher是负责生成 item 的,其中的subscribe方法就是注册Subscriber进去,用于消费。注册成功后,会调用SubscriberonSubscribe方法,传Subscription进来。这个Subscription里面的 request 用于请求Publisher发送多少 item 过来,cancel 用于告诉Publisher不要再发 item 过来了。每次Publisher有 item 生成并且没有超过Subscription request 的个数限制,onNext方法会被调用用于发送这个 item。当有异常发生时,onError 就会被调用。当Publisher判断不会有新的 item 或者异常发生的时候,就会调用onComplete告诉Subscriber消费完成了。大体上就是这么个流程。

    Project Reactor 就是Flow的一种实现。并且在Flow这个模型的基础上,参考了 Java 8 Stream 的接口功能设计,加入了流处理的机制。

    Project Reactor - Flux如何实现Flow的接口

    Flux就是一串相同类型数据的流,他包括并且会发射 0~n 个对象,例如:

    Flux<String> just = Flux.just("1", "2", "3");
    

    这样,我们就生成了一个包含三个字符串的Flux流(底层实现实际上就是FluxArray,这个我们以后会说的)

    然后,我们按照之前 Flow 里面提到的流程,先进行简单的 subscribe

    Flux.just("test1", "test2", "test3")
        //打印详细流日志
        .log()
        //订阅消费
        .subscribe(System.out::println);
    

    运行代码,我们会看到日志输出:

    07:08:13.816 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
    07:08:13.822 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
    07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test1)
    test1
    07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test2)
    test2
    07:08:13.823 [main] INFO reactor.Flux.Array.1 - | onNext(test3)
    test3
    07:08:13.824 [main] INFO reactor.Flux.Array.1 - | onComplete()
    
    

    这些日志很清楚的说明了subscribe究竟是如何工作的:

    1. 首先在subscribe的同时,onSubscribe首先被调用
    2. 然后调用request(unbounded),这里request代表请求多少个数据,unbounded代表请求无限个,就是所有的数据
    3. 对于每个数据对象,调用onNext方法:onNext(test1),onNext(test2),onNext(test3)
    4. 在最后完成的时候,onComplete会被调用,如果说遇到了异常,那么onError会被调用,就不会调用onComplete
      这些方法其实都是Subscriber的方法,Subscriber是Flux的订阅者,配置订阅者如何消费以及消费的具体操作。
    Subscriber<String> subscriber = new Subscriber<String>() {
        //在订阅成功的时候,如何操作
        @Override
        public void onSubscribe(Subscription subscription) {
            //取最大数量的元素个数
            subscription.request(Long.MAX_VALUE);
        }
    
        //对于每个元素的操作
        @Override
        public void onNext(String o) {
            System.out.println(o);
        }
    
        //在发生错误的时候
        @Override
        public void onError(Throwable throwable) {
            log.error("error: {}", throwable.getMessage(), throwable);
        }
    
        //在完成的时候,发生错误不算完成
        @Override
        public void onComplete() {
            log.info("complete");
        }
    };
    
    Flux.just("test1", "test2", "test3")
        //打印详细流日志
        .log()
        //订阅消费
        .subscribe(subscriber);
    

    运行后,日志是:

    07:28:27.227 [main] INFO reactor.Flux.Array.2 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
    07:28:27.227 [main] INFO reactor.Flux.Array.2 - | request(unbounded)
    07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test1)
    test1
    07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test2)
    test2
    07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onNext(test3)
    test3
    07:28:27.228 [main] INFO reactor.Flux.Array.2 - | onComplete()
    07:28:27.235 [main] INFO com.test.TestMonoFlux - complete
    

    subscribe还有如下几个api:

    //在不需要消费,只需要启动Flux中间处理的话,用这个
    subscribe();
    //相当于:
    new Subscriber() {
        @Override
        public void onSubscribe(Subscription subscription) {
            //取最大数量的元素个数
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(Object o) {
        }
        @Override
        public void onError(Throwable throwable) {
        }
        @Override
        public void onComplete() {
        }
    };
    
    
    //指定消费者消费
    subscribe(Consumer<? super T> consumer); 
    //相当于:
    new Subscriber() {
        @Override
        public void onSubscribe(Subscription subscription) {
            //取最大数量的元素个数
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(Object o) {
            consumer.accept(o);
        }
        @Override
        public void onError(Throwable throwable) {
        }
        @Override
        public void onComplete() {
        }
    };
    
    
    //指定消费者,还有异常处理者
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer); 
    //相当于:
    new Subscriber() {
        @Override
        public void onSubscribe(Subscription subscription) {
            //取最大数量的元素个数
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(Object o) {
            consumer.accept(o);
        }
        @Override
        public void onError(Throwable throwable) {
            errorConsumer.accept(throwable);
        }
        @Override
        public void onComplete() {
        }
    };
    
    
    //指定消费者,异常处理着还有完成的时候的要执行的操作
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
    //相当于:
    new Subscriber() {
        @Override
        public void onSubscribe(Subscription subscription) {
            //取最大数量的元素个数
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(Object o) {
            consumer.accept(o);
        }
        @Override
        public void onError(Throwable throwable) {
            errorConsumer.accept(throwable);
        }
        @Override
        public void onComplete() {
            completeConsumer.run();
        }
    };
    
    //指定Subscriber所有需要的元素
    subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer); 
    //相当于:
    new Subscriber() {
        @Override
        public void onSubscribe(Subscription subscription) {
            subscriptionConsumer.accept(subscription);
        }
        @Override
        public void onNext(Object o) {
            consumer.accept(o);
        }
        @Override
        public void onError(Throwable throwable) {
            errorConsumer.accept(throwable);
        }
        @Override
        public void onComplete() {
            completeConsumer.run();
        }
    };
    

    这样,就和之前所说的Flow的设计对应起来了。

    展开全文
  • 本文要介绍的是另外一个新的反应式编程库 Reactor。 反应式编程介绍 反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的...

    反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区中比较流行的是 RxJava 和 RxJava 2。本文要介绍的是另外一个新的反应式编程库 Reactor。

    反应式编程介绍

    反应式编程来源于数据流和变化的传播,意味着由底层的执行模型负责通过数据流来自动传播变化。比如求值一个简单的表达式 c=a+b,当 a 或者 b 的值发生变化时,传统的编程范式需要对 a+b 进行重新计算来得到 c 的值。如果使用反应式编程,当 a 或者 b 的值发生变化时,c 的值会自动更新。反应式编程最早由 .NET 平台上的 Reactive Extensions (Rx) 库来实现。后来迁移到 Java 平台之后就产生了著名的 RxJava 库,并产生了很多其他编程语言上的对应实现。在这些实现的基础上产生了后来的反应式流(Reactive Streams)规范。该规范定义了反应式流的相关接口,并将集成到 Java 9 中。

    在传统的编程范式中,我们一般通过迭代器(Iterator)模式来遍历一个序列。这种遍历方式是由调用者来控制节奏的,采用的是拉的方式。每次由调用者通过 next()方法来获取序列中的下一个值。使用反应式流时采用的则是推的方式,即常见的发布者-订阅者模式。当发布者有新的数据产生时,这些数据会被推送到订阅者来进行处理。在反应式流上可以添加各种不同的操作来对数据进行处理,形成数据处理链。这个以声明式的方式添加的处理链只在订阅者进行订阅操作时才会真正执行。

    反应式流中第一个重要概念是负压(backpressure)。在基本的消息推送模式中,当消息发布者产生数据的速度过快时,会使得消息订阅者的处理速度无法跟上产生的速度,从而给订阅者造成很大的压力。当压力过大时,有可能造成订阅者本身的奔溃,所产生的级联效应甚至可能造成整个系统的瘫痪。负压的作用在于提供一种从订阅者到生产者的反馈渠道。订阅者可以通过 request()方法来声明其一次所能处理的消息数量,而生产者就只会产生相应数量的消息,直到下一次 request()方法调用。这实际上变成了推拉结合的模式。

    Reactor 简介

    前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。

    在 Java 程序中使用 Reactor 库非常的简单,只需要通过 Maven 或 Gradle 来添加对 io.projectreactor:reactor-core 的依赖即可,目前的版本是 3.0.5.RELEASE。

    Flux 和 Mono

    Flux 和 Mono 是 Reactor 中的两个基本概念。Flux 表示的是包含 0 到 N 个元素的异步序列。在该序列中可以包含三种不同类型的消息通知:正常的包含元素的消息、序列结束的消息和序列出错的消息。当消息通知产生时,订阅者中对应的方法 onNext(), onComplete()和 onError()会被调用。Mono 表示的是包含 0 或者 1 个元素的异步序列。该序列中同样可以包含与 Flux 相同的三种类型的消息通知。Flux 和 Mono 之间可以进行转换。对一个 Flux 序列进行计数操作,得到的结果是一个 Mono<Long>对象。把两个 Mono 序列合并在一起,得到的是一个 Flux 对象。

    创建 Flux

    有多种不同的方式可以创建 Flux 序列。

    Flux 类的静态方法

    第一种方式是通过 Flux 类中的静态方法。

     

    代码清单 1 中给出了上述这些方法的使用示例。

    清单 1. 通过 Flux 类的静态方法创建 Flux 序列

    1

    2

    3

    4

    5

    6

    Flux.just("Hello", "World").subscribe(System.out::println);

    Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

    Flux.empty().subscribe(System.out::println);

    Flux.range(1, 10).subscribe(System.out::println);

    Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

    Flux.intervalMillis(1000).subscribe(System.out::println);

     

    • just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
    • fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
    • empty():创建一个不包含任何元素,只发布结束消息的序列。
    • error(Throwable error):创建一个只包含错误消息的序列。
    • never():创建一个不包含任何消息通知的序列。
    • range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
    • interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
    • intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。

    代码清单 1 中给出了上述这些方法的使用示例。

    清单 1. 通过 Flux 类的静态方法创建 Flux 序列

    1

    2

    3

    4

    5

    6

    Flux.just("Hello", "World").subscribe(System.out::println);

    Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

    Flux.empty().subscribe(System.out::println);

    Flux.range(1, 10).subscribe(System.out::println);

    Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

    Flux.intervalMillis(1000).subscribe(System.out::println);

    上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。

    generate()方法

    generate()方法通过同步和逐一的方式来产生 Flux 序列。序列的产生是通过调用所提供的 SynchronousSink 对象的 next(),complete()和 error(Throwable)方法来完成的。逐一生成的含义是在具体的生成逻辑中,next()方法只能最多被调用一次。在有些情况下,序列的生成可能是有状态的,需要用到某些状态对象。此时可以使用 generate()方法的另外一种形式 generate(Callable<S> stateSupplier, BiFunction<S,SynchronousSink<T>,S> generator),其中 stateSupplier 用来提供初始的状态对象。在进行序列生成时,状态对象会作为 generator 使用的第一个参数传入,可以在对应的逻辑中对该状态对象进行修改以供下一次生成时使用。

    在代码清单 2中,第一个序列的生成逻辑中通过 next()方法产生一个简单的值,然后通过 complete()方法来结束该序列。如果不调用 complete()方法,所产生的是一个无限序列。第二个序列的生成逻辑中的状态对象是一个 ArrayList 对象。实际产生的值是一个随机数。产生的随机数被添加到 ArrayList 中。当产生了 10 个数时,通过 complete()方法来结束序列。

    清单 2. 使用 generate()方法生成 Flux 序列

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    Flux.generate(sink -> {

        sink.next("Hello");

        sink.complete();

    }).subscribe(System.out::println);

     

     

    final Random random = new Random();

    Flux.generate(ArrayList::new, (list, sink) -> {

        int value = random.nextInt(100);

        list.add(value);

        sink.next(value);

        if (list.size() == 10) {

            sink.complete();

        }

        return list;

    }).subscribe(System.out::println);

    create()方法

    create()方法与 generate()方法的不同之处在于所使用的是 FluxSink 对象。FluxSink 支持同步和异步的消息产生,并且可以在一次调用中产生多个元素。在代码清单 3 中,在一次调用中就产生了全部的 10 个元素。

    清单 3. 使用 create()方法生成 Flux 序列

    1

    2

    3

    4

    5

    6

    Flux.create(sink -> {

        for (int i = 0; i < 10; i++) {

            sink.next(i);

        }

        sink.complete();

    }).subscribe(System.out::println);

    创建 Mono

    Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法。

    fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。 delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。 ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。 justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。

    还可以通过 create()方法来使用 MonoSink 来创建 Mono。代码清单 4 中给出了创建 Mono 序列的示例。

    清单 4. 创建 Mono 序列

    1

    2

    3

    Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

    Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

    Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

    操作符

     

    和 RxJava 一样,Reactor 的强大之处在于可以在反应式流上通过声明式的方式添加多种不同的操作符。下面对其中重要的操作符进行分类介绍。

    buffer 和 bufferTimeout

    这两个操作符的作用是把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。在进行收集时可以指定不同的条件:所包含的元素的最大数量或收集的时间间隔。方法 buffer()仅使用一个条件,而 bufferTimeout()可以同时指定两个条件。指定时间间隔时可以使用 Duration 对象或毫秒数,即使用 bufferMillis()或 bufferTimeoutMillis()两个方法。

    除了元素数量和时间间隔之外,还可以通过 bufferUntil 和 bufferWhile 操作符来进行收集。这两个操作符的参数是表示每个集合中的元素所要满足的条件的 Predicate 对象。bufferUntil 会一直收集直到 Predicate 返回为 true。使得 Predicate 返回 true 的那个元素可以选择添加到当前集合或下一个集合中;bufferWhile 则只有当 Predicate 返回 true 时才会收集。一旦值为 false,会立即开始下一次收集。

    代码清单 5 给出了 buffer 相关操作符的使用示例。第一行语句输出的是 5 个包含 20 个元素的数组;第二行语句输出的是 2 个包含了 10 个元素的数组;第三行语句输出的是 5 个包含 2 个元素的数组。每当遇到一个偶数就会结束当前的收集;第四行语句输出的是 5 个包含 1 个元素的数组,数组里面包含的只有偶数。

    需要注意的是,在代码清单 5 中,首先通过 toStream()方法把 Flux 序列转换成 Java 8 中的 Stream 对象,再通过 forEach()方法来进行输出。这是因为序列的生成是异步的,而转换成 Stream 对象可以保证主线程在序列生成完成之前不会退出,从而可以正确地输出序列中的所有元素。

    清单 5. buffer 相关操作符的使用示例

    1

    2

    3

    4

    Flux.range(1, 100).buffer(20).subscribe(System.out::println);

    Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

    Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

    Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

    filter

    对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素。代码清单 6 中的语句输出的是 1 到 10 中的所有偶数。

    清单 6. filter 操作符使用示例

    1

    Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

    window

    window 操作符的作用类似于 buffer,所不同的是 window 操作符是把当前流中的元素收集到另外的 Flux 序列中,因此返回值类型是 Flux<Flux<T>>。在代码清单 7 中,两行语句的输出结果分别是 5 个和 2 个 UnicastProcessor 字符。这是因为 window 操作符所产生的流中包含的是 UnicastProcessor 类的对象,而 UnicastProcessor 类的 toString 方法输出的就是 UnicastProcessor 字符。

    清单 7. window 操作符使用示例

    1

    2

    Flux.range(1, 100).window(20).subscribe(System.out::println);

    Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

     

    zipWith

    zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为 Tuple2 的流;也可以通过一个 BiFunction 函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

    在代码清单 8 中,两个流中包含的元素分别是 a,b 和 c,d。第一个 zipWith 操作符没有使用合并函数,因此结果流中的元素类型为 Tuple2;第二个 zipWith 操作通过合并函数把元素类型变为 String。

    清单 8. zipWith 操作符使用示例

    1

    2

    3

    4

    5

    6

    Flux.just("a", "b")

            .zipWith(Flux.just("c", "d"))

            .subscribe(System.out::println);

    Flux.just("a", "b")

            .zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

            .subscribe(System.out::println);

    take

    take 系列操作符用来从当前流中提取元素。提取的方式可以有很多种。

    • take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的数量或时间间隔来提取。
    • takeLast(long n):提取流中的最后 N 个元素。
    • takeUntil(Predicate<? super T> predicate):提取元素直到 Predicate 返回 true。
    • takeWhile(Predicate<? super T> continuePredicate): 当 Predicate 返回 true 时才进行提取。
    • takeUntilOther(Publisher<?> other):提取元素直到另外一个流开始产生元素。

    在代码清单 9 中,第一行语句输出的是数字 1 到 10;第二行语句输出的是数字 991 到 1000;第三行语句输出的是数字 1 到 9;第四行语句输出的是数字 1 到 10,使得 Predicate 返回 true 的元素也是包含在内的。

    清单 9. take 系列操作符使用示例

    1

    2

    3

    4

    Flux.range(1, 1000).take(10).subscribe(System.out::println);

    Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

    Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);

    Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

    reduce 和 reduceWith

    reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。

    在代码清单 10 中,第一行语句对流中的元素进行相加操作,结果为 5050;第二行语句同样也是进行相加操作,不过通过一个 Supplier 给出了初始值为 100,所以结果为 5150。

    清单 10. reduce 和 reduceWith 操作符使用示例

    1

    2

    Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

    Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

    merge 和 mergeSequential

    merge 和 mergeSequential 操作符用来把多个流合并成一个 Flux 序列。不同之处在于 merge 按照所有流中元素的实际产生顺序来合并,而 mergeSequential 则按照所有流被订阅的顺序,以流为单位进行合并。

    代码清单 11 中分别使用了 merge 和 mergeSequential 操作符。进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素。

    清单 11. merge 和 mergeSequential 操作符使用示例

    1

    2

    3

    4

    5

    6

    Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

            .toStream()

            .forEach(System.out::println);

    Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

            .toStream()

            .forEach(System.out::println);

    flatMap 和 flatMapSequential

    flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。

    在代码清单 12 中,流中的元素被转换成每隔 100 毫秒产生的数量不同的流,再进行合并。由于第一个流中包含的元素数量较少,所以在结果流中一开始是两个流的元素交织在一起,然后就只有第二个流中的元素。

    清单 12. flatMap 操作符使用示例

    1

    2

    3

    4

    Flux.just(5, 10)

            .flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

            .toStream()

            .forEach(System.out::println);

    concatMap

    concatMap 操作符的作用也是把流中的每个元素转换成一个流,再把所有流进行合并。与 flatMap 不同的是,concatMap 会根据原始流中的元素顺序依次把转换之后的流进行合并;与 flatMapSequential 不同的是,concatMap 对转换之后的流的订阅是动态进行的,而 flatMapSequential 在合并之前就已经订阅了所有的流。

    代码清单 13 与代码清单 12 类似,只不过把 flatMap 换成了 concatMap,结果流中依次包含了第一个流和第二个流中的全部元素。

    清单 13. concatMap 操作符使用示例

    1

    2

    3

    4

    Flux.just(5, 10)

            .concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

            .toStream()

            .forEach(System.out::println);

    combineLatest

    combineLatest 操作符把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素。只要其中任何一个流中产生了新的元素,合并操作就会被执行一次,结果流中就会产生新的元素。在 代码清单 14 中,流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String。

    清单 14. combineLatest 操作符使用示例

    1

    2

    3

    4

    5

    Flux.combineLatest(

            Arrays::toString,

            Flux.intervalMillis(100).take(5),

            Flux.intervalMillis(50, 100).take(5)

    ).toStream().forEach(System.out::println);

    消息处理

    当需要处理 Flux 或 Mono 中的消息时,如之前的代码清单所示,可以通过 subscribe 方法来添加相应的订阅逻辑。在调用 subscribe 方法时可以指定需要处理的消息类型。可以只处理其中包含的正常消息,也可以同时处理错误消息和完成消息。代码清单 15 中通过 subscribe()方法同时处理了正常消息和错误消息。

    清单 15. 通过 subscribe()方法处理正常和错误消息

    1

    2

    3

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .subscribe(System.out::println, System.err::println);

    正常的消息处理相对简单。当出现错误时,有多种不同的处理策略。第一种策略是通过 onErrorReturn()方法返回一个默认值。在代码清单 16 中,当出现错误时,流会产生默认值 0.

    清单 16. 出现错误时返回默认值

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .onErrorReturn(0)

            .subscribe(System.out::println);

    第二种策略是通过 switchOnError()方法来使用另外的流来产生元素。在代码清单 17 中,当出现错误时,将产生 Mono.just(0)对应的流,也就是数字 0。

    清单 17. 出现错误时使用另外的流

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .switchOnError(Mono.just(0))

            .subscribe(System.out::println);

    第三种策略是通过 onErrorResumeWith()方法来根据不同的异常类型来选择要使用的产生元素的流。在代码清单 18 中,根据异常类型来返回不同的流作为出现错误时的数据来源。因为异常的类型为 IllegalArgumentException,所产生的元素为-1。

    清单 18. 出现错误时根据异常类型来选择流

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalArgumentException()))

            .onErrorResumeWith(e -> {

                if (e instanceof IllegalStateException) {

                    return Mono.just(0);

                } else if (e instanceof IllegalArgumentException) {

                    return Mono.just(-1);

                }

                return Mono.empty();

            })

            .subscribe(System.out::println);

    当出现错误时,还可以通过 retry 操作符来进行重试。重试的动作是通过重新订阅序列来实现的。在使用 retry 操作符时可以指定重试的次数。代码清单 19 中指定了重试次数为 1,所输出的结果是 1,2,1,2 和错误信息。

    清单 19. 使用 retry 操作符进行重试

    1

    2

    3

    4

    Flux.just(1, 2)

            .concatWith(Mono.error(new IllegalStateException()))

            .retry(1)

            .subscribe(System.out::println);

    调度器

    前面介绍了反应式流和在其上可以进行的各种操作,通过调度器(Scheduler)可以指定这些操作执行的方式和所在的线程。有下面几种不同的调度器实现。

    • 当前线程,通过 Schedulers.immediate()方法来创建。
    • 单一的可复用的线程,通过 Schedulers.single()方法来创建。
    • 使用弹性的线程池,通过 Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。如果一个线程闲置太长时间,则会被销毁。该调度器适用于 I/O 操作相关的流的处理。
    • 使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
    • 使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
    • 从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。

    某些操作符默认就已经使用了特定类型的调度器。比如 intervalMillis()方法创建的流就使用了由 Schedulers.timer()创建的调度器。通过 publishOn()和 subscribeOn()方法可以切换执行操作的调度器。其中 publishOn()方法切换的是操作符的执行方式,而 subscribeOn()方法切换的是产生流中元素时的执行方式。

    在代码清单 20 中,使用 create()方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。接着是两对 publishOn()和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。最后通过 subscribeOn()方法来改变流产生时的执行方式。运行之后的结果是[elastic-2] [single-1] parallel-1。最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single()调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。

    清单 20. 使用调度器切换操作符执行方式

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    Flux.create(sink -> {

        sink.next(Thread.currentThread().getName());

        sink.complete();

    })

    .publishOn(Schedulers.single())

    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

    .publishOn(Schedulers.elastic())

    .map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

    .subscribeOn(Schedulers.parallel())

    .toStream()

    .forEach(System.out::println);

    测试

    在对使用 Reactor 的代码进行测试时,需要用到 io.projectreactor.addons:reactor-test 库。

    使用 StepVerifier

    进行测试时的一个典型的场景是对于一个序列,验证其中所包含的元素是否符合预期。StepVerifier 的作用是可以对序列中包含的元素进行逐一验证。在代码清单 21 中,需要验证的流中包含 a 和 b 两个元素。通过 StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而 verifyComplete()方法则验证流是否正常结束。类似的方法还有 verifyError()来验证流由于错误而终止。

    清单 21. 使用 StepVerifier 验证流中的元素

    1

    2

    3

    4

    StepVerifier.create(Flux.just("a", "b"))

            .expectNext("a")

            .expectNext("b")

            .verifyComplete();

    操作测试时间

    有些序列的生成是有时间要求的,比如每隔 1 分钟才产生一个新的元素。在进行测试中,不可能花费实际的时间来等待每个元素的生成。此时需要用到 StepVerifier 提供的虚拟时间功能。通过 StepVerifier.withVirtualTime()方法可以创建出使用虚拟时钟的 StepVerifier。通过 thenAwait(Duration)方法可以让虚拟时钟前进。

    在代码清单 22 中,需要验证的流中包含两个产生间隔为一天的元素,并且第一个元素的产生延迟是 4 个小时。在通过 StepVerifier.withVirtualTime()方法包装流之后,expectNoEvent()方法用来验证在 4 个小时之内没有任何消息产生,然后验证第一个元素 0 产生;接着 thenAwait()方法来让虚拟时钟前进一天,然后验证第二个元素 1 产生;最后验证流正常结束。

    清单 22. 操作测试时间

    1

    2

    3

    4

    5

    6

    7

    StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

            .expectSubscription()

            .expectNoEvent(Duration.ofHours(4))

            .expectNext(0L)

            .thenAwait(Duration.ofDays(1))

            .expectNext(1L)

            .verifyComplete();

    使用 TestPublisher

    TestPublisher 的作用在于可以控制流中元素的产生,甚至是违反反应流规范的情况。在代码清单 23 中,通过 create()方法创建一个新的 TestPublisher 对象,然后使用 next()方法来产生元素,使用 complete()方法来结束流。TestPublisher 主要用来测试开发人员自己创建的操作符。

    清单 23. 使用 TestPublisher 创建测试所用的流

    1

    2

    3

    4

    5

    6

    7

    8

    9

    final TestPublisher<String> testPublisher = TestPublisher.create();

    testPublisher.next("a");

    testPublisher.next("b");

    testPublisher.complete();

     

    StepVerifier.create(testPublisher)

            .expectNext("a")

            .expectNext("b")

            .expectComplete();

    调试

    由于反应式编程范式与传统编程范式的差异性,使用 Reactor 编写的代码在出现问题时比较难进行调试。为了更好的帮助开发人员进行调试,Reactor 提供了相应的辅助功能。

    启用调试模式

    当需要获取更多与流相关的执行信息时,可以在程序开始的地方添加代码清单 24 中的代码来启用调试模式。在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出。通过这些信息可以分析出具体是在哪个操作符的执行中出现了问题。

    清单 24. 启用调试模式

    1

    Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

    不过当调试模式启用之后,记录这些额外的信息是有代价的。一般只有在出现了错误之后,再考虑启用调试模式。但是当为了找到问题而启用了调试模式之后,之前的错误不一定能很容易重现出来。为了减少可能的开销,可以限制只对特定类型的操作符启用调试模式。

    使用检查点

    另外一种做法是通过 checkpoint 操作符来对特定的流处理链来启用调试模式。代码清单 25 中,在 map 操作符之后添加了一个名为 test 的检查点。当出现错误时,检查点名称会出现在异常堆栈信息中。对于程序中重要或者复杂的流处理链,可以在关键的位置上启用检查点来帮助定位可能存在的问题。

    清单 25. 使用 checkpoint 操作符

    1

    Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

    日志记录

    在开发和调试中的另外一项实用功能是把流相关的事件记录在日志中。这可以通过添加 log 操作符来实现。在代码清单 26 中,添加了 log 操作符并指定了日志分类的名称。

    清单 26. 使用 log 操作符记录事件

    1

    Flux.range(1, 2).log("Range").subscribe(System.out::println);

    在实际的运行时,所产生的输出如代码清单 27 所示。

    清单 27. log 操作符所产生的日志

    1

    2

    3

    4

    5

    6

    7

    8

    13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

    13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

    13:07:56.753 [main] INFO Range - | request(unbounded)

    13:07:56.754 [main] INFO Range - | onNext(1)

    1

    13:07:56.754 [main] INFO Range - | onNext(2)

    2

    13:07:56.754 [main] INFO Range - | onComplete()

    “冷”与“热”序列

    之前的代码清单中所创建的都是冷序列。冷序列的含义是不论订阅者在何时订阅该序列,总是能收到序列中产生的全部消息。而与之对应的热序列,则是在持续不断地产生消息,订阅者只能获取到在其订阅之后产生的消息。

    在代码清单 28 中,原始的序列中包含 10 个间隔为 1 秒的元素。通过 publish()方法把一个 Flux 对象转换成 ConnectableFlux 对象。方法 autoConnect()的作用是当 ConnectableFlux 对象有一个订阅者时就开始产生消息。代码 source.subscribe()的作用是订阅该 ConnectableFlux 对象,让其开始产生数据。接着当前线程睡眠 5 秒钟,第二个订阅者此时只能获得到该序列中的后 5 个元素,因此所输出的是数字 5 到 9。

    清单 28. 热序列

    1

    2

    3

    4

    5

    6

    7

    8

    9

    final Flux<Long> source = Flux.intervalMillis(1000)

            .take(10)

            .publish()

            .autoConnect();

    source.subscribe();

    Thread.sleep(5000);

    source

            .toStream()

            .forEach(System.out::println);

    结束语

    反应式编程范式对于习惯了传统编程范式的开发人员来说,既是一个需要进行思维方式转变的挑战,也是一个充满了更多可能的机会。Reactor 作为一个基于反应式流规范的新的 Java 库,可以作为反应式应用的基础。本文对 Reactor 库做了详细的介绍,包括 Flux 和 Mono 序列的创建、常用操作符的使用、调度器、错误处理以及测试和调试技巧等。

    参考资源

    下载资源

    (reactor-source.zip | 8.84KB)

     

     

     

    展开全文
  • Reactor Mono和Flux 进行反应式编程详解

    万次阅读 2019-12-24 11:08:16
    官网:https://projectreactor.io/ 教程:... Reactor的类型 Reactor有两种类型,Flux<T>和Mono<T>。Flux类似RaxJava的Observable,它可以触发零到多个事件,并根据实际情况结束...
  • 使用Reactor进行反应式编程 反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎。在 Java 社区中比较流行的是 RxJava 和 RxJava 2。本文要介绍的是另外一个新的反应式编程库 Reactor。 ...
  • 1.1 反应器(Reactor)模式 1.1.1 什么是Reactor模式 Reactor模式首先是事件驱动的, 有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers; 这个Service Handler会同步的将输入的请求(Event)...
  • 官方文档:Reactor 3 Reference Guide 后边引入了Reactive Programming概念,出现了Rxjava,Reactor等,其实他们都是用的同一个api的,也就是Reactor-streams。spring5 webFlux 其实就是用的Reactor的API,所以本文...
  • Reactor 响应式编程

    2018-04-22 21:16:00
     响应式编程是一种编程范式,如果你了解事件编程,观察者模式,理解这个概率并不难,响应式编程宣言包含了四组关键字: Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。 Resilient: 可恢复...
  • Reactor Proactor模型 epoll模型

    千次阅读 2016-07-18 14:55:44
    Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的. 在Reactor模式中,事件分离者等待某个事件或者可应用或个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个 事件传...
  • 调试Reactive Streams可能是我们开始使用这些数据结构后必须面对的主要挑战之一。 考虑到Reactive Streams在过去几年中越来越受欢迎,了解我们如何有效地执行此任务是个好主意。 让我们首先使用Reactive Streams设置...
  •  https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html https://coyee.com/article/12086-spring-5-reactive-web ...
  • # ZMQ 第三章 高级请求-应答模式 在第二章中我们通过开发一系列的小应用来熟悉ØMQ的基本使用方法,每个应用会引入一些新的特性。本章会沿用这种方式,来探索更多建立在ØMQ请求-应答模式之上的高级工作模式。本章...
  • 它只需要简单配置不同的参数,便可启用不同的 Reactor 线程模型,而且无需变更其他的代码,很大程度上降低了用户开发和调试的成本。 Channel 初始化 设置 Channel 类型 NIO 模型是 Netty 中最成熟且被广泛使用的模型...
  • project-reactor提供了大量操作符操作Flux和Mono对象 文章目录1 转换操作符1.1. buffer1.2 map(映射)和flatMap1.3 window2 过滤操作符3 组合操作符4 条件操作符5 数学操作符6 observable工具操作符6.1. subscribe...
  • spring-boot-starter-webflux 里面包含了WebFlux框架, 同时还有默认的Web Engine: Reactor Netty (spring-boot-starter-reactor-netty). Reactive data support Spring Boot 2.0为以下数据存储提供自动配置: ...
  • 【进大厂必学】3W字180张图学习Linux基础总结

    千次阅读 多人点赞 2021-05-22 10:39:50
    BOOTPROTO = static,表示启用了静态 IP 地址,默认为 none NAME = eth0,与网卡配置文件对应的一个标签,如果为 eth0 则配置文件是ifcfg-eth0 UUID = xxxx,网卡唯一设备标识,系统自动得生成 DEVICE = XXX,网卡...
  • 前言: 如果以任务开发为目的,则按照上篇的内容直接去开发以及根据指南去操作springboot开发就可以。如果想要参考一些开源的集成例子可以参考这个GitHub库的内容,如果不想看如何使用,那么可以直接看第四天的内容...
  • 重磅 Spring Boot 2.1.4 正式版发布!

    千次阅读 2019-04-04 17:05:33
    调试模式不记录与Web和SQL相关的记录器#16018 使用Maven构建的胖jar不会将META-INF / .kotlin_module文件重新打包到BOOT-INF / classes#16004 仅Gradle POM依赖项导致jar应用程序加载程序失败#16001 在...
  • 一、 python语法 1. 请说一下你对迭代器和生成器的区别? 2. 什么是线程安全? 3. 你所遵循的代码规范是什么?请举例说明其要求? 4. Python中怎么简单的实现列表去重? 5. python 中 yield 的用法?...8....
  • 理解Spring Boot

    万次阅读 2016-02-16 10:16:55
    理解Spring Boot ...要以调试模式启动应用程序,可以使用-Ddebug标识,或者在application.properties文件这添加属性debug= true。 五、调试Spring Boot自动配置 Spring Boot的官方文档( ...
  • 通过@EnableAutoConfiguration启用Spring应用程序上下文的自动配置,这个注解会导入一个EnableAutoConfigurationImportSelector的类,而这个类会去读取一个spring.factories下key为EnableAutoConfiguration对应的全限...
  • -I + #将IDA设置为即时调试器(0禁用,1启用) -L + #### 指定log的文件名 -M 禁用鼠标(仅限文本) -O + #### 传递给插件的选项 -o + #### 指定输出数据库(隐含-c) -p + #### 指定处理器类型 -P+ 压缩数据库...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 535
精华内容 214
关键字:

启用reactor调试模式