• Java Reactive 异步与并发编程【摘要】Reactive 编程在多数人眼中是异步、并发的“银弹/神器”。本文分析了Reactive 执行原理,说明 Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备...

    Java Reactive 异步与并发编程

    【摘要】Reactive 编程在多数人眼中是异步、并发的“银弹/神器”。本文分析了Reactive 执行原理,说明 Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备、数据流建模、调度者分配三个基本设计步骤,才能实现异步并发执行。最后,我们给出基于数据流图的计算模型的设计与编程方案。

    大数据和云计算(云服务)让 Reactive 编程成为新一代的编程神器。尽管 Reactive 编程模型大大简化了异步与并发编程的难度,但绝不是低门槛的。它首先需要你改变传统顺序处理的计算模式,建立面向数据流的计算模型;然后,需要有强大的线程、协程等并发知识,才能编写出 safe 的应用;再者还需要一些函数式编程的知识,如 Lambda、闭包等。本文努力描述响应式编程需要的最基础的知识,并用一些案例,让你体验 Reactive 编程的神奇与优雅。

    1、准备知识

    Java Reactive 编程使用 RxJava 库,虽然可以兼容 Java 5 编程,但你会失去 Java 8 给你带来的便利,如 Lambda 表达式,CompleteableFuture 等异步特性的支持。关键是没有 Lambda 函数,Reactive Java 程序几乎无法阅读!

    1.1 编程环境配置

    1. 项目文件配置

    maven 配置文件 pom.xml 需要

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2. IDEA 设置

    Intellij IDEA 要设置三处位置,且要求版本一致,否则就有出错提示,例如:“source Release 8 Requires Target Release 1.8…”

    • File -> Project Structure -> Project
    • File -> Project Structure -> Modules
    • File -> Setting -> Build -> Compiler -> Java Compiler

    设置如图

    1.2 Lambda 表达式

    现在,不支持 Lambda 表达式的语言真不多。 在 Java 中,它主要作为单一方法的接口匿名实现。例如:

    public class TestLambda {
        public static void main(String[] args) {
    
            System.out.println("=== RunnableTest ===");
    
            // Anonymous Runnable
            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Hello world one!");
                }
            };
    
            // Lambda Runnable
            Runnable r2 = () -> System.out.println("Hello world two!");
    
            // Run em!
            r1.run();
            r2.run();
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    Lambda 表达式 的语法,例如:

    Argument ListArrow TokenBody
    (int x, int y)->x + y

    官方教程: Java SE 8: Lambda Quick Start

    2、Future<V> 与多线程编程

    Future<V> 是一个泛型接口,如果一个可运行的函数(实现 Callable 或 Runable 的类)在一个线程中运行,利用 Future<V> 可以用它的 get() 方法返回 V 类型的结果。 注意, get() 会阻塞当前线程。例如:

    public class TestFuture {
    
        // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
        static ExecutorService executor = Executors.newCachedThreadPool();
    
        public void testTaskRunning(String name, Integer t) {
            System.out.println("Prepare for execution:" + name);
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            // using lambda may cause 10X time then Callable
    //        Future<String> fa = executor.submit(
    //                new Callable<String>() {
    //                    @Override
    //                    public String call() throws Exception {
    //                        try {
    //                            Thread.sleep(t);
    //                        } catch (Exception e) {
    //                            e.printStackTrace();
    //                        }
    //                        return String.format("service exec time: %d", t);
    //                    }
    //                }
    //        );
    
            Future<String> fa =  executor.submit(
                    () -> {
                            try {
                                Thread.sleep(t);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            return String.format("service exec time: %d", t);
                    });
    
            long endTime = System.currentTimeMillis();
            System.out.println("Start execute: " + (endTime - startTime) + "ms");
    
            try {
                String s = fa.get(); //Future to Blocked
                System.out.println(s);
            } catch (
                    Exception e)
            {
                e.printStackTrace();
            }
    
            endTime = System.currentTimeMillis(); //
            System.out.println("End execute: " + (endTime - startTime) + "ms");
    
        }
    
        public void testAsyncTaskRunning() {
            System.out.println("Prepare for execution: composite task" );
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            Future<String> fa = executor.submit(new TimeConsumingService("fa",200,new String[]{}));
            Future<String> fb = executor.submit(new TimeConsumingService("fb",400,new String[]{}));
    
            System.out.println("Start execute: " + (System.currentTimeMillis() - startTime) + "ms");
    
            try {
                // What will happen when change line fc and fd ?
                Future<String> fc = executor.submit(new TimeConsumingService("fc",400,new String[]{fa.get()}));
                Future<String> fd = executor.submit(new TimeConsumingService("fd",200,new String[]{fb.get()}));
                Future<String> fe = executor.submit(new TimeConsumingService("fe",200,new String[]{fb.get()}));
                fc.get(); fd.get(); fe.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println("End execute: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        public static void main(String[] args) {
            TestFuture test = new TestFuture();
            test.testTaskRunning("fa", 300);
            //test.testAsyncTaskRunning();
            System.out.println("sssssssssssssssssss");
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • static ExecutorService executor = Executors.newCachedThreadPool(); 
      • Executors 返回一个线程模型,用于发射线程; 
        • newCachedThreadPool() 创建缓存线程池,常用于管理 IO 阻塞型线程
        • newFixedThreadPool(int nThreads) 创建管理计算需求的线程池,n=cpu*2;
        • newSingleThreadExecutor() 创建单个线程
        • … …
      • ExecutorService 管理发射(启动)、停止线程等操作 
        • submit 用于将实现 Callable 或 Runable 接口的对象,启动线程;
        • 它返回 Future<V> 接口,方便管理线程和获取结果
    • testTaskRunning() 
      • executor.submit 启动 Callable 接口实现,或 Lambda 函数为一个线程 
        • Lambda 准备需要远常于接口的时间
    • testAsyncTaskRunning() 
      • 在线程池中执行一群耗时的函数(实现 Callable的类)
      • TimeConsumingService 是模拟服务对象,它有名字、计算时间。代码见后面

    问题

    1. get() 顺序会影响出结果时间,关键 get 的阻塞;
    2. 如果能按这些线程出结果的时间序列,把数据结果交给后面的线程并行加工处理,CPU就不用阻塞在 get() 了。但编程无疑会很复杂。

    3、Reactive(响应式)编程

    大到大数据处理(如spark),小到 android界面事件流的处理,Reactive(响应式)编程以成为最流行的标配。然而它是如何工作的呢? 如何正确编写 Reactive 程序?

    3.1 响应式编程基本原理

    响应式编程是一种按流水线模式组织处理数据流的协程计算模型(实在找不到合适的定义)。一个流水线有输入源、工人(函数)、流水结构、调度员、输出构成。假设一个简单的线性(顺序处理)流水线,如图:

    (s0--S1----s2-s3---|)  [alpha]  ...  [beta]  ...  [theta]  (o0---o2------o2o4-)
                          |                                   |
                          |                                   |
                          +------------scheduler--------------+
    • 1
    • 2
    • 3
    • 4

    其中 :

    *(d0–d1—–d2–|)表示带终止标志的时序数据流; 
    * [worker] 表示一个操作函数,它只能从前道工序取数据,加工处理后交给下一个 worker; 
    * … 表示两个工人之间的存放数据的缓存队列(EndPoint); 
    * scheduler 是管理工人干活的调度员,一个调度员能仅能一次指挥一个工人,且工人一次只处理一个数据,要么成功,要么失败(异常)。

    例如:

        public static void hello(Integer... integers) {
            Observable<Integer> workflow = Observable.from(integers)
                    .filter( i -> (i < 10) && (i > 0))
                    .map( i -> i *2);
            workflow.subscribe(i -> System.out.print(i + "! ") );
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这是典型的链式函数式处理计算模式。输入与输出分别是:

    hello(-1,4,5,0,2,19,6);
    
    8! 10! 4! 12! 
    • 1
    • 2
    • 3

    关键不是结果,而是处理过程。第一条语句仅是搭建了一个框架,声明了数据源和两个操作工人 filter,map,它们各自使用定义的函数干活。最重要的工人(subscribe)出场了,它开动了流水线, scheduler 开始指挥工人干活,注意工人轮流上场(不是并行),直到遇到结果符号且每个工人无活可干,流水线关闭。

    这个程序是单线程程序(一个 scheduler),但是 filter,map,scheduler 干活的顺序是不可预知的,如果他们取数据的 EndPoint有数据,则可以被 scheduler 调度的。一个工人被调度一次,则是获得了CPU,直到函数操作完成,释放 CPU。这是标准的协程(Coroutine)概念。

    响应式编程意思就是根据 EndPoint 中的数据启动对应数据处理函数,是函数之间异步执行(链式反应)的过程,这对于减少程序阻塞、减低线程开销,特别是不支持多线程的nodejs,javascript,python等具有特别的意义。

    因此,Reactive 编程是数据驱动的编程,在数据驱动的模型上建立并发处理机制,需要引入多个调度者。

    千万不要认为响应式编程是并发的,建议你有时间时, 务必仔细阅读 RxJava Threading Examples 。理论上,Reactive 是数据驱动的,而不是事件驱动的。

    响应式编程不是真正意义上的并发,由于每个调度器是一个线程,它管理的操作函数之间一般都不需要对数据上锁,这些函数是按数据驱动,“并发运行”的。

    Reactive 要点

    1. Reactive 编程开始是流程建模过程,而不是数据处理过程
    2. 工作流由 subscribe 启动,subscribe 多次就会导致多次启动流程。这与普通语言按顺序执行不一样
    3. scheduler 是一个单线程的调度器,每个scheduler中管理的函数都是顺序执行的,阻塞函数会阻塞流水线工作。
    4. 工作函数是响应式的,观察者模式仅是实现数据驱动的技术概念,每个数据流,就称为 “Observable” 的对象。所以,到了 2.x 就改称 “Flowable” 了

    3.2 RxJava 入门

    RxJava 是 Reactive eXtend for Java VM 的缩写。Reactive eXtend 支持许多语言,支持 Groovy, Clojure, JRuby, Kotlin 和 Scala 等现代语言。官方网站: 
    https://github.com/ReactiveX/RxJava/wiki

    1. Maven 依赖

            <dependency>
                <groupId>io.reactivex</groupId>
                <artifactId>rxjava</artifactId>
                <version>1.2.9</version>
            </dependency>
    • 1
    • 2
    • 3
    • 4
    • 5

    个人认为 RxJava 1.x 比较合适入门,2.x 抽象程度高,较难入门。

    2. Hello world

        public static void hello(String... names) {
            Observable.from(names).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("Completed!");
                }
    
                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void onNext(String strings) {
                    System.out.println("same hello " + strings);
                }
    
            });
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    编程步骤

    1. 响应式编程第一步就是创建数据源。简单就是 Observable.from 从一个集合中取数,或者 Observable.just 直接取数,例如 Observable.just(1,2);
    2. 用操作函数搭建数据流处理流水线(workflow),即使用操作数据变换(transform) 定义工作流。官方有几百个操作或函数???,学习难度有点大!!!
    3. 最后操作 subscribe,启动流程,输出数据。

    subscribe支持3个函数,onNext 收到一个结果,onError 收到一个错误,onCompleted 流程结束。

    3.3 调度器与线程

    • 调度器(Scheduler)对象:操作调度的线程。
    • 调度器工作的线程模型: 
      • Scheduler.io() 创建调度器用于 IO 阻塞的工作,使用缓存线程池;
      • Scheduler.computation() 创建调度器用于耗时计算,使用固定线程池;
      • Scheduler.from(java.util.concurrent.Executor executor) 使用已有线程池;

    官方案例是:

    import io.reactivex.schedulers.Schedulers;
    
    Flowable.fromCallable(() -> {
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    })
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.single())
      .subscribe(System.out::println, Throwable::printStackTrace);
    
    Thread.sleep(2000); // <--- wait for the flow to finish
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这是一个2.x的例子。

    • Flowable.fromCallable 创建一个耗时的源。
    • subscribeOn(Schedulers.io()) 源开始使用线程池中的这个调度器
    • observeOn(Schedulers.single()) 下面的操作使用这个调度器

    注:由于调度器在后台,没有最后一句,你将等不到任何输出主线程就结束了。在实际应用中,如android界面主线程、web服务异步线程一般不会结果,而是等你 subscribe 结果。

    案例研究:假设你有100个URL,要从网站爬这些URL数据,这样编程可以吗

        Observable.from(Urls)
            .observeOn(Schedulers.io())
            .map(url -> readFileFromUrl(url))
            .observeOn(Schedulers.computation())
            .flatMap(doc -> process(doc))
            .observeOn(Schedulers.single())
            .subscribe(out -> output(out));
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    问题

    • 这里有几个 Scheduler 线程呢?
    • 每个线程调度拥有那些计算函数呢?

    参考:理解RxJava的线程模型

    Reactive 并发编程要点

    1. 在数据流中,设计与管理 Scheduler 的数量与管理范围,是 RxJava 并发的关键
    2. 在数据流中,有当前 任务调度(线程) 概念,尽管线程是隐式的
    3. subscribeOn(Scheduler) 定义最近数据源的调度者,因此一个源头一个就够了,多了也没有意义。
    4. observeOn(Schedule) 定义以后任务的调度者。
    5. 并行分为函数处理(单线程)级别的并发,和调度者(多线程)级别的并发

    3.4. RxJava 并发编程设计

    案例研究:异步任务的依赖

    假设我们的程序需要五个 micro-service 协作完成计算任务,这些 micro-services 之间存在数据依赖关系:

    micro-services-deps

    为了实验方面,我们构造了实现 Callable 的类 TimeConsumingService:

    public class TimeConsumingService implements Callable<String> {
    
        private String service_name;
        private int wait_ms;
    
        public TimeConsumingService(String name, Integer waiting, String[] depandencies) {
            this.service_name = name;
            this.wait_ms = waiting;
        }
    
        @Override
        public String call() throws Exception {
                    Thread.sleep(wait_ms);
                    return String.format("service %s exec time is: %d", service_name,wait_ms);
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    为了确保这些函数能并发执行,要点就是要构造足够线程,让没有依赖关系的服务在不同线程中执行。这里我们采用

    join 设计方法

    • 画出数据流图;
    • 选择流程图上的流程归并节点;
    • 为每条归并点的一条执行路径设计一个调度者(线程);
    • 在归并点 merge 这些路径的流。

    代码如下:

        public void testAsyncCompositeJoin() {
            System.out.println("Prepare for execution:Async Composite Join");
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            // Tasks oa -> oc,  both in the same thread 1.
            Observable<String> oa = Observable.just("oa").observeOn(Schedulers.io()).flatMap(
                    soa -> Observable.fromCallable(new TimeConsumingService("fa", 1000, new String[]{}))
            );
            Observable<String> oc = oa.flatMap(
                    (String res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                        return Observable.fromCallable(
                                new TimeConsumingService("fc", 2000, new String[]{res}));
                    });
    
            // tasks ob -> (od,oe),  ob, od, oe have special thread 2,3,4.
            Observable<String> ob = Observable.just("ob").observeOn(Schedulers.io()).flatMap(
                    sob -> Observable.fromCallable(new TimeConsumingService("fb", 2000, new String[]{}))
            );
            Observable<String> od_oe = ob.flatMap(
                    (String res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                        Observable<String> od = Observable.just("od").observeOn(Schedulers.io()).flatMap(
                                sod -> Observable.fromCallable(new TimeConsumingService("fd", 1000, new String[]{res}))
                        );
                        Observable<String> oe = Observable.just("oe").observeOn(Schedulers.io()).flatMap(
                                sod -> Observable.fromCallable(new TimeConsumingService("fe", 1000, new String[]{res}))
                        );
                        return Observable.merge(od, oe);
                    });
    
            System.out.println("Observable build: " + (System.currentTimeMillis() - startTime) + "ms");
    
            // tasks join oc,(od_oe) and subscribe
            Observable.merge(oc, od_oe).toBlocking().subscribe(
                    (res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                    });
    
            System.out.println("End executed: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    注意:上述程序中既有 lambda 函数,也有 lambda 表达式。区别是前者需要 return,而后者不需要。

    启动程序:

            sample.testAsyncCompositeJoin();
            sample.testAsyncCompositeJoin();
    • 1
    • 2

    执行结果:

    Prepare for execution:Async Composite Join
    Observable build: 204ms
    service fa exec time is: 1000
    Executed At: 1294ms
    service fb exec time is: 2000
    Executed At: 2278ms
    service fe exec time is: 1000
    Executed At: 3285ms
    service fd exec time is: 1000
    Executed At: 3285ms
    service fc exec time is: 2000
    Executed At: 3295ms
    End executed: 3295ms
    Prepare for execution:Async Composite Join
    Observable build: 0ms
    service fa exec time is: 1000
    Executed At: 1001ms
    service fb exec time is: 2000
    Executed At: 2001ms
    service fc exec time is: 2000
    Executed At: 3003ms
    service fd exec time is: 1000
    Executed At: 3003ms
    service fe exec time is: 1000
    Executed At: 3003ms
    End executed: 3005ms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    这个结果很有趣,sample第一次加载,流程准备用了 204 ms。第二次执行,准备时间变为 0 ms

    这段代码如果不算系统开销,应在 3 秒执行完毕。

    要点

    1. join 方法合适解决数据流程多调度者的设计
    2. fromCallable 生成一个可调度的函数,该把函数的结果输出
    3. flatMap 和 map 是最常用的操作函数 
      • map 合适返回一个其他类型数据函数,但不能是 Observable 的
      • flatMap 合适返回一个流的函数,并把这个流产生的数据合并输出到下一个 Endpoint
    4. flatMap – merge 构成 map-reduce 计算模型 
      • reduce 操作函数,用于聚合一个流的结果

    4. 小结

    本文用案例说明了 Reactive 数据流驱动计算的执行过程与编程。以前并发处理多个相关服务的无数行线程、信号灯与锁、管理的代码都不见了,变成了简单、优雅的计算模型。

    网上也有无数教程,多数都是从简单应用角度的介绍性文章,其中,绝大多数程序是没有考虑异步与并发同时出现的情况。当然,也有许多精品文章,精品文章是难以读懂的,如同官方文档难以阅读一样。

    Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备、数据流建模、调度者分配三个基本设计步骤,才能实现异步并发执行。数据源准备、数据流建模涉及几百个操作函数,异步数据处理绝不是简单的过程;调度者分配虽然只有几个函数,却是 bug 的根源。

    Reactive 编程模型是“微服务架构” 必须掌握的基本设计思想与应用技术,服务编排与组合永远是SOA 的核心!!!

    展开全文
  • Java Reactive

    2019-09-14 23:07:22
    Reactive概念: Reactive Programming: 响应式编程,异步非阻塞就是响应式编程,与之相对应的就是命令式编程。Reactive并不是一门新技术,不用Reactive照样可是实现非阻塞编程,比如用观察者模式实现。Reactive的...

    Reactive概念:

           Reactive Programming: 响应式编程,异步非阻塞就是响应式编程,与之相对应的就是命令式编程。Reactive并不是一门新技术,不用Reactive照样可是实现非阻塞编程,比如用观察者模式实现。Reactive的另一种实现方式就是消息队列。

           Reactive的关键点:

           申明式的编程规范,数据流,传播改变,使用动态数组或者静态事件。   

          spring5实现了一部分Reactive:

          Spring WebFlux:  Reactive Web(non-blocking  servers  in genneral)

          Spring Web MVC:传统的Servlet Web (serrvlet application in general)

    转载于:https://my.oschina.net/u/3126880/blog/3097625

    展开全文
  • Java 9 Reactive Streams

    2018-09-05 20:45:22
    Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。  如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Streams的简短说明。RxJava和...

      Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。

      如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Streams的简短说明。RxJava和Akka Streams一直是十分优秀的响应流实现库。现在java 9已经通过java.util.concurrent.Flow API 引入了响应流支持。

    Java 9 Reactive Streams

      Reactive Streams是关于流的异步处理,因此应该有一个发布者(Publisher)和一个订阅者(Subscriber)。发布者发布数据流,订阅者使用数据。

    这里写图片描述

      有时我们必须在Publisher和Subscriber之间转换数据。处理器(Processor)是位于最终发布者和订阅者之间的实体,用于转换从发布者接收的数据,以便订阅者能理解它。我们可以拥有一系列(chain )处理器。

    这里写图片描述

      从上面的图中可以清楚地看出,Processor既可以作为订阅者也可以作为发布者。

    Java 9 Flow API

      Java 9 Flow API实现了Reactive Streams规范。Flow API是IteratorObserver模式的组合。Iterator在pull模型上工作,用于应用程序从源中拉取项目;而Observer在push模型上工作,并在item从源推送到应用程序时作出反应。

      Java 9 Flow API订阅者可以在订阅发布者时请求N个项目。然后将项目从发布者推送到订阅者,直到推送玩所有项目或遇到某些错误。
      
    Java 9 Flow API

    Java 9 Flow API类和接口

    让我们快速浏览一下Flow API类和接口。

    • java.util.concurrent.Flow:这是Flow API的主要类。该类封装了Flow API的所有重要接口。这是一个final类,我们不能扩展它。
    • java.util.concurrent.Flow.Publisher:这是一个功能接口,每个发布者都必须实现它的subscribe方法,并添加相关的订阅者以接收消息。
    • java.util.concurrent.Flow.Subscriber:每个订阅者都必须实现此接口。订阅者中的方法以严格的顺序进行调用。此接口有四种方法:
      • onSubscribe:这是订阅者订阅了发布者后接收消息时调用的第一个方法。通常我们调用subscription.request开始从处理器(Processor)接收项目。
      • onNext:当从发布者收到项目时调用此方法,这是我们实现业务逻辑以处理流,然后从发布者请求更多数据的方法。
      • onError:当发生不可恢复的错误时调用此方法,我们可以在此方法中执行清理操作,例如关闭数据库连接。
      • onComplete:这就像finally方法,并且在发布者没有发布其他项目发布者关闭时调用。我们可以用它来发送流成功处理的通知。
    • java.util.concurrent.Flow.Subscription:这用于在发布者和订阅者之间创建异步非阻塞链接。订阅者调用其request方法来向发布者请求项目。它还有cancel取消订阅的方法,即关闭发布者和订阅者之间的链接。
    • java.util.concurrent.Flow.Processor:此接口同时扩展了PublisherSubscriber接口,用于在发布者和订阅者之间转换消息。
    • java.util.concurrent.SubmissionPublisher:一个Publisher实现,它将提交的项目异步发送给当前订阅者,直到它关闭为止。它使用Executor框架,我们将在响应流示例中使用该类来添加订阅者,然后向其提交项目。

    Java 9响应流示例

      让我们从一个简单的例子开始,我们将实现Flow API Subscriber接口并使用SubmissionPublisher来创建发布者和发送消息。

    Stream Data

      假设我们有一个Employee类,用于创建从发布者发送到订阅者的流消息。

    package com.journaldev.reactive.beans;
    
    public class Employee {
    
        private int id;
        private String name;
    
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
    
        public Employee(int i, String s) {
            this.id = i;
            this.name = s;
        }
    
        public Employee() {
        }
    
        @Override
        public String toString() {
            return "[id="+id+",name="+name+"]";
        }
    }

      我们还有一个实用程序类来为我们的示例创建一个员工列表。

    package com.journaldev.reactive_streams;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import com.journaldev.reactive.beans.Employee;
    
    public class EmpHelper {
    
        public static List<Employee> getEmps() {
    
            Employee e1 = new Employee(1, "Pankaj");
            Employee e2 = new Employee(2, "David");
            Employee e3 = new Employee(3, "Lisa");
            Employee e4 = new Employee(4, "Ram");
            Employee e5 = new Employee(5, "Anupam");
    
            List<Employee> emps = new ArrayList<>();
            emps.add(e1);
            emps.add(e2);
            emps.add(e3);
            emps.add(e4);
            emps.add(e5);
    
            return emps;
        }
    
    }

    Subscriber

    package com.journaldev.reactive_streams;
    
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    
    import com.journaldev.reactive.beans.Employee;
    
    public class MySubscriber implements Subscriber<Employee> {
    
        private Subscription subscription;
    
        private int counter = 0;
    
        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("Subscribed");
            this.subscription = subscription;
            this.subscription.request(1); //requesting data from publisher
            System.out.println("onSubscribe requested 1 item");
        }
    
        @Override
        public void onNext(Employee item) {
            System.out.println("Processing Employee "+item);
            counter++;
            this.subscription.request(1);
        }
    
        @Override
        public void onError(Throwable e) {
            System.out.println("Some error happened");
            e.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("All Processing Done");
        }
    
        public int getCounter() {
            return counter;
        }
    
    }
    • Subscription变量以保持引用,以便可以在onNext方法中进行请求。
    • counter变量以记录处理的项目数,请注意它的值在onNext方法中增加。这将在我们的main方法中用于在结束主线程之前等待执行完成。
    • onSubscribe方法中调用订阅请求以开始处理。另请注意,onNext在处理项目后再次调用方法,要求对下一个从发布者发布的项目进行处理。
    • onErroronComplete在例子中没有太多逻辑,但在实际情况中,它们应该用于在发生错误时执行纠正措施或在处理成功完成时清理资源。

    响应流测试程序

      我们将使用SubmissionPublisherPublisher作为示例,让我们看一下响应流实现的测试程序。

    package com.journaldev.reactive_streams;
    
    import java.util.List;
    import java.util.concurrent.SubmissionPublisher;
    
    import com.journaldev.reactive.beans.Employee;
    
    public class MyReactiveApp {
    
        public static void main(String args[]) throws InterruptedException {
    
            // Create Publisher
            SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
    
            // Register Subscriber
            MySubscriber subs = new MySubscriber();
            publisher.subscribe(subs);
    
            List<Employee> emps = EmpHelper.getEmps();
    
            // Publish items
            System.out.println("Publishing Items to Subscriber");
            emps.stream().forEach(i -> publisher.submit(i));
    
            // logic to wait till processing of all messages are over
            while (emps.size() != subs.getCounter()) {
                Thread.sleep(10);
            }
            // close the Publisher
            publisher.close();
    
            System.out.println("Exiting the app");
    
        }
    
    }

      在上述代码中,最重要的部分是发布者subscribesubmit方法的调用。我们应该始终关闭发布者以避免任何内存泄漏。

      执行上述程序时,我们将得到以下输出。

    Subscribed
    Publishing Items to Subscriber
    onSubscribe requested 1 item
    Processing Employee [id=1,name=Pankaj]
    Processing Employee [id=2,name=David]
    Processing Employee [id=3,name=Lisa]
    Processing Employee [id=4,name=Ram]
    Processing Employee [id=5,name=Anupam]
    Exiting the app
    All Processing Done

    请注意,如果我们在处理所有项目之前,主线程已经退出了,那么我们将得到不想要的结果。

    消息转换示例

      处理器用于在发布者和订阅者之间转换消息。假设我们有另一个用户希望处理不同类型的消息。假设这个新的消息类型是Freelancer

    package com.journaldev.reactive.beans;
    
    public class Freelancer extends Employee {
    
        private int fid;
    
        public int getFid() {
            return fid;
        }
    
        public void setFid(int fid) {
            this.fid = fid;
        }
    
        public Freelancer(int id, int fid, String name) {
            super(id, name);
            this.fid = fid;
        }
    
        @Override
        public String toString() {
            return "[id="+super.getId()+",name="+super.getName()+",fid="+fid+"]";
        }
    }

      我们有一个新订阅者使用Freelancer流数据。

    package com.journaldev.reactive_streams;
    
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.Flow.Subscription;
    
    import com.journaldev.reactive.beans.Freelancer;
    
    public class MyFreelancerSubscriber implements Subscriber<Freelancer> {
    
        private Subscription subscription;
    
        private int counter = 0;
    
        @Override
        public void onSubscribe(Subscription subscription) {
            System.out.println("Subscribed for Freelancer");
            this.subscription = subscription;
            this.subscription.request(1); //requesting data from publisher
            System.out.println("onSubscribe requested 1 item for Freelancer");
        }
    
        @Override
        public void onNext(Freelancer item) {
            System.out.println("Processing Freelancer "+item);
            counter++;
            this.subscription.request(1);
        }
    
        @Override
        public void onError(Throwable e) {
            System.out.println("Some error happened in MyFreelancerSubscriber");
            e.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("All Processing Done for MyFreelancerSubscriber");
        }
    
        public int getCounter() {
            return counter;
        }
    
    }

    processor

      代码重要的部分是实现Processor接口。由于我们想要使用它SubmissionPublisher,我们会扩展它并在适合的地方使用它。

    package com.journaldev.reactive_streams;
    
    import java.util.concurrent.Flow.Processor;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    
    import com.journaldev.reactive.beans.Employee;
    import com.journaldev.reactive.beans.Freelancer;
    
    public class MyProcessor extends SubmissionPublisher<Freelancer> implements Processor<Employee, Freelancer> {
    
        private Subscription subscription;
        private Function<Employee,Freelancer> function;
    
        public MyProcessor(Function<Employee,Freelancer> function) {  
            super();  
            this.function = function;  
          }  
    
        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }
    
        @Override
        public void onNext(Employee emp) {
            submit((Freelancer) function.apply(emp));  
            subscription.request(1);  
        }
    
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    
        @Override
        public void onComplete() {
            System.out.println("Done");
        }
    
    }
    • Function 将用于将Employee对象转换为Freelancer对象。
    • 我们将传入的Employee消息转换为onNext方法中的Freelancer消息,然后使用SubmissionPublisher submit方法将其发送给订阅者。
    • 由于Processor既是订阅者又是发布者,我们可以在终端发布者和订阅者之间创建一系列处理器。

    消息转换测试

    package com.journaldev.reactive_streams;
    
    import java.util.List;
    import java.util.concurrent.SubmissionPublisher;
    
    import com.journaldev.reactive.beans.Employee;
    import com.journaldev.reactive.beans.Freelancer;
    
    public class MyReactiveAppWithProcessor {
    
        public static void main(String[] args) throws InterruptedException {
            // Create End Publisher
            SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
    
            // Create Processor
            MyProcessor transformProcessor = new MyProcessor(s -> {
                return new Freelancer(s.getId(), s.getId() + 100, s.getName());
            });
    
            //Create End Subscriber
            MyFreelancerSubscriber subs = new MyFreelancerSubscriber();
    
            //Create chain of publisher, processor and subscriber
            publisher.subscribe(transformProcessor); // publisher to processor
            transformProcessor.subscribe(subs); // processor to subscriber
    
            List<Employee> emps = EmpHelper.getEmps();
    
            // Publish items
            System.out.println("Publishing Items to Subscriber");
            emps.stream().forEach(i -> publisher.submit(i));
    
            // Logic to wait for messages processing to finish
            while (emps.size() != subs.getCounter()) {
                Thread.sleep(10);
            }
    
            // Closing publishers
            publisher.close();
            transformProcessor.close();
    
            System.out.println("Exiting the app");
        }
    
    }

      阅读程序中的注释以正确理解它,最重要的变化是发布者 - 处理器 - 订阅者链的创建。执行上述程序时,我们将得到以下输出。

    Subscribed for Freelancer
    Publishing Items to Subscriber
    onSubscribe requested 1 item for Freelancer
    Processing Freelancer [id=1,name=Pankaj,fid=101]
    Processing Freelancer [id=2,name=David,fid=102]
    Processing Freelancer [id=3,name=Lisa,fid=103]
    Processing Freelancer [id=4,name=Ram,fid=104]
    Processing Freelancer [id=5,name=Anupam,fid=105]
    Exiting the app
    All Processing Done for MyFreelancerSubscriber
    Done

    取消订阅

      我们可以使用Subscription cancel方法停止在订阅者中接收消息。

    请注意,如果我们取消订阅,则订阅者将不会收到onCompleteonError信号。

      以下是一个示例代码,其中订阅者只消费3条消息,然后取消订阅。

    @Override
    public void onNext(Employee item) {
        System.out.println("Processing Employee "+item);
        counter++;
        if(counter==3) {
            this.subscription.cancel();
            return;
        }
        this.subscription.request(1);
    }

      请注意,在这种情况下,我们在处理所有消息之前停止主线程的逻辑将进入无限循环。我们可以为此场景添加一些额外的逻辑,如果订阅者已停止处理或取消订阅,就使用一些全局变量来标志该状态。

    Back Pressure

      当发布者以比订阅者消费更快的速度生成消息时,会产生背压。Flow API不提供任何关于背压或处理它的信号的机制。但我们可以设计自己的策略来处理它,例如微调用户或降低信息产生率。您可以阅读RxJava deals with Back Pressure

    总结

      Java 9 Flow API是响应式编程和创建异步非阻塞应用程序的良好举措。但是,只有在所有系统API都支持它时,才能创建真正的响应式应用程序。


    原文地址:Java 9 Reactive Streams written by Pankaj
    完整代码:Github

    展开全文
  • 转:https://www.cnblogs.com/IcanFixIt/p/7245377.html在本章中,主要介绍以下内容:什么是流(stream)响应式流(Reactive Streams)的倡议是什么,以及规范和Java API响应式流在JDK 中的API以及如何使用它们如何...

    转:https://www.cnblogs.com/IcanFixIt/p/7245377.html

    Java 9

    在本章中,主要介绍以下内容:

    • 什么是流(stream)
    • 响应式流(Reactive Streams)的倡议是什么,以及规范和Java API
    • 响应式流在JDK 中的API以及如何使用它们
    • 如何使用JDK 9中的响应式流的Java API来创建发布者,订阅者和处理者

    一. 什么是流

    流是由生产者生产并由一个或多个消费者消费的元素(item)的序列。 这种生产者——消费者模型也被称为source/sink模型或发布者——订阅者(publisher-subscriber )模型。 在本章中,将其称为发布者订阅者模型。

    有几种流处理机制,其中pull模型和push模型是最常见的。 在push模型中,发布者将元素推送给订阅者。 在pull模式中,订阅者将元素推送给发布者。 发布者和订阅者都以同样的速率工作,这是一个理想的情况,这些模式非常有效。 我们会考虑一些情况,如果他们不按同样的速率工作,这种情况下涉及的问题以及对应的解决办法。

    当发布者比订阅者快的时候,后者必须有一个无边界缓冲区来保存快速传入的元素,或者它必须丢弃它无法处理的元素。 另一个解决方案是使用一种称为背压(backpressure )的策略,其中订阅者告诉发布者减慢速率并保持元素,直到订阅者准备好处理更多的元素。 使用背压可确保更快的发布者不会压制较慢的订阅者。 使用背压可能要求发布者拥有无限制的缓冲区,如果它要一直生成和保存元素。 发布者可以实现有界缓冲区来保存有限数量的元素,如果缓冲区已满,可以选择放弃它们。 可以使用另一策略,其中发布者将发布元素重新发送到订阅者,这些元素发布时订阅者不能接受。

    订阅者在请求发布者的元素并且元素不可用时,该做什么? 在同步请求中订阅者户必须等待,无限期地,直到有元素可用。 如果发布者同步地向订阅者发送元素,并且订阅者同步处理它们,则发布者必须阻塞直到数据处理完成。 解决方案是在两端进行异步处理,订阅者可以在从发布者请求元素之后继续处理其他任务。 当更多的元素准备就绪时,发布者将它们异步发送给订阅者。

    二. 什么是响应式流(Reactive Streams)

    响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。

    响应式流模型非常简单——订阅者向发布者发送多个元素的异步请求。 发布者向订阅者异步发送多个或稍少的元素。

    Tips
    响应式流在pull模型和push模型流处理机制之间动态切换。 当订阅者较慢时,它使用pull模型,当订阅者更快时使用push模型。

    在2015年,出版了用于处理响应式流的规范和Java API。 有关响应式流的更多信息,请访问http://www.reactive-streams.org/。 Java API 的响应式流只包含四个接口:

    Publisher<T>
    Subscriber<T>
    Subscription
    Processor<T,R>

    发布者(publisher)是潜在无限数量的有序元素的生产者。 它根据收到的要求向当前订阅者发布(或发送)元素。

    订阅者(subscriber)从发布者那里订阅并接收元素。 发布者向订阅者发送订阅令牌(subscription token)。 使用订阅令牌,订阅者从发布者哪里请求多个元素。 当元素准备就绪时,发布者向订阅者发送多个或更少的元素。 订阅者可以请求更多的元素。 发布者可能有多个来自订阅者的元素待处理请求。

    订阅(subscription)表示订阅者订阅的一个发布者的令牌。 当订阅请求成功时,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求更多的元素或取消订阅。

    下图显示了发布者和订阅者之间的典型交互顺序。 订阅令牌未显示在图表中。 该图没有显示错误和取消事件。

    发布者和订阅者之间的交互

    处理者(processor)充当订阅者和发布者的处理阶段。 Processor接口继承了PublisherSubscriber接口。 它用于转换发布者——订阅者管道中的元素。 Processor<T,R>订阅类型T的数据元素,接收并转换为类型R的数据,并发布变换后的数据。 下图显示了处理者在发布者——订阅和管道中作为转换器的作用。 可以拥有多个处理者。

    处理者作为转化器

    下面显示了响应式流倡导所提供的Java API。所有方法的返回类型为void。 这是因为这些方法表示异步请求或异步事件通知。

    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }
    public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
    }

    用于响应式流的Java API似乎很容易理解。 但是,实现起来并不简单。 发布者和订阅者之间的所有交互的异步性质以及处理背压使得实现变得复杂。 作为应用程序开发人员,会发现实现这些接口很复杂。 类库应该提供实现来支持广泛的用例。 JDK 9提供了Publisher接口的简单实现,可以将其用于简单的用例或扩展以满足自己的需求。 RxJava是响应式流的Java实现之一。

    三. JDK 9 中响应式流的API

    JDK 9在java.util.concurrent包中提供了一个与响应式流兼容的API,它在java.base模块中。 API由两个类组成:

    Flow
    SubmissionPublisher<T>

    Flow类是final的。 它封装了响应式流Java API和静态方法。 由响应式流Java API指定的四个接口作为嵌套静态接口包含在Flow类中:

    Flow.Processor<T,R>
    Flow.Publisher<T>
    Flow.Subscriber<T>
    Flow.Subscription

    这四个接口包含与上面代码所示的相同的方法。 Flow类包含defaultBufferSize()静态方法,它返回发布者和订阅者使用的缓冲区的默认大小。 目前,它返回256。

    SubmissionPublisher<T>类是Flow.Publisher<T>接口的实现类。 该类实现了AutoCloseable接口,因此可以使用try-with-resources块来管理其实例。 JDK 9不提供Flow.Subscriber<T>接口的实现类; 需要自己实现。 但是,SubmissionPublisher<T>类包含可用于处理此发布者发布的所有元素的consume(Consumer<? super T> consumer)方法。

    1. 发布者——订阅者交互

    在开始使用JDK API之前,了解使用响应式流的典型发布者——订阅者会话中发生的事件顺序很重要。 包括在每个事件中使用的方法。 发布者可以拥有零个或多个订阅者。 这里只使用一个订阅者。

    • 创建发布者和订阅者,它们分别是Flow.PublisherFlow.Subscriber接口的实例。
    • 订阅者通过调用发布者的subscribe()方法来尝试订阅发布者。 如果订阅成功,发布者用Flow.Subscription异步调用订阅者的onSubscribe()方法。 如果尝试订阅失败,则使用调用订阅者的onError()方法,并抛出IllegalStateException异常,并且发布者——订阅者交互结束。
    • 订阅者通过调用Subscriptionrequest(N)方法向发布者发送多个元素的请求。 订阅者可以向发布者发送更多元素的多个请求,而不必等待其先前请求是否完成。
    • 订阅者在所有先前的请求中调用订阅者的onNext(T item)方法,直到订阅者户请求的元素数量上限——在每次调用中向订阅者发送一个元素。 如果发布者没有更多的元素要发送给订阅者,则发布者调用订阅者的onComplete()方法来发信号通知流,从而结束发布者——订阅者交互。 如果订阅者请求Long.MAX_VALUE元素,则它实际上是无限制的请求,并且流实际上是推送流。
    • 如果发布者随时遇到错误,它会调用订阅者的onError()方法。
    • 订阅者可以通过调用其Flow.Subscriptioncancel()方法来取消订阅。 一旦订阅被取消,发布者——订阅者交互结束。 然而,如果在请求取消之前存在未决请求,订阅者可以在取消订阅之后接收元素。

    总结上述结束条件的步骤,一旦在订阅者上调用了onComplete()onError()方法,订阅者就不再收到发布者的通知。

    在发布者的subscribe()方法被调用之后,如果订阅者不取消其订阅,则保证以下订阅方法调用序列:

    onSubscribe onNext* (onError | onComplete)?

    这里,符号*?在正则表达式中被用作关键字,一个*表示零个或多个出现, ?意为零或一次。

    在订阅者上的第一个方法调用是onSubscribe()方法,它是成功订阅发布者的通知。订阅者的onNext()方法可以被调用零次或多次,每次调用指示元素发布。onComplete()onError()方法可以被调用为零或一次来指示终止状态; 只要订阅者不取消其订阅,就会调用这些方法。

    2. 创建发布者

    创建发布者取决于Flow.Publisher<T>接口的实现类。该类包含以下构造函数:

    SubmissionPublisher()
    SubmissionPublisher(Executor executor, int maxBufferCapacity)
    SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)

    SubmissionPublisher使用提供的Executor向其订阅者提供元素。 如果使用多个线程来生成要发布的元素并且可以估计订阅者数量,则可以使用具有固定线程池的newFixedThreadPool(int nThread),这可以使用Executors类的newFixedThreadPool(int nThread)静态方法获取。 否则,使用默认的Executor,它使用ForkJoinPool类的commonPool()方法获取。

    SubmissionPublisher类为每个订阅者使用一个独立的缓冲区。 缓冲区大小由构造函数中的maxBufferCapacity参数指定。 默认缓冲区大小是Flow类的defaultBufferSize()静态方法返回的值,该值为256。如果发布的元素数超过了订户的缓冲区大小,则额外的元素将被删除。 可以使用SubmissionPublisher类的getMaxBufferCapacity()方法获取每个订阅者的当前缓冲区大小。

    当订阅者的方法抛出异常时,其订阅被取消。 当订阅者的onNext()方法抛出异常时,在其订阅被取消之前调用构造函数中指定的处理程序。 默认情况下,处理程序为null。

    以下代码片段会创建一个SubmissionPublisher,它发布所有属性设置为默认值的Long类型的元素:

    // Create a publisher that can publish Long values
    SubmissionPublisher<Long> pub = new SubmissionPublisher<>();

    SubmissionPublisher类实现了AutoCloseable接口。 调用其close()方法调用其当前订阅者上的onComplete()方法。 调用close()方法后尝试发布元素会抛出IllegalStateException异常。

    3. 发布元素

    SubmissionPublisher<T>类包含以下发布元素的方法:

    int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
    int offer(T item, BiPredicate<Flow.Subscriber <? super T>,? super T> onDrop)
    int submit(T item)

    submit()方法阻塞,直到当前订阅者的资源可用于发布元素。 考虑每个订阅者的缓冲区容量为10的情况。 订阅者订阅了发布者并且不请求任何元素。 发布者发布了10个元素并全部缓冲所有元素。 尝试使用submit()方法发布另一个元素将阻塞,因为订阅者的缓冲区已满。

    offer()方法是非阻塞的。 该方法的第一个版本允许指定超时,之后删除该项。 可以指定一个删除处理器,它是一个BiPredicate。 在删除订阅者的元素之前调用删除处理器的test()方法。 如果test()方法返回true,则再次重试该项。 如果test()方法返回false,则在不重试的情况下删除该项。 从offer()方法返回的负整数表示向订阅者发送元素失败的尝试次数;正整数表示在所有当前订阅者中提交但尚未消费的最大元素数量的估计。

    应该使用哪种方法发布一个元素:submit()offer()? 这取决于你的要求。 如果每个已发布的元素必须发给所有订阅者,则submit()方法是最好选择。 如果要等待发布一段特定时间的元素进行重试,则可以使用offer()方法。

    4. 举个例子

    来看一个使用SubmissionPublisher作为发布者的例子。 SubmissionPublisher可以使用其submit(T item)方法发布元素。 以下代码片段生成并发布五个整数(1,2,3,4和5),假设pub是对SubmissionPublisher对象的引用:

    // Generate and publish 10 integers
    LongStream.range(1L, 6L)
              .forEach(pub::submit);

    需要订阅者才能使用发布者发布的元素。 SubmissionPublisher类包含一个consume(Consumer<? super T> consumer)方法,它允许添加一个希望处理所有已发布元素的订阅者,并且对任何其他通知(如错误和完成通知)不感兴趣。 该方法返回一个CompletedFuture<Void>,当发布者调用订阅者的onComplete()方法时,表示完成。 以下代码片段将一个Consumer作为订阅者添加到发布者中:

    // Add a subscriber that prints the published items
    CompletableFuture<Void> subTask = pub.consume(System.out::println);

    本章中的代码是com.jdojo.stream的模块的一部分,其声明如下所示。

    // module-info.java
    module com.jdojo.stream {
        exports com.jdojo.stream;
    }

    下面包含了NumberPrinter类的代码,它显示了如何使用SubmissionPublisher类来发布整数。 示例代码的详细说明遵循NumberPrinter类的输出。

    // NumberPrinter.java
    package com.jdojo.stream;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.stream.LongStream;
    public class NumberPrinter {
        public static void main(String[] args) {        
            CompletableFuture<Void> subTask = null;
            // The publisher is closed when the try block exits
            try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
                // Print the buffer size used for each subscriber
                System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());
                // Add a subscriber to the publisher. The subscriber prints the published elements
                subTask = pub.consume(System.out::println);
                // Generate and publish five integers
                LongStream.range(1L, 6L)
                          .forEach(pub::submit);
            }
            if (subTask != null) {
                try {
                    // Wait until the subscriber is complete
                    subTask.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    输出结果为:

    Subscriber Buffer Size: 256
    1
    2
    3
    4
    5

    main()方法声明一个subTask的变量来保存订阅者任务的引用。 subTask.get()方法将阻塞,直到订阅者完成。

    CompletableFuture<Void> subTask = null;

    发布类型为Long的元素发布者是在资源块中创建的。 发布者是SubmissionPublisher<Long>类的实例。 当try块退出时,发布者将自动关闭。

    try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
      //...
    }

    该程序打印将订阅发布者的每个订阅者的缓冲区大小。

    // Print the buffer size used for each subscriber
    System.out.println("Subscriber Buffer Size: " + pub.getMaxBufferCapacity());

    订阅者将使用consume()方法添加到发布者。 请注意,该方法允许指定一个Consumer,它在内部转换为Subscriber。 每个发布的元素会通知订阅者。 订阅者只需打印它接收的元素。

    // Add a subscriber to the publisher. The subscriber prints the published elements
    subTask = pub.consume(System.out::println);

    现在是发布整数的时候了。 该程序生成五个整数,1到5,并使用发布者的submit()方法发布它们。

    // Generate and publish five integers
    LongStream.range(1L, 6L)
              .forEach(pub::submit);

    已发布的整数以异步方式发送给订阅者。 当try块退出时,发布者关闭。 要保持程序运行,直到订阅者完成处理所有已发布的元素,必须调用subTask.get()。 如果不调用此方法,则可能不会在输出中看到五个整数。

    4. 创建订阅者

    要有订阅者,需要创建一个实现Flow.Subscriber<T>接口的类。 实现接口方法的方式取决于具体的需求。 在本节中,将创建一个SimpleSubscriber类,该类实现Flow.Subscriber<Long>接口。 下面包含此类的代码。

    // SimpleSubscriber.java
    package com.jdojo.stream;
    import java.util.concurrent.Flow;
    public class SimpleSubscriber implements Flow.Subscriber<Long> {    
        private Flow.Subscription subscription;
        // Subscriber name
        private String name = "Unknown";
        // Maximum number of items to be processed by this subscriber
        private final long maxCount;
        // keep track of number of items processed
        private long counter;
        public SimpleSubscriber(String name, long maxCount) {
            this.name = name;
            this.maxCount = maxCount <= 0 ? 1 : maxCount;
        }
        public String getName() {
            return name;
        }
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.printf("%s subscribed with max count %d.%n", name, maxCount);        
            // Request all items in one go
            subscription.request(maxCount);
        }
        @Override
        public void onNext(Long item) {
            counter++;
            System.out.printf("%s received %d.%n", name, item);
            if (counter >= maxCount) {
                System.out.printf("Cancelling %s. Processed item count: %d.%n", name, counter);            
                // Cancel the subscription
                subscription.cancel();
            }
        }
        @Override
        public void onError(Throwable t) {
            System.out.printf("An error occurred in %s: %s.%n", name, t.getMessage());
        }
        @Override
        public void onComplete() {
            System.out.printf("%s is complete.%n", name);
        }
    }

    SimpleSubscriber类的实例表示一个订阅者,它有一个名称和要处理的最大数量的items (maxCount)方法。 需要将其名称和maxCount传递给其构造函数。 如果maxCount小于1,则在构造函数中设置为1。

    onSubscribe()方法中,它保存发布者在名为subscription的实例变量中传递的Flow.Subscription。 它打印有关Flow.Subscription的消息,并请求一次可以处理的所有元素。 该订阅者有效地使用push模型,因为在该请求之后,不再向发布者发送更多的元素的请求。 发布着将推送maxCount或更少的元素数量给该订阅者。

    onNext()方法中,它将counter实例变量递增1。counter实例变量跟踪订阅者接收到的元素数量。 该方法打印详细说明接收到的元素消息。 如果它已经收到可以处理的最后一个元素,它将取消订阅。 取消订阅后,发布者不再收到任何元素。

    onError()onComplete()方法中,它打印一个有关其状态的消息。

    以下代码段创建一个SimpleSubscriber,其名称为S1,可以处理最多10个元素。

    SimpleSubscriber sub1 = new SimpleSubscriber("S1", 10);

    现在看一下具体使用SimpleSubscriber的例子。 下包含一个完整的程序。 它定期发布元素。 发布一个元素后,它等待1到3秒钟。 等待的持续时间是随机的。 以下详细说明本程序的输出。 该程序使用异步处理可能导致不同输出结果。

    // PeriodicPublisher.java
    package com.jdojo.stream;
    import java.util.Random;
    import java.util.concurrent.Flow.Subscriber;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.concurrent.TimeUnit;
    public class PeriodicPublisher {
        final static int MAX_SLEEP_DURATION = 3;
        // Used to generate sleep time
        final static Random sleepTimeGenerator = new Random();
        public static void main(String[] args) {
            SubmissionPublisher<Long> pub = new SubmissionPublisher<>();
            // Create three subscribers
            SimpleSubscriber sub1 = new SimpleSubscriber("S1", 2);
            SimpleSubscriber sub2 = new SimpleSubscriber("S2", 5);
            SimpleSubscriber sub3 = new SimpleSubscriber("S3", 6);
            SimpleSubscriber sub4 = new SimpleSubscriber("S4", 10);
            // Subscriber to the publisher
            pub.subscribe(sub1);
            pub.subscribe(sub2);
            pub.subscribe(sub3);
            // Subscribe the 4th subscriber after 2 seconds
            subscribe(pub, sub4, 2);
            // Start publishing items
            Thread pubThread = publish(pub, 5);
            try {
                // Wait until the publisher is finished
                pubThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        public static Thread publish(SubmissionPublisher<Long> pub, long count) {
            Thread t = new Thread(() -> {
                for (long i = 1; i <= count; i++) {
                    pub.submit(i);
                    sleep(i);
                }
                // Close the publisher
                pub.close();
            });
            // Start the thread
            t.start();
            return t;
        }
        private static void sleep(Long item) {
            // Wait for 1 to 3 seconds
            int sleepTime = sleepTimeGenerator.nextInt(MAX_SLEEP_DURATION) + 1;
            try {
                System.out.printf("Published %d. Sleeping for %d sec.%n", item, sleepTime);
                TimeUnit.SECONDS.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        private static void subscribe(SubmissionPublisher<Long> pub, Subscriber<Long> sub,
                                      long delaySeconds) {
            new Thread(() -> {
                try {
                    TimeUnit.SECONDS.sleep(delaySeconds);
                    pub.subscribe(sub);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }            
            }).start();
        }
    }

    输出结果为:

    S2 subscribed with max count 5.
    Published 1. Sleeping for 1 sec.
    S3 subscribed with max count 6.
    S1 subscribed with max count 2.
    S1 received 1.
    S3 received 1.
    S2 received 1.
    Published 2. Sleeping for 1 sec.
    S1 received 2.
    S2 received 2.
    S3 received 2.
    Cancelling S1. Processed item count: 2.
    S4 subscribed with max count 10.
    Published 3. Sleeping for 1 sec.
    S4 received 3.
    S3 received 3.
    S2 received 3.
    Published 4. Sleeping for 2 sec.
    S4 received 4.
    S3 received 4.
    S2 received 4.
    Published 5. Sleeping for 2 sec.
    S2 received 5.
    Cancelling S2. Processed item count: 5.
    S4 received 5.
    S3 received 5.
    S3 is complete.
    S4 is complete.

    PeriodicPublisher类使用两个静态变量。 MAX_SLEEP_DURATION静态变量保存发布这等待发布下一个元素最大秒数。 它设置为3。sleepTimeGenerator静态变量Random对象的引用,该对象在sleep()方法中用于生成下一个等待的随机持续时间。

    PeriodicPublisher类的main()方法执行以下操作:

    • 它创建作为SubmissionPublisher<Long>类的实例的发布者。
    • 它创建了四个为S1S2S3S4的订阅者。每个订阅者能够处理不同数量的元素。
    • 三个订阅者立即订阅。
    • S4的订阅者在两秒钟的最短延迟之后以单独的线程订阅。 PeriodicPublisher类的subscribe()方法负责处理此延迟订阅。注意到在两个元素(1和2)已经发布之后S4订阅的输出中,它将不会收到这两个元素。
    • 它调用publish()方法,它启动一个新的线程来发布五个元素,它启动线程并返回线程引用。
    • main()方法调用发布元素线程的join()方法,所以在所有元素发布之前程序不会终止。
    • publish()方法负责发布五个元素。最后关闭发布者。它调用sleep()方法,使当前线程休眠一个和MAX_SLEEP_DURATION秒之间的随机选择的持续时间。
    • 在输出中注意到,一些订阅者取消了订阅,因为他们从发布商那里收到指定数量的元素。

    请注意,该程序保证所有元素将在终止之前发布,但不保证所有订阅者都将收到这些元素。 在输出中,会看到订阅者收到所有已发布的元素。 这是因为发布者在发布最后一个元素后等待至少一秒钟,这给了订阅者足够的时间,在这个小程序中接收和处理最后一个元素。

    该程序没有表现出背压(backpressure) ,因为所有订阅者都通过一次性请求元素来使用push模型。 可以将SimpleSubscriber类修改为分配任务,以查看背压的效果:

    • onSubscribe()方法中使用subscription.request(1)方法请求一个元素。
    • onNext()方法中,延迟后请求更多的元素。 延迟应使订阅者的工作速度较慢,发布者发布元素的速度较慢。
    • 需要发布超过256个元素,这是每个发布者向订阅者使用的默认缓冲区,或者使用SubmissionPublisher类的另一个构造函数使用较小的缓冲区大小。 这将迫使发布者发布比订阅者可以处理的更多的元素。
    • 订阅者使用删除处理程序( drop handler)订阅,以便可以看到发布者何时发现背压。
    • 使用SubmissionPublisher类的offer()方法发布元素,因此当订阅者无法处理更多元素时,发布者不会无限期地等待。

    5. 使用处理者

    处理者(Processor)同时是订阅者也是发布者。 要使用处理者,需要一个实现Flow.Processor<T,R>接口的类,其中T是订阅元素类型,R是已发布的元素类型。 在本节中,创建了一个基于Predicate<T>过滤元素的简单处理者。 处理者订阅发布六个整数——1,2,3,4,5和6的发布者。订阅者订阅处理者。 处理者从其发布者接收元素,如果它们通过了Predicate<T>指定的标准,则重新发布相同的元素。 下面包含其实例作为处理者的FilterProcessor<T>类的代码。

    // FilterProcessor.java
    package com.jdojo.stream;
    import java.util.concurrent.Flow;
    import java.util.concurrent.Flow.Processor;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Predicate;
    public class FilterProcessor<T> extends SubmissionPublisher<T> implements Processor<T,T>{
        private Predicate<? super T> filter;
        public FilterProcessor(Predicate<? super T> filter) {
            this.filter = filter;
        }
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Request an unbounded number of items
            subscription.request(Long.MAX_VALUE);
        }
        @Override
        public void onNext(T item) {
            // If the item passes the filter publish it. Otherwise, no action is needed.
            System.out.println("Filter received: " + item);
            if (filter.test(item)) {            
                this.submit(item);
            }
        }
        @Override
        public void onError(Throwable t) {
            // Pass the onError message to all subscribers asynchronously        
            this.getExecutor().execute(() -> this.getSubscribers()
                                                 .forEach(s -> s.onError(t)));
        }
        @Override
        public void onComplete() {
            System.out.println("Filter is complete.");
            // Close this publisher, so all its subscribers will receive a onComplete message
            this.close();
        }
    }

    FilterProcessor<T>类继承自SubmissionPublisher<T>类,并实现了Flow.Processor<T,T>接口。 处理者必须是发布者以及订阅者。 从SubmissionPublisher<T>类继承了这个类,所以不必编写代码来使其成为发布者。 该类实现了Processor<T,T>接口的所有方法,因此它将接收和发布相同类型的元素。

    构造函数接受Predicate<? super T> 参数并将其保存在实例变量filter中,将在onNext()方法中使用filter元素。

    onNext()方法应用filter。 如果filter返回true,则会将该元素重新发布到其订阅者。 该类从其超类SubmissionPublisher继承了用于重新发布元素的submit()方法。

    onError()方法异步地将错误重新发布给其订阅者。 它使用SubmissionPublisher类的getExecutor()getSubscribers()方法,该方法返回Executor和当前订阅者的列表。 Executor用于异步地向当前订阅者发布消息。

    onComplete()方法关闭处理者的发布者部分,它将向所有订阅者发送一个onComplete消息。

    让我们看看这个处理者具体的例子。 下面包含ProcessorTest类的代码。 可能会得到一个不同的输出,因为这个程序涉及到几个异步步骤。 该程序的详细说明遵循程序的输出。

    // ProcessorTest.java
    package com.jdojo.stream;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.LongStream;
    public class ProcessorTest {
        public static void main(String[] args) {
            CompletableFuture<Void> subTask = null;
            // The publisher is closed when the try block exits
            try (SubmissionPublisher<Long> pub = new SubmissionPublisher<>()) {
                // Create a Subscriber
                SimpleSubscriber sub = new SimpleSubscriber("S1", 10);
                // Create a processor
                FilterProcessor<Long> filter = new FilterProcessor<>(n -> n % 2 == 0);
                // Subscribe the filter to the publisher and a subscriber to the filter
                pub.subscribe(filter);            
                filter.subscribe(sub);
                // Generate and publish 6 integers
                LongStream.range(1L, 7L)
                          .forEach(pub::submit);
            }
            try {
                // Sleep for two seconds to let subscribers finish handling all items
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    输出的结果为:

    S1 subscribed with max count 10.
    Filter received: 1
    Filter received: 2
    Filter received: 3
    S1 received 2.
    Filter received: 4
    S1 received 4.
    Filter received: 5
    Filter received: 6
    Filter is complete.
    S1 received 6.
    S1 is complete.

    ProcessorTest类的main()方法创建一个发布者,它将发布六个整数——1,2,3,4,5和6。该方法做了很多事情:

    • 它创建一个使用try-with-resources块的发布者,所以当try块退出时它将自动关闭。
    • 它创建一个SimpleSubscriber类的实例的订阅者。订阅者名为S1,最多可处理10个元素。
    • 它创建一个处理者,它是FilterProcessor<Long>类的实例。传递一个Predicate<Long>,让处理者重新发布整数并丢弃奇数。
    • 处理者被订阅发布者,并且简单订阅者被订阅到处理者。这完成了发布者到订阅者的管道——发布者到处理者到订阅者。
    • 在第一个try块的末尾,代码生成从1到6的整数,并使用发布者发布它们。
    • main()方法结束时,程序等待两秒钟,以确保处理者和订阅者有机会处理其事件。如果删除此逻辑,程序可能无法打印任何内容。必须包含这个逻辑,因为所有事件都是异步处理的。当第一个try块退出时,发布者将完成向处理者发送所有通知。然而,处理者和订阅者需要一些时间来接收和处理这些通知。

    四. 总结

    流是生产者生产并由一个或多个消费者消费的元素序列。 这种生产者——消费者模型也被称为source/sink模型或发行者——订阅者模型。

    有几种流处理机制,pull模型和push模型是最常见的。 在push模型中,发布者将数据流推送到订阅者。 在pull模型中,定于这从发布者拉出数据。 当两端不以相同的速率工作的时,这些模型有问题。 解决方案是提供适应发布者和订阅者速率的流。 使用称为背压的策略,其中订阅者通知发布者它可以处理多少个元素,并且发布者仅向订阅者发送那些需要处理的元素。

    响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的举措。 它旨在解决处理元素流的问题 ——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或者订阅者有无限制的缓冲区或丢弃。 响应式流模型在pull模型和push模型流处理机制之间动态切换。 当订阅者处理较慢时,它使用pull模型,当订阅者处理更快时使用push模型。

    在2015年,出版了一个用于处理响应式流的规范和Java API。 Java API 中的响应式流由四个接口组成:Publisher<T>Subscriber<T>SubscriptionProcessor<T,R>

    发布者根据收到的要求向订阅者发布元素。 用户订阅发布者接收元素。 发布者向订阅者发送订阅令牌。 使用订阅令牌,订阅者从发布者请求多个数据元素。 当数据元素准备就绪时,发布者向订阅者发送多个个或稍少的数据元素。 订阅者可以请求更多的数据元素。

    JDK 9在java.util.concurrent包中提供了与响应式流兼容的API,它在java.base模块中。 API由两个类组成:FlowSubmissionPublisher<T>

    Flow类封装了响应式流Java API。 由响应式流Java API指定的四个接口作为嵌套静态接口包含在Flow类中:Flow.Processor<T,R>Flow.Publisher<T>Flow.Subscriber<T>Flow.Subscription

    标签: JavaJava 9
    展开全文
  • Reactive Java 9 中文字幕 Reactive Java 9 中文字幕Reactive Java 9 反应式编程是围绕数据流构建的异步编程范例 事件,服务器请求,消息传送,甚至值都通过流传送 在Java应用程序中使用响应式编程可实现并发性,...

    Reactive Java 9 中文字幕

    Reactive Java 9 中文字幕Reactive Java 9

    Alt
    反应式编程是围绕数据流构建的异步编程范例
    事件,服务器请求,消息传送,甚至值都通过流传送
    在Java应用程序中使用响应式编程可实现并发性,从而提高应用程序的性能,并改进标准错误处理
    本课程介绍如何遵循Reactive Streams规范开发强大且结构良好的Java 9应用程序
    教练Manuel Vicente Vivo还涵盖了运营商和流行的反压流处理Akka Streams API
    在最后一章中,他展示了如何使用Spring,Jersey,RxJava 2.0和Java 9逐步构建一个真实的反应式RESTful Web应用程序

    本课程由Packt Publishing创建和制作
    我们很荣幸能够在我们的图书馆举办这一培训

    主题包括:
    什么是响应式编程?
    Java 9反应式功能
    使用RxJava创建和观察源代码
    运营商
    单元测试
    Akka在反应性环境中流动
    构建一个示例反应式应用
    Alt
    Alt
    Alt
    Alt
    Alt

    • [导师]嗨,我是Manuel Vicente Vivo,欢迎来到Packt Publishing的Reactive Java 9。
      我是Capital One的软件工程师,现在我在英国诺丁汉工作。
      我每天都会在工作和个人项目中使用反应式编程。
      它加速了我的开发过程,让我的生活变得更加轻松。
      我希望它在课程结束时对你有同样的效果。
      我通常在Medium中编写不同类型的文章。
      我最受欢迎的是关于响应式编程,RxJava一,二,代码结构。
      您可以在Twitter,LinkedIn或Medium中找到我,并使用该用户名:@manuelvicnt。
      本课程将包括响应式编程和Java 9的完整概述,包括可以实现的功能,从响应式编程的基本知识及其优点开始。
      我们将详细介绍反应式编程库RxJava,直到将它用于现实生活中的Web应用程序。
      您将了解Java 9中包含的新功能,以及如何使用工具包Akka使用反应式编程。
      本视频为您提供了整个课程的一瞥。
      它由七个部分组成。
      在第一节中,我们将解释什么是反应式编程,并介绍了Reactive Streams API规范。
      您将了解Java 9中提供的新功能,以及它在第二部分中如何本机支持响应式编程。
      第三部分介绍了RxJava 2以及它如何实现Reactive Streams API。
      你会知道如何创建观察者以及如何观察他们。
      第四部分介绍了我们可以与RxJava一起使用的不同运营商。
      大理石图将帮助我们更好地理解它们。
      在第五部分中,我们将看到如何使用RxJava 2处理并发性和单元测试反应式代码。
      如何在Akka工具包中使用Akka流中的反应式编程概念。
      这就是第六部分的内容。
      在课程的最后部分,我们将把我们学到的一切付诸实践。
      我们将创建一个完整的RESTful Web服务。
      我们将集成Java,Spring,Jersey和RxJava 2。
      到本课程结束时,您将对反应式编程及其好处以及如何从本质上以反应方式思考问题有着深刻的理解。
      您将熟悉Java 9中提供的新功能。
      您将会对RxJava 2的力量以及您拥有的无限可能性获得赞赏。
      您将了解如何以被动方式使用Akka,并且您将能够创建和操作反应式编程应用程序。
      在这种情况下,我们将向您展示如何创建一个RESTful Web服务。
      如果您想成功完成本课程,有几个小的先决条件。
      您需要了解Java的基础知识。
      不过,你不需要成为这方面的专家。
      在编程和面向对象的编程概念中,图书馆是理所当然的。
      对于本课程的最后一节,如果您熟悉RESTful Web服务,那将会很好。
      没有硬件要求,您只需要一台可以运行Java IDN的个人计算机。
      希望看到你参加课程。
      这将会非常有趣,我相信你会学到很多东西。
      到时候那里见。
      本课程视频下载地址:Reactive Java 9
    展开全文
  • 详解介绍如何使用java 9 新特性进行reactive 编程。从Rxjava 到Spring 均有涉及。
  • Reactive Programming with Java 8 中文字幕 使用Java 8进行反应式编程 中文字幕Reactive Programming with Java 8 Java开发人员面临着许多挑战:复杂的分布式系统,对响应性和性能的高期望,以及比以往更多的用户...
  • Java社区来说,Java 9把反应式流规范以java.util.concurrent.Flow 类的形式添加到了标准库中。Spring 5 已经支持了反应式编程实践,并提供了 WebFlux 这样的 Web 编程框架。其他语言也都有类似的反应式编程框架,...
  • Reactive编程很有趣,现在也有各种各样的讨论,概念上不是很容易理解。本文会以具体的形式介绍相关的概念。Reactive编程跟并发和高性能在概念上有一些重合,但在原理上完全不同。Reactive编程跟函数式编程是非常类似...
  • 在这篇文章中,我想看看Java中的整体Reactive发展环境。 Reactive Programming vs. Reactive Streams 有了这些新的流行语,就很容易对它们的含义感到困惑。 反应式编程是一种编程范式,但我不会称之为新的。它...
  • Rx = Observables +LiNQ +SchedulersRx模式的优点: 使用观察者模式,方便创建事件流和数据流 查询方式的操作符组合和变换数据流,订阅任何观察的数据流并执行操作,轻松实现并发。 Rx四个基本概念:Observable...
  • 什么是流(stream) 响应式流(Reactive Streams)的倡议是什么,以及规范和Java API 响应式流在JDK 中的API以及如何使用它们 如何使用JDK 9中的响应式流的Java API来创建发布者,订阅者和处理者 一. 什么是流 流...
  • Reactive Streams Reactive Streams 是一个使用非阻塞背压机制的异步流处理标准。 back pressure(背压)是其中的关键概念。在异步模式中,消费者订阅生产者,从生产者那里获取数据,需要提供回调方法,当生产者...
  • Reactive Streams简单介绍
  • 调试Reactive Streams可能是我们开始使用这些数据结构后必须面对的主要挑战之一。 考虑到Reactive Streams在过去几年中越来越受欢迎,了解我们如何有效地执行此任务是个好主意。 让我们首先使用Reactive Streams设置...
  • 第一次听到reactive这个词还是在几年前,偶然了解到了Rxjava这个项目,仿佛为我打开了一扇新的大门,Rxjava是ReactiveX的java实现,ReactiveX家族除了Rxjava还有RxJS, Rx.NET,RxScala等等。 ReactiveX的本质就是...
  • lettuce - 高级Java Redis客户端,用于线程安全同步,异步和reactive用法。 支持群集,Sentinel,管道和编解码器。
  • Reactive编程

    2019-01-23 14:20:35
    Reactive编程很有趣,现在也有各种各样的讨论,概念上不是很容易理解。本文会以具体的形式介绍相关的概念。Reactive编程跟并发和高性能在概念上有一些重合,但在原理上完全不同。Reactive编程跟函数式编程是非常类似...
  • java9 reactive

    2019-01-24 14:47:45
    http://blog.51cto.com/zero01/2293823
1 2 3 4 5 ... 20
收藏数 10,377
精华内容 4,150
关键字:

8 java reactive