reactive_reactivex - CSDN
精华内容
参与话题
  • Java Reactive 异步与并发编程

    万次阅读 2017-04-23 17:12:39
    Reactive 编程在多数人眼中是异步、并发的“银弹/神器”。本文分析了Reactive 执行原理,说明 Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备、数据流建模、调度者分配三个基本设计...

    Java Reactive 异步与并发编程

    【摘要】Reactive 编程在多数人眼中是异步、并发的“银弹/神器”。本文分析了Reactive 执行原理,说明 Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备、数据流建模、调度者分配三个基本设计步骤,才能实现异步并发执行。最后,我们给出基于数据流图的计算模型的设计与编程方案。

    大数据和云计算(云服务)让 Reactive 编程成为新一代的编程神器。尽管 Reactive 编程模型大大简化了异步与并发编程的难度,但绝不是低门槛的。它首先需要你改变传统顺序处理的计算模式,建立面向数据流的计算模型;然后,需要有强大的线程、协程等并发知识,才能编写出 safe 的应用;再者还需要一些函数式编程的知识,如 Lambda、闭包等。本文努力描述响应式编程需要的最基础的知识,并用一些案例,让你体验 Reactive 编程的神奇与优雅。

    1、准备知识

    Java Reactive 编程使用 RxJava 库,虽然可以兼容 Java 5 编程,但你会失去 Java 8 给你带来的便利,如 Lambda 表达式,CompleteableFuture 等异步特性的支持。关键是没有 Lambda 函数,Reactive Java 程序几乎无法阅读!

    1.1 编程环境配置

    1. 项目文件配置

    maven 配置文件 pom.xml 需要

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>

    2. IDEA 设置

    Intellij IDEA 要设置三处位置,且要求版本一致,否则就有出错提示,例如:“source Release 8 Requires Target Release 1.8…”

    • File -> Project Structure -> Project
    • File -> Project Structure -> Modules
    • File -> Setting -> Build -> Compiler -> Java Compiler

    设置如图

    1.2 Lambda 表达式

    现在,不支持 Lambda 表达式的语言真不多。 在 Java 中,它主要作为单一方法的接口匿名实现。例如:

    public class TestLambda {
        public static void main(String[] args) {
    
            System.out.println("=== RunnableTest ===");
    
            // Anonymous Runnable
            Runnable r1 = new Runnable() {
                @Override
                public void run() {
                    System.out.println("Hello world one!");
                }
            };
    
            // Lambda Runnable
            Runnable r2 = () -> System.out.println("Hello world two!");
    
            // Run em!
            r1.run();
            r2.run();
        }
    }

    Lambda 表达式 的语法,例如:

    Argument List Arrow Token Body
    (int x, int y) -> x + y

    官方教程: Java SE 8: Lambda Quick Start

    2、Future<V> 与多线程编程

    Future<V> 是一个泛型接口,如果一个可运行的函数(实现 Callable 或 Runable 的类)在一个线程中运行,利用 Future<V> 可以用它的 get() 方法返回 V 类型的结果。 注意, get() 会阻塞当前线程。例如:

    public class TestFuture {
    
        // https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executor.html
        static ExecutorService executor = Executors.newCachedThreadPool();
    
        public void testTaskRunning(String name, Integer t) {
            System.out.println("Prepare for execution:" + name);
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            // using lambda may cause 10X time then Callable
    //        Future<String> fa = executor.submit(
    //                new Callable<String>() {
    //                    @Override
    //                    public String call() throws Exception {
    //                        try {
    //                            Thread.sleep(t);
    //                        } catch (Exception e) {
    //                            e.printStackTrace();
    //                        }
    //                        return String.format("service exec time: %d", t);
    //                    }
    //                }
    //        );
    
            Future<String> fa =  executor.submit(
                    () -> {
                            try {
                                Thread.sleep(t);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            return String.format("service exec time: %d", t);
                    });
    
            long endTime = System.currentTimeMillis();
            System.out.println("Start execute: " + (endTime - startTime) + "ms");
    
            try {
                String s = fa.get(); //Future to Blocked
                System.out.println(s);
            } catch (
                    Exception e)
            {
                e.printStackTrace();
            }
    
            endTime = System.currentTimeMillis(); //
            System.out.println("End execute: " + (endTime - startTime) + "ms");
    
        }
    
        public void testAsyncTaskRunning() {
            System.out.println("Prepare for execution: composite task" );
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            Future<String> fa = executor.submit(new TimeConsumingService("fa",200,new String[]{}));
            Future<String> fb = executor.submit(new TimeConsumingService("fb",400,new String[]{}));
    
            System.out.println("Start execute: " + (System.currentTimeMillis() - startTime) + "ms");
    
            try {
                // What will happen when change line fc and fd ?
                Future<String> fc = executor.submit(new TimeConsumingService("fc",400,new String[]{fa.get()}));
                Future<String> fd = executor.submit(new TimeConsumingService("fd",200,new String[]{fb.get()}));
                Future<String> fe = executor.submit(new TimeConsumingService("fe",200,new String[]{fb.get()}));
                fc.get(); fd.get(); fe.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            System.out.println("End execute: " + (System.currentTimeMillis() - startTime) + "ms");
        }
    
        public static void main(String[] args) {
            TestFuture test = new TestFuture();
            test.testTaskRunning("fa", 300);
            //test.testAsyncTaskRunning();
            System.out.println("sssssssssssssssssss");
        }
    }
    • static ExecutorService executor = Executors.newCachedThreadPool();
      • Executors 返回一个线程模型,用于发射线程;
        • newCachedThreadPool() 创建缓存线程池,常用于管理 IO 阻塞型线程
        • newFixedThreadPool(int nThreads) 创建管理计算需求的线程池,n=cpu*2;
        • newSingleThreadExecutor() 创建单个线程
        • … …
      • ExecutorService 管理发射(启动)、停止线程等操作
        • submit 用于将实现 Callable 或 Runable 接口的对象,启动线程;
        • 它返回 Future<V> 接口,方便管理线程和获取结果
    • testTaskRunning()
      • executor.submit 启动 Callable 接口实现,或 Lambda 函数为一个线程
        • Lambda 准备需要远常于接口的时间
    • testAsyncTaskRunning()
      • 在线程池中执行一群耗时的函数(实现 Callable的类)
      • TimeConsumingService 是模拟服务对象,它有名字、计算时间。代码见后面

    问题

    1. get() 顺序会影响出结果时间,关键 get 的阻塞;
    2. 如果能按这些线程出结果的时间序列,把数据结果交给后面的线程并行加工处理,CPU就不用阻塞在 get() 了。但编程无疑会很复杂。

    3、Reactive(响应式)编程

    大到大数据处理(如spark),小到 android界面事件流的处理,Reactive(响应式)编程以成为最流行的标配。然而它是如何工作的呢? 如何正确编写 Reactive 程序?

    3.1 响应式编程基本原理

    响应式编程是一种按流水线模式组织处理数据流的协程计算模型(实在找不到合适的定义)。一个流水线有输入源、工人(函数)、流水结构、调度员、输出构成。假设一个简单的线性(顺序处理)流水线,如图:

    (s0--S1----s2-s3---|)  [alpha]  ...  [beta]  ...  [theta]  (o0---o2------o2o4-)
                          |                                   |
                          |                                   |
                          +------------scheduler--------------+

    其中 :

    *(d0–d1—–d2–|)表示带终止标志的时序数据流;
    * [worker] 表示一个操作函数,它只能从前道工序取数据,加工处理后交给下一个 worker;
    * … 表示两个工人之间的存放数据的缓存队列(EndPoint);
    * scheduler 是管理工人干活的调度员,一个调度员能仅能一次指挥一个工人,且工人一次只处理一个数据,要么成功,要么失败(异常)。

    例如:

        public static void hello(Integer... integers) {
            Observable<Integer> workflow = Observable.from(integers)
                    .filter( i -> (i < 10) && (i > 0))
                    .map( i -> i *2);
            workflow.subscribe(i -> System.out.print(i + "! ") );
        }

    这是典型的链式函数式处理计算模式。输入与输出分别是:

    hello(-1,4,5,0,2,19,6);
    
    8! 10! 4! 12! 

    关键不是结果,而是处理过程。第一条语句仅是搭建了一个框架,声明了数据源和两个操作工人 filter,map,它们各自使用定义的函数干活。最重要的工人(subscribe)出场了,它开动了流水线, scheduler 开始指挥工人干活,注意工人轮流上场(不是并行),直到遇到结果符号且每个工人无活可干,流水线关闭。

    这个程序是单线程程序(一个 scheduler),但是 filter,map,scheduler 干活的顺序是不可预知的,如果他们取数据的 EndPoint有数据,则可以被 scheduler 调度的。一个工人被调度一次,则是获得了CPU,直到函数操作完成,释放 CPU。这是标准的协程(Coroutine)概念。

    响应式编程意思就是根据 EndPoint 中的数据启动对应数据处理函数,是函数之间异步执行(链式反应)的过程,这对于减少程序阻塞、减低线程开销,特别是不支持多线程的nodejs,javascript,python等具有特别的意义。

    因此,Reactive 编程是数据驱动的编程,在数据驱动的模型上建立并发处理机制,需要引入多个调度者。

    千万不要认为响应式编程是并发的,建议你有时间时, 务必仔细阅读 RxJava Threading Examples 。理论上,Reactive 是数据驱动的,而不是事件驱动的。

    响应式编程不是真正意义上的并发,由于每个调度器是一个线程,它管理的操作函数之间一般都不需要对数据上锁,这些函数是按数据驱动,“并发运行”的。

    Reactive 要点

    1. Reactive 编程开始是流程建模过程,而不是数据处理过程
    2. 工作流由 subscribe 启动,subscribe 多次就会导致多次启动流程。这与普通语言按顺序执行不一样
    3. scheduler 是一个单线程的调度器,每个scheduler中管理的函数都是顺序执行的,阻塞函数会阻塞流水线工作。
    4. 工作函数是响应式的,观察者模式仅是实现数据驱动的技术概念,每个数据流,就称为 “Observable” 的对象。所以,到了 2.x 就改称 “Flowable” 了

    3.2 RxJava 入门

    RxJava 是 Reactive eXtend for Java VM 的缩写。Reactive eXtend 支持许多语言,支持 Groovy, Clojure, JRuby, Kotlin 和 Scala 等现代语言。官方网站:
    https://github.com/ReactiveX/RxJava/wiki

    1. Maven 依赖

            <dependency>
                <groupId>io.reactivex</groupId>
                <artifactId>rxjava</artifactId>
                <version>1.2.9</version>
            </dependency>

    个人认为 RxJava 1.x 比较合适入门,2.x 抽象程度高,较难入门。

    2. Hello world

        public static void hello(String... names) {
            Observable.from(names).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    System.out.println("Completed!");
                }
    
                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
                @Override
                public void onNext(String strings) {
                    System.out.println("same hello " + strings);
                }
    
            });
        }

    编程步骤

    1. 响应式编程第一步就是创建数据源。简单就是 Observable.from 从一个集合中取数,或者 Observable.just 直接取数,例如 Observable.just(1,2);
    2. 用操作函数搭建数据流处理流水线(workflow),即使用操作数据变换(transform) 定义工作流。官方有几百个操作或函数???,学习难度有点大!!!
    3. 最后操作 subscribe,启动流程,输出数据。

    subscribe支持3个函数,onNext 收到一个结果,onError 收到一个错误,onCompleted 流程结束。

    3.3 调度器与线程

    • 调度器(Scheduler)对象:操作调度的线程。
    • 调度器工作的线程模型:
      • Scheduler.io() 创建调度器用于 IO 阻塞的工作,使用缓存线程池;
      • Scheduler.computation() 创建调度器用于耗时计算,使用固定线程池;
      • Scheduler.from(java.util.concurrent.Executor executor) 使用已有线程池;

    官方案例是:

    import io.reactivex.schedulers.Schedulers;
    
    Flowable.fromCallable(() -> {
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    })
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.single())
      .subscribe(System.out::println, Throwable::printStackTrace);
    
    Thread.sleep(2000); // <--- wait for the flow to finish

    这是一个2.x的例子。

    • Flowable.fromCallable 创建一个耗时的源。
    • subscribeOn(Schedulers.io()) 源开始使用线程池中的这个调度器
    • observeOn(Schedulers.single()) 下面的操作使用这个调度器

    注:由于调度器在后台,没有最后一句,你将等不到任何输出主线程就结束了。在实际应用中,如android界面主线程、web服务异步线程一般不会结果,而是等你 subscribe 结果。

    案例研究:假设你有100个URL,要从网站爬这些URL数据,这样编程可以吗

        Observable.from(Urls)
            .observeOn(Schedulers.io())
            .map(url -> readFileFromUrl(url))
            .observeOn(Schedulers.computation())
            .flatMap(doc -> process(doc))
            .observeOn(Schedulers.single())
            .subscribe(out -> output(out));

    问题

    • 这里有几个 Scheduler 线程呢?
    • 每个线程调度拥有那些计算函数呢?

    参考:理解RxJava的线程模型

    Reactive 并发编程要点

    1. 在数据流中,设计与管理 Scheduler 的数量与管理范围,是 RxJava 并发的关键
    2. 在数据流中,有当前 任务调度(线程) 概念,尽管线程是隐式的
    3. subscribeOn(Scheduler) 定义最近数据源的调度者,因此一个源头一个就够了,多了也没有意义。
    4. observeOn(Schedule) 定义以后任务的调度者。
    5. 并行分为函数处理(单线程)级别的并发,和调度者(多线程)级别的并发

    3.4. RxJava 并发编程设计

    案例研究:异步任务的依赖

    假设我们的程序需要五个 micro-service 协作完成计算任务,这些 micro-services 之间存在数据依赖关系:

    micro-services-deps

    为了实验方面,我们构造了实现 Callable 的类 TimeConsumingService:

    public class TimeConsumingService implements Callable<String> {
    
        private String service_name;
        private int wait_ms;
    
        public TimeConsumingService(String name, Integer waiting, String[] depandencies) {
            this.service_name = name;
            this.wait_ms = waiting;
        }
    
        @Override
        public String call() throws Exception {
                    Thread.sleep(wait_ms);
                    return String.format("service %s exec time is: %d", service_name,wait_ms);
        }
    }

    为了确保这些函数能并发执行,要点就是要构造足够线程,让没有依赖关系的服务在不同线程中执行。这里我们采用

    join 设计方法

    • 画出数据流图;
    • 选择流程图上的流程归并节点;
    • 为每条归并点的一条执行路径设计一个调度者(线程);
    • 在归并点 merge 这些路径的流。

    代码如下:

        public void testAsyncCompositeJoin() {
            System.out.println("Prepare for execution:Async Composite Join");
            long startTime = System.currentTimeMillis(); //获取开始时间
    
            // Tasks oa -> oc,  both in the same thread 1.
            Observable<String> oa = Observable.just("oa").observeOn(Schedulers.io()).flatMap(
                    soa -> Observable.fromCallable(new TimeConsumingService("fa", 1000, new String[]{}))
            );
            Observable<String> oc = oa.flatMap(
                    (String res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                        return Observable.fromCallable(
                                new TimeConsumingService("fc", 2000, new String[]{res}));
                    });
    
            // tasks ob -> (od,oe),  ob, od, oe have special thread 2,3,4.
            Observable<String> ob = Observable.just("ob").observeOn(Schedulers.io()).flatMap(
                    sob -> Observable.fromCallable(new TimeConsumingService("fb", 2000, new String[]{}))
            );
            Observable<String> od_oe = ob.flatMap(
                    (String res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                        Observable<String> od = Observable.just("od").observeOn(Schedulers.io()).flatMap(
                                sod -> Observable.fromCallable(new TimeConsumingService("fd", 1000, new String[]{res}))
                        );
                        Observable<String> oe = Observable.just("oe").observeOn(Schedulers.io()).flatMap(
                                sod -> Observable.fromCallable(new TimeConsumingService("fe", 1000, new String[]{res}))
                        );
                        return Observable.merge(od, oe);
                    });
    
            System.out.println("Observable build: " + (System.currentTimeMillis() - startTime) + "ms");
    
            // tasks join oc,(od_oe) and subscribe
            Observable.merge(oc, od_oe).toBlocking().subscribe(
                    (res) -> {
                        System.out.println(res);
                        System.out.println("Executed At: " + (System.currentTimeMillis() - startTime) + "ms");
                    });
    
            System.out.println("End executed: " + (System.currentTimeMillis() - startTime) + "ms");
        }

    注意:上述程序中既有 lambda 函数,也有 lambda 表达式。区别是前者需要 return,而后者不需要。

    启动程序:

            sample.testAsyncCompositeJoin();
            sample.testAsyncCompositeJoin();

    执行结果:

    Prepare for execution:Async Composite Join
    Observable build: 204ms
    service fa exec time is: 1000
    Executed At: 1294ms
    service fb exec time is: 2000
    Executed At: 2278ms
    service fe exec time is: 1000
    Executed At: 3285ms
    service fd exec time is: 1000
    Executed At: 3285ms
    service fc exec time is: 2000
    Executed At: 3295ms
    End executed: 3295ms
    Prepare for execution:Async Composite Join
    Observable build: 0ms
    service fa exec time is: 1000
    Executed At: 1001ms
    service fb exec time is: 2000
    Executed At: 2001ms
    service fc exec time is: 2000
    Executed At: 3003ms
    service fd exec time is: 1000
    Executed At: 3003ms
    service fe exec time is: 1000
    Executed At: 3003ms
    End executed: 3005ms
    

    这个结果很有趣,sample第一次加载,流程准备用了 204 ms。第二次执行,准备时间变为 0 ms

    这段代码如果不算系统开销,应在 3 秒执行完毕。

    要点

    1. join 方法合适解决数据流程多调度者的设计
    2. fromCallable 生成一个可调度的函数,该把函数的结果输出
    3. flatMap 和 map 是最常用的操作函数
      • map 合适返回一个其他类型数据函数,但不能是 Observable 的
      • flatMap 合适返回一个流的函数,并把这个流产生的数据合并输出到下一个 Endpoint
    4. flatMap – merge 构成 map-reduce 计算模型
      • reduce 操作函数,用于聚合一个流的结果

    4. 小结

    本文用案例说明了 Reactive 数据流驱动计算的执行过程与编程。以前并发处理多个相关服务的无数行线程、信号灯与锁、管理的代码都不见了,变成了简单、优雅的计算模型。

    网上也有无数教程,多数都是从简单应用角度的介绍性文章,其中,绝大多数程序是没有考虑异步与并发同时出现的情况。当然,也有许多精品文章,精品文章是难以读懂的,如同官方文档难以阅读一样。

    Reactive 编程是数据驱动的,而不是“事件”驱动的。Reactive 编程分为数据源准备、数据流建模、调度者分配三个基本设计步骤,才能实现异步并发执行。数据源准备、数据流建模涉及几百个操作函数,异步数据处理绝不是简单的过程;调度者分配虽然只有几个函数,却是 bug 的根源。

    Reactive 编程模型是“微服务架构” 必须掌握的基本设计思想与应用技术,服务编排与组合永远是SOA 的核心!!!

    展开全文
  • 随着对Spring Cloud研究的深入,发现其实在Spring 5之后,它已经包含了一套完整的服务端技术栈,这一套就是它的Reactive Web框架。它从Web容器、请求处理框架等都提供了技术实现,是一套非常值得学习的服务端框架。 ...

    前言

    在Spring 5中的Reactive Web框架是一个包含了一套完整的服务端技术框架,这里面包括Web容器以及Web应用框架,是一套非常值得学习的技术栈。

    6.1 Reactive简介

    Reactive Streams 是 JVM 中面向流的库标准和规范,它包含以下特性:

    • 处理可能无限数量的元素
    • 按顺序处理
    • 组件之间异步传递
    • 强制性非阻塞背压(Backpressure)
      可以说是为了方便进行异步程序开发的一种新的编程模型。经过这样简单的介绍,响应式编程这个概念还是很抽象,不过可以参考这里仔细学习一下。

    在Spring官网中,提供了下图的框架图:
    在这里插入图片描述
    从这张结构图我们可以清晰地区分出基于Servlet api和Reactive各自的技术栈结构,不过在最上层,Spring都给我们封装成了我们在SpringMVC中使用的基于注解的Web层api。如果以前在使用SpringMVC的时候,Controller里面没有耦合太多Servlet api的话,那么迁移过去就是一件很轻松的事情,在新的框架上开发也和以往没有太大的区别,除非使用到了Reactive的新特性。这一点Spring做的非常棒,充分体现出了Spring框架设计的优雅。

    异步框架在处理IO密集型业务上有比较突出的性能优势,因此有了这一套框架后,以后在相关的技术选型上又多了一套方案。Spring Cloud Gateway也要求必须运行在Reactive Web中。

    6.2 Reactive Web框架的设计

    针对Reactive框架,Spring对Server端重新进行了抽象,其中一个最底层的接口就是HttpHandler,它的定义如下:

    public interface HttpHandler {
    	/**
    	 * Handle the given request and write to the response.
    	 * @param request current request
    	 * @param response current response
    	 * @return indicates completion of request handling
    	 */
    	Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response);
    }
    

    但是真正在Web层中,它提供的抽象是WebHandler接口,它的定义如下:

    public interface WebHandler {
    	/**
    	 * Handle the web server exchange.
    	 * @param exchange the current server exchange
    	 * @return {@code Mono<Void>} to indicate when request handling is complete
    	 */
    	Mono<Void> handle(ServerWebExchange exchange);
    }
    

    两者通过HttpWebHandlerAdapter进行适配。结构如下:
    在这里插入图片描述
    比起以往我们熟悉的HttpServletRequest和HttpServletResponse这两个Servlet api,在Reactive框架中的api就再也看不到Servlet身影了。正因为如此,底层的Web容器就不一定要是Servlet容器了,所以Spring为此增加了一个新的基于Netty的内嵌Web容器。这个Web容器只是做了很简单的封装,使用了Netty原生HttpServerCodec对http请求和响应进行decode和encode。接下来的内容需要一些Netty的知识才能理解Spring的这套设计。
    Spring封装了HttpServer、NettyWebServer以及NettyReactiveWebServerFactory来注入和启动整个Netty框架。整体流程大致如下:
    在这里插入图片描述
    过程稍微有些复杂,其中关键的地方就是TcpBridgeServer(TcpServer的子类)中的newHandler方法了:

    @Override
    	public final Mono<? extends NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler) {
    		Objects.requireNonNull(handler, "handler");
    		return Mono.create(sink -> {
    			ServerBootstrap b = options.get();
    			SocketAddress local = options.getAddress();
    			b.localAddress(local);
    			ContextHandler<Channel> contextHandler = doHandler(handler, sink);
    			b.childHandler(contextHandler);
    			if(log.isDebugEnabled()){
    				b.handler(loggingHandler());
    			}
    			contextHandler.setFuture(b.bind());
    		});
    	}
    

    这里就是很熟悉的Netty框架的启动代码了,我们可以看到doHandler方法返回的ContextHandler被添加进Netty的pipeline中,而这里的doHandler已经在TcpBridgeServer中被重写,节选部分代码:

    return ContextHandler.newServerContext(sink,
    					options,
    					loggingHandler,
    					(ch, c, msg) -> {
    						HttpServerOperations ops = HttpServerOperations.bindHttp(ch,
    								handler,
    								c,
    								compressPredicate,
    								msg);
    						if (alwaysCompress) {
    							ops.compression(true);
    						}
    						return ops;
    					})
    			                     .onPipeline(this)
    			                     .autoCreateOperations(false);
    

    这里的onPipeline(this)方法就是设置一个回调,因为ContextHandler是Netty的ChannelInitializer,所以在它initChannel方法中会回调onPipeline设置进去的回调方法用来设置Netty的pipeline,具体被回调的就是TcpBridgeServer中的accept方法:

    @Override
    public void accept(ChannelPipeline p, ContextHandler<Channel> c) {
    	p.addLast(NettyPipeline.HttpCodec, new HttpServerCodec(
    	        options.httpCodecMaxInitialLineLength(),
    	        options.httpCodecMaxHeaderSize(),
    	        options.httpCodecMaxChunkSize(),
    	        options.httpCodecValidateHeaders(),
    	      options.httpCodecInitialBufferSize()));
    	if (ACCESS_LOG_ENABLED) {
    		p.addLast(NettyPipeline.AccessLogHandler, new AccessLogHandler());
    	}
    	p.addLast(NettyPipeline.HttpServerHandler, new HttpServerHandler(c));
    }
    

    于是,经过配置后每个请求的Netty pipeline都会有如下结构的Handler:
    在这里插入图片描述
    由此可以看到,核心是HttpServerHandler,在这里面会将经过Netty的原生HTTP解码器处理过后的请求封装为ChannelOperations,并在ChannelOperations里面调用HttpHandler(在构造NettyWebServer的时候从Spring容器获取)的apply方法,交由web框架来处理请求。相关代码片段如下:

    //136行 HttpServerHandler的channelRead方法中
    parentContext.createOperations(ctx.channel(), msg);
    
    //249行 ContextHandler的createOperations方法中
    channel.eventLoop().execute(op::onHandlerStart);
    
    //380行ChannelOperations的applyHandler方法中
    Mono.fromDirect(handler.apply((INBOUND) this, (OUTBOUND) this))
    			    .subscribe(this);
    

    以上介绍的都是Spring根据Netty实现的Reactive Web容器,同时,Spring Boot也对常见的Servlet容器进行了适配,使得Reactive框架也能运行在Servlet 3.1容器里。至于适配则是通过ServletHttpHandlerAdapter,将Servlet的api转换成对应的Reactive api。

    6.3 Spring WebFlux简介

    从SpringBoot原生的HttpHandlerAutoConfiguration中可以发现,前面介绍的HttpHandler其实是一个Web容器的底层抽象,Spring愿意暴露给应用层的还是6.1节中提到的WebHandler,这两者之间的适配是在HttpHandlerAutoConfiguration里面进行配置的,这里面使用到了WebHttpHandlerBuilder来进行构造WebHandler bean:

    public HttpHandler build() {
    		WebHandler decorated = new FilteringWebHandler(this.webHandler, this.filters);
    		decorated = new ExceptionHandlingWebHandler(decorated,  this.exceptionHandlers);
    		HttpWebHandlerAdapter adapted = new HttpWebHandlerAdapter(decorated);
    		if (this.sessionManager != null) {
    			adapted.setSessionManager(this.sessionManager);
    		}
    		if (this.codecConfigurer != null) {
    			adapted.setCodecConfigurer(this.codecConfigurer);
    		}
    		if (this.localeContextResolver != null) {
    			adapted.setLocaleContextResolver(this.localeContextResolver);
    		}
    		if (this.applicationContext != null) {
    			adapted.setApplicationContext(this.applicationContext);
    		}
    		return adapted;
    	}
    

    即将Spring容器中注入的WebHandler和WebFilter经过FilteringWebHandler和ExceptionHandlingWebHandler两层委派,然后由HttpWebHandlerAdapter进行适配。FilteringWebHandler是一个能够使用WebFilter过滤器的WebHandler,让整个Web容器具备了过滤器功能,可以类比于Servlet中的Filter。从这里就可以得到一些总结和经验,不管是tomcat、原生netty,还是SpringBoot里面封装的EmbeddedNettyServer,或者是SpringMVC等和web服务相关的框架,都应用了责任链的设计模式,可见它在web服务器设计上的重要性。

    在应用层中有一个WebHandler的实现是DispatcherHandler,它则是WebFlux的核心分发处理器。其实从名字上可以看出来它充当的角色和SpringMVC里面的DispatcherServlet是一样的,两者在核心处理的设计上也基本上是一样的。在DispatcherHandler中,同样包含了HandlerMapping、HandlerAdapter等抽象概念,处理逻辑也是按照下图所示:
    在这里插入图片描述
    其实,这样的设计和SpringMVC的DispatcherServlet有着异曲同工之处。因为Spring在web层没有直接强耦合SpringMVC的api,而是提供了Spring Web层,这也充分体现了Spring在设计框架时的严谨和优雅。如果我们在自己web层的代码遵循它的设计思想的话,没有耦合像ModelAndView、HttpServletRequest、HttpServletResponse(其实也没有必要耦合)等api的话,从理论上讲是很容易迁移到WebFlux上的。因为我们项目中要使用到Spring Cloud Gateway这种IO密集型业务,所以在技术选型的时候,想尝试一下异步框架可能带来的性能或者吞吐量的提升,所以在一开始便考虑使用WebFlux。其实对于以往使用SpringMVC的业务,也并非就需要迁移,也都是根据实际情况而定的。在Spring官网有更多关于WebFlux的背景和特性介绍。可以关注一下它关于Applicability和Performance的描述。

    从WebFlux的设计上来看,它是不依赖Servlet容器的,虽然Spring也针对Tomcat,Jetty等Web容器进行了适配,但是我觉得Spring内置的Netty Server里应该才是它最合适的选择。所以这里就要求classpath下面不能有Servlet相关框架的依赖,也不能使用官方提供的方式来打war包在外置的Servlet容器中启动一个Spring Boot工程。

    小结

    至此,Spring Boot中内嵌的Netty容器已经分析完了,可以看出来,这个Web容器很轻薄,只是基于Netty原生提供的HTTP编解码handler上面封装了一层HttpHandler。不过在如何将这个HttpHandler作用进Netty的pipeline中,Spring做了一些设计,有一些复杂。不过理解了配置后的pipeline最终结构,就可以清楚它的工作原理了,这样的设计也对我们使用Netty提供了一个很好的参考示例,所以个人觉得很有学习的价值。

    展开全文
  • Reactive 架构才是未来

    千次阅读 2020-05-21 13:33:52
    Reactive 编程模型有哪些价值?它的原理是什么?如何正确使用?本文作者将根据他学习和使用的经历,分享 Reactive 的概念、规范、价值和原理。欢迎同学们共同探讨、斧正。 ReactiveReactive programming ...

    简介: Reactive 编程模型有哪些价值?它的原理是什么?如何正确使用?本文作者将根据他学习和使用的经历,分享 Reactive 的概念、规范、价值和原理。欢迎同学们共同探讨、斧正。image.png

    Reactive 和 Reactive programming

    Reactive 直接翻译的意思式反应式,反应性。咋一看,似乎不太好懂。

    举个例子:在 Excel 里,C 单元格上设置函数 Sum(A+B),当你改变单元格 A 或者单元格 B 的数值时,单元格 C 的值同时也会发生变化。这种行为就是 Reactive。

    在计算机编程领域,Reactive 一般指的是 Reactive programming。指的是一种面向数据流并传播事件的异步编程范式(asynchronous programming paradigm)。

    先举个例子大家感受一下:

    public static void main(String[] args) {
      FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create();
      publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe();
    
      publisher.onNext(1); // print 'receive event: 1'
      publisher.onNext(2); // print 'receive event: 2'
    }

    代码 1

    以上例代码(使用 Reactor 类库)为例,publisher 产生了数据流 (1,2),并且传播给了 OnNext 事件, 上例中 lambda 响应了该事件,输出了相应的信息。上例代码中生成数据流和注册/执行 lambda 是在同一线程中,但也可以在不同线程中。

    注:如果上述代码执行逻辑有些疑惑,可以暂时将 lambda 理解成 callback 就可以了。

    Reactive Manifesto

    对于 Reactive 现在你应该大致有一点感觉了,但是 Reactive 有什么价值,有哪些设计原则,估计你还是有些模糊。这就是 Reactive Manifesto 要解决的疑问了。

    使用 Reactive 方式构建的系统具有以下特征:

    即时响应性 (Responsive)

    只要有可能, 系统就会及时地做出响应。即时响应是可用性和实用性的基石, 而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。即时响应的系统专注于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。这种一致的行为转而将简化错误处理、 建立最终用户的信任并促使用户与系统作进一步的互动。

    回弹性 (Resilient)

    系统在出现失败时依然保持即时响应性。这不仅适用于高可用的、 任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。回弹性是通过复制、 遏制、 隔离以及委托来实现的。失败的扩散被遏制在了每个组件内部, 与其他组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。每个组件的恢复都被委托给了另一个(外部的)组件, 此外,在必要时可以通过复制来保证高可用性。(因此)组件的客户端不再承担组件失败的处理。

    弹性 (Elastic)

    系统在不断变化的工作负载之下依然保持即时响应性。反应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。这意味着设计上并没有争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。通过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

    消息驱动 (Message Driven)

    反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。这一边界还提供了将失败作为消息委托出去的手段。使用显式的消息传递,可以通过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。使用位置透明的消息传递作为通信的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。非阻塞的通信使得接收者可以只在活动时才消耗资源, 从而减少系统开销。

    image.png

    注:

    • 上面描述有很多专有名词,可能有些疑惑,可以看下相关名词解释。
    • 为什么使用 Reactive 方式构建的系统会具有以上价值, 我稍后在 Reactor 章节中介绍。

    Reactive Stream

    知道了 Reactive 的概念,特征和价值后,是否有相关的产品或者框架来帮助我们构建 Reactive 式系统呢?在早些时候有一些类库 (Rxjava 1.x, Rx.Net) 可以使用,但是规范并不统一,所以后来 Netfilx, Pivotal 等公司就制定了一套规范指导大家便于实现它(该规范也是受到早期产品的启发),这就是 Reactive Stream 的作用。

    Reactive Stream 是一个使用非阻塞 back pressure(回压)实现异步流式数据处理的标准。目前已经在 JVM 和 JavaScript 语言中实现同一套语意的规范;以及尝试在各种涉及到序列化和反序列化的传输协议(TCP, UDP, HTTP and WebSockets)基础上,定义传输 reactive 数据流的网络协议。

    The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.

    Reactive Streams 解决的问题场景

    当遇到未预料数据流时,依然可以在可控资源消耗下保持系统的可用性。

    Reactive Streams 的目标

    控制在一个异步边界的流式数据交换。例如传递一个数据到另外一个线程或者线程池,确保接收方没有 buffer(缓存)任意数量的数据。而 back pressure(回压)是解决这种场景的不可或缺的特性。

    Reactive Streams 规范适用范围

    此标准只描述通过回压来实现异步流式数据交换的必要的行为和实体,最小接口,例如下方的 Publisher, Subscriber。Reactive Streams 只关注在这些组件之间的流式数据中转,并不关注流式数据本身的组装,分割,转换等行为, 例如 map, zip 等 operator。Reactive Streams 规范包括:

    Publisher

    产生一个数据流(可能包含无限数据), Subscriber 们可以根据它们的需要消费这些数据。

    public interface Publisher<T> {    
      public void subscribe(Subscriber<? super T> s);
    }

    Subscriber

    Publisher 创建的元素的接收者。监听指定的事件,例如 OnNext, OnComplete, OnError 等。

    publicinterface Subscriber<T> {
      public void onSubscribe(Subscription s);
      public void onNext(T t);
      public void onError(Throwable t);
      public void onComplete();
    }

    Subscription

    是 Publisher 和 Subscriber 一对一的协调对象。Subscriber 可以通过它来向 Publisher 取消数据发送或者 request 更多数据。

    public interface Subscription {  
      public void request(long n);  
      public void cancel();
    }

    Processor

    同时具备 Publisher 和 Subscriber 特征。代码1中 FluxProcessor 既可以发送数据(OnNext),也可以接收数据 (doOnNext)。

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

    为什么规范强调使用非阻塞异步方式而不是阻塞同步方式?

    • 同步方式一般通过多线程来提高性能,但系统可创建的线程数是有限的,且线程多以后造成线程切换开销。
    • 同步方式很难进一步提升资源利用率。
    • 同步调用依赖的系统出现问题时,自身稳定性也会受到影响。

    实现非阻塞的方式有很多种,为什么规范会选择上述的实现方式呢?

    Thread

    • thread 不是非常轻量(相比下面几种实现方案)。
    • thread 数量是有限的,最终可能会成为主要瓶颈。
    • 有一些平台可能不支持多线程。例如:JavaScript。
    • 调试,实现上有一定复杂性。

    Callback

    • 多层嵌套 callback 比较复杂,容易形成"圣诞树" (callback hell)。
    • 错误处理比较复杂。
    • 多用于 event loop 架构的语言中,例如:JavaScript。

    Future

    • 无法逻辑组合各种行为,支持业务场景有限。
    • 错误处理依然复杂。

    Reactive Extensions (Rx)

    • 和 Future 很相似。Future 可以认为返回一个独立的元素,而 Rx 返回一个可以被订阅的 Stream。
    • 多平台支持同一套规范。
    • 同一套 API 同时支持异步、同步。
    • 错误处理方便。

    Coroutines

    • kotlin coroutine 和 goroutine 在语法层面上提供异步支持, 而且比Rx更简洁,但无法跨多个语言平台形成统一的规范。

    Reactive 的实现原理个人认为还是回调,kotlin 协程实现原理同样也是回调。但实现回掉的方式不一样。一个是通过事件传播, 一个是通过状态机。但 cooutine 编程的易用性明显强于 Rx,后面有空我会专门写篇文章介绍 kotlin coroutine 的实现原理。

    Reactor

    有了 Reactive Stream 这个规范,就会有相应实现该规范的类库。Reactor 就是其中之一。

    Reactor 是遵守 Reactive Stream 规范构建非阻塞应用的 Java 语言 Reactive 类库,已经在 spring 5 中集成,与他相似的类库有 RxJava2, RxJs, JDK9 Flow 等。

    阿里内部的 Faas 系统目前使用 Reactor 来构建整个系统,包括函数应用和各种核心应用(逻辑架构)。根据我们压测结果显示,使用 Reactive 方式构建的系统确实会有这些特点:

    • 回弹性 (Resilient):当函数出现严重超时时 (rt >= 10s),函数上游的 broker, gateway 应用几乎无任何影响。
    • 及时响应性:不管是高并发场景(资源足够),还是正常场景,RT 表现一致。

    另外从原理上,我认为资源利用率和吞吐量也会高于非反应式的应用。

    为什么 Reactive 的架构系统有这些特点?

    阿里内部的 Faas 系统主要做了两件事情:

    涉及到 IO 的地方几乎全异步化。例如中间件(HSF, MetaQ 等提供异步 API)调用。

    IO 线程模型变化。使用较少(一般 CPU 核数)线程处理所有的请求。

    传统 Java 应用 IO 线程模型

    参考 Netty 中 Reactor IO (worker thread pool) 模型,下方伪代码(kotlin)进行了简化。

    image.png

    // 非阻塞读取客户端请求数据(in), 读取成功后执行lambda.
    inChannel.read(in) {
        workerThreadPool.execute{
          // 阻塞处理业务逻辑(process), 业务逻辑在worker线程池中执行,同步执行完后,再向客户端返回输出(out)
          val out = process(in)
          outChannel.write(out)
        }  
    }

    Reactive 应用 IO 线程模型
    IO 线程也可以执行业务逻辑 (process),可以不需要 worker 线程池。
    image.png

    // 非阻塞读取客户端请求数据(in), 读取成功后执行lambda
    inChannel.read(in) {
        // IO线程执行业务逻辑(process),  然后向客户端返回输出(out). 这要求业务处理流程必须是非阻塞的.
        process(in){ out->
            outChannel.write(out) {
                // this lambda is executed when the writing completes
            ...
            }
        }
    }

    如何开始 Reactive Programing

    以 Reactive 方式构建的系统有很多值得学习和发挥价值的地方,但坦白讲 Reactive programing 方式目前接受程度并不高。特别是使用 Java 语言开发同学,我个人也感同身受,因为这和 Java 面向命令控制流程的编程思维方式有较大差异。所以这里以 Reactor (Java) 学习为例:

    • Reactor 基础文档
    • Reactive Streams 规范文档
    • Operator

    总结

    反应式的系统有很多优点,但是完整构建反应式的系统却并不容易。不仅仅是语言上的差异,还有一些组件就不支持非阻塞式的调用方式,例如:JDBC。但是有一些开源组织正在推动这些技术进行革新,例如:R2DBC。另外,为了方便构建反应式系统,一些组织/个人适配了一些主流技术组件 reactor-core, reactor-netty, reactor-rabbimq, reactor-kafka 等,来方便完整构建反应式系统。

    当你的系统从底层到上层,从系统内部到依赖外部都变成了反应式,这就形成了 Reactive 架构。

    这种架构价值有多大?未来可期。

    参考

    https://www.reactivemanifesto.org/
    https://www.reactive-streams.org/
    https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html
    https://projectreactor.io/docs/core/release/reference/index.html

    阿里云开发者社区直播间:Java 系列直播

    Reactive 将对架构设计以及开发方式带来什么样的改变?Java 下 DDD 应该怎么落地?Java 性能优化怎么做?全球顶尖 Java 开发者 & 布道师坐镇,聚焦前沿技术以及全新云原生开发方式,中国最大 Java 用户组联手 Spring 发起 Java 系列直播,与你畅谈。

    识别下方二维码立即收看:

    image.png

    展开全文
  • Reactive的方式访问Redis

    千次阅读 2019-05-15 17:57:43
    本文是以Reactive 对方式访问 Redis ,当然也可以访问mongodb,以及部分关系型数据库,例如 Postgres,H2,Microsoft SQL Sever,目前只支持这些,持续更新请关注(https://spring.io/projects/sprin...

    前言

    本文主要大概介绍一下响应式/反应式编程方式访问 redis,不能解决很多生产问题,只是帮助大家对响应式编程有一个认识。

    本文是以Reactive 对方式访问 Redis ,当然也可以访问mongodb,以及部分关系型数据库,例如 Postgres,H2,Microsoft SQL Sever,目前只支持这些,持续更新请关注(https://spring.io/projects/spring-data-r2dbc),这个子工程是spring为了更好支持关系型数据库开发的。

    响应式编程目前支持最多的是 web 层面,也就是我们springboot 依赖的 spring-boot-starter-webflux

    正文

    通俗解释Reactive: a=b+c ,我们给a 赋值后,再去改变b或者c不会影响a,我们在单元格写一个公式时,这样的值会随着其他值的改变而改变,这就是响应式的一个体现。

    Java操作Redis的库有两个,Jedis和Lettuce,目前SpringBoot 2.x中已经将Jedis换成了Lettuce。

    Lettuce能够支持 Reactive 方式

    Spring Data Redis 中主要的支持

    • ReactiveRedisConnection
    • ReactiveRedisConnectionFactory
    • ReactiveRedisTemplate   

     

    使用所有框架和中间件的版本

    环境
    框架 版本
    Spring Boot                 2.1.3.RELEASE
    redis redis-4.0.11
    JDK 1.8.x

     

    首先,创建一个maven工程,其pom文件如下

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.3.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
    
    <dependencies>
    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
            </dependency>
    </dependencies>

    这样我们就可以使用springboot 2.0+ 自带的  lettuce。

     

    创建一个启动类 名字叫  RedisDemoApplication

     

    @SpringBootApplication
    public class RedisDemoApplication implements ApplicationRunner {
        @Autowired
        private ReactiveStringRedisTemplate redisTemplate;
    
        public static void main(String[] args) {
            SpringApplication.run(RedisDemoApplication.class, args);
        }
    
        @Bean
        ReactiveStringRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory factory) {
            return new ReactiveStringRedisTemplate(factory);
        }
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            ReactiveHashOperations<String, String, String> hashOps = redisTemplate.opsForHash();
            Mono mono1 = hashOps.put("apple", "x", "6000");
            mono1.subscribe(System.out::println);
            Mono mono2 = hashOps.put("apple", "xr", "5000");
            mono2.subscribe(System.out::println);
            Mono mono3 = hashOps.put("apple", "xs max", "8000");
            mono3.subscribe(System.out::println);
        }
    }

     

    如果说我们想连接 reids的话,那么现在已经做完了。

    测试一下

    启动上面的程序,回到redis客户端,输出如下命令

    127.0.0.1:6379> hgetall apple
    1) "x"
    2) "6000"
    3) "xr"
    4) "5000"
    5) "xs max"
    6) "8000"
    127.0.0.1:6379> 
    

     

    我们看到网上其他博主的文章有些累赘,我觉得很多可以用默认就用默认的,可以不写的那就不写,我连个配置文件都没用不是一样连接到redis嘛。

    另外响应式编程目前我还没有听说哪家企业普及,但是这应该是未来的趋势,我们可能对返回 Flux 和Mono 有些不习惯,放心,一定有你习惯的一天。

     

    关于 响应式编程的其他操作网上有很多,可以访问如下

    https://blog.csdn.net/liubenlong007/article/details/86541913

    https://www.jianshu.com/p/5172c48cb877

    http://www.pianshen.com/article/868527566/

     

     

     

     

     

     

     

     

     

     

    展开全文
  • Spring 5发布了一个非常重要的模块,名字叫做:spring-webflux。该模块平级的就是spring-webmvc。     具体能做什么呢?自然是mvc不擅长的事情了。自然是人们一直希望实现,但总是比较困难的功能了。...
  • Reactive简介

    2020-07-26 23:31:52
    理解Reactive 相关技术 反应堆模式(Reactor) 同步非阻塞,多工模式,一个事情可以分为几个步骤,每个步骤相应去做,同步串行先做A,后做B Proactor模式 异步非阻塞,多工模式,A,B,C同时去做,异步去做。 ...
  • Reactive编程(一):Reactive编程的背景

    万次阅读 2018-03-04 17:24:05
    Reactive编程很有趣,现在也有各种各样的讨论,概念上不是很容易理解。本文会以具体的形式介绍相关的概念。Reactive编程跟并发和高性能在概念上有一些重合,但在原理上完全不同。Reactive编程跟函数式编程是非常类似...
  • Reactive 到 WebFlux 1

    2019-03-02 14:24:51
    OK,还记得我开博客的第一篇文章就是想好好学习一下WebFlux,前段...在讲之前我想提一些问题,我们是否经常在网上看到这样一些关于reactive的讲法: 1.Reactive 是异步非阻塞编程 2.Reactive 能够提升程序性能 3...
  • 如果使用RxJava的jar包,出现这样的提示,那是因为还缺少一个jar包 还需要reactive-streams.jar包
  • Vue项目启动报错整理2

    万次阅读 2018-08-22 13:17:40
    Vue项目启动报错整理 1、如若发生如下 报错: vue.esm.js?efeb:574[Vue warn]: Property or method "pics" is not defined on the ... Make sure that this property is reactive, either in the...
  • <dependency> <groupId>org.springframework.boot</groupId> <...spring-boot-starter-data-redis-reactive</artifactId> </dependency> <dependency> ...
  • Reactive programming包括Object-oriented reactive programming 和 Functional reactive programming Object-oriented reactive programming (OORP) 面向对象响应编程是结合面向对象编程(OOP)和响应式...
  • Cannot set reactive property on undefined, null, or primitive value: //无法对未定义的值、空值或基元值设置反应属性: 比如我们在写一个表单,提交成功后要清空表单 我把数据绑在上面了方便看,确定提交成功...
  • Vue 封装数据时报该错误? vue.set(参数1,参数2,参数3); 参数1:对象.数组 参数2:下标 i 参数3:数组【i】 这是正确得格式 ,格式不正确会报错
  • 原文:http://www.atmarkit.co.jp/fdotnet/introrx/introrx_01/introrx_01_02.html作者:河合 宜文安装方法关于 Rx 的安装,可以通过 Reactive Extensions (Rx) 的主页 的直接下载安装,当然也可以利用 NuGet 导入 ...
  • Avoid adding reactive properties to a Vue instance or its root $data at runtime - declare it upfront in the data option. 报错代码 为啥会报错? 知识点 受 ES5 的限制,Vue.js 不能检测到对象属性的添加...
  • 【好文收藏】Reactive Extensions入门

    千次阅读 2016-11-24 22:41:53
    【好文收藏】Reactive Extensions入门最近想用ReactiveUI.NET来做一个新项目,网上找了一些比较不错的资料,赶紧收藏。中文博客 Reactive Extensions入门 Reactive Extensions入门(1):LINQ和Rx简单介绍 Reactive ...
  • reactive stream协议详解

    千次阅读 2020-05-22 10:02:35
    Stream大家应该都很熟悉了,java8中为所有的集合类都引入了Stream的概念。优雅的链式操作,流式处理逻辑,相信用过的人都会爱不释手。 每个数据流都有一个生产者一个消费者。生产者负责产生数据,而消费者负责消费...
  • spring boot 2.1 之前集成的是同步阻塞的redis,这里讲述集成异步非阻塞的redis,响应式redis集成
  • Reactive Streams介绍

    千次阅读 2015-12-22 16:50:39
    本文来源于我在InfoQ中文站翻译的文章,原文地址是:http://www.infoq.com/cn/news/2015/12/reactive-streams-introduction现代软件对近乎实时地处理数据的需求越来越强烈。对变化的信息的即时响应中蕴含着巨大的...
1 2 3 4 5 ... 20
收藏数 21,050
精华内容 8,420
关键字:

reactive