学react前需掌握什么知识
2018-10-04 08:27:00 weixin_33919941 阅读数 19

本文主要是介绍响应式异步编程库Reactor的使用
响应式流简介

When the publisher is faster than the subscriber, the latter must have an unbounded buffer to store fast incoming items or it must drop items it cannot handle.Another solution is to use a strategy called backpressure in which the subscriber tells the publisher to slow down and hold the tems until the subscriber is ready to process more. Using backpressure may require the publisher to have an unbounded buffer if it keeps producing and storing elements for slower subscribers.The publisher may implement a bounded buffer to store a limited number of elements and may choose to drop them if its buffer is full.

What does the subscriber do when it requests items from the publisher and the items are not available?In a synchronous request, the subscriber must wait, possibly indefinitely, until items are available. If the publisher sends items to the subscriber synchronously and the subscriber processes them synchronously, the publisher must block until the data processing finishes. The solution is to have an asynchronous processing at both ends, where the subscriber may keep working on other tasks after requesting items from the publisher. When more items are ready, the publisher sends them to the subscriber asynchronously.

Reactive Streams started in 2013 as an initiative for providing a standard for asynchronous stream processing with non-blocking backpressure. It is aimed at solving the problems of processing a stream of items—how do you pass a stream of items from a publisher to a subscriber without requiring the publisher to block or the subscriber to have an unbounded buffer or drop.

The Reactive Streams model is very simple—the subscriber sends an asynchronous request to the publisher for N items. The publisher sends N or fewer items to the subscriber asynchronously.

Reactive Streams dynamically switches between the pull model and the push model streamprocessing mechanisms. It uses the pull model when the subscriber is slower and uses the push model when the subscriber is faster.

Reactor介绍
webflux与webmvc的类比:

webmvc webflux
controller handler
request mapping router
   <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.1.4.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <version>3.1.4.RELEASE</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>

一,Flue和Mono的简单用法

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;


public class SimpleReactor 
{
    private static void testConstructUsingJust()
    {
        //subscribe方法中的lambda表达式作用在了每一个数据元素上
        Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
        System.out.println();//回车的作用
        Mono.just(1).subscribe(System.out::println);
    }
    
    private static void testConstructFromArray()
    {
        Integer[] array = new Integer[]{1,2,3,4,5,6};
        Flux.fromArray(array).subscribe(x -> {
            System.out.println("收到 "+ x);
            });     
    }
    
    private static void testConstructFromList()
    {
        List<Integer> list = Arrays.asList(1,2,3,4,5,6);
        Flux<Integer> flux = Flux.fromIterable(list);   
        flux.subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("Completed!"));
    }
    
    private static void testConstructFromStream()
    {
        Stream<Integer> stream = Arrays.asList(1,2,3,4,5,6).stream();
        Flux.fromStream(stream).subscribe(System.out::print);
    }
    
    private  static void testMonoError()
    {
        Mono.error(new Exception(" 注意注意,发生异常,注意处理啦")).subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("Completed!")
        );
    }
    
    public static void main( String[] args )
    {
        testConstructUsingJust();
        nextTest();
        testConstructFromArray();
        nextTest();
        testConstructFromList();
        nextTest();
        testMonoError();
        nextTest();
        testConstructFromStream();
    }
    
    private static void nextTest()
    {
        System.out.println("********************************");
    }
}

二、Reactor中如何做單元測試


import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class SimpleReactorTest
{
    private Flux<Integer> generateFluxFrom1To6()
    {
        return Flux.just(1, 2, 3, 4, 5, 6);
    }

    private Mono<Integer> generateMonoWithError()
    {
        return Mono.error(new Exception("some error"));
    }

    @Test
    public void testViaStepVerifier()
    {
        StepVerifier.create(generateFluxFrom1To6()).expectNext(1, 2, 3, 4, 5, 6)
                .expectComplete().verify();
        StepVerifier.create(generateMonoWithError())
                .expectErrorMessage("some error").verify();
    }
}

三、Flux和Mono也支持map、flatMap、Filter、zip等operator

7007629-3b081c176d1befce.png
flatMap示意图

@Test
    public void testMapAndFlatMap()
    {
        // 注意下面的6表示6個,和IntStream的Range方法里面不一样
                StepVerifier.create(Flux.range(1, 6).map(i -> i * i))
                        .expectNext(1, 4, 9, 16, 25, 36).expectComplete();

        StepVerifier
                .create(Flux.just("flux", "mono")
                        .flatMap(s -> Flux.fromArray(s.split("\\s*"))
                                .delayElements(Duration.ofMillis(100)))
                        .doOnNext(System.out::print))
                .expectNextCount(8).verifyComplete();
    }

    @Test
    public void testFilter()
    {
        StepVerifier.create(Flux.range(1, 6).filter(i -> i % 2 == 1) // 1
                .map(i -> i * i)).expectNext(1, 9, 25) // 2
                .verifyComplete();
    }

    private Flux<String> getZipDescFlux()
    {
        String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
        return Flux.fromArray(desc.split("\\s+")); // 1
    }

    @Test
    public void testZip() throws InterruptedException
    {       
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //使用Flux.interval声明一个每200ms发出一个元素的long数据流;因为zip操作是一对一的,故而将其与字符串流zip之后,字符串流也将具有同样的速度;
        Flux.zip(getZipDescFlux(), Flux.interval(Duration.ofMillis(200)))
                .subscribe(
                        t -> System.out.println(t.getT1()), 
                        null,
                        countDownLatch::countDown); // 4
        countDownLatch.await(10, TimeUnit.SECONDS); // 5
    }

有些内容还没有研究完,请接着看 http://blog.51cto.com/liukang/2090191或者https://www.ibm.com/developerworks/cn/java/j-cn-with-reactor-response-encode/index.html

学react前需掌握什么知识 相关内容

2019-01-09 17:12:30 libaineu2004 阅读数 221

什么是Reactor模式

要回答这个问题,首先当然是求助Google或Wikipedia,其中Wikipedia上说:“The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.”。从这个描述中,我们知道Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。如果用图来表达:

从结构上,这有点类似生产者消费者模式,即有一个或多个生产者将事件放入一个Queue中,而一个或多个消费者主动的从这个Queue中Poll事件来处理;而Reactor模式则并没有Queue来做缓冲,每当一个Event输入到Service Handler之后,该Service Handler会主动的根据不同的Event类型将其分发给对应的Request Handler来处理。

讲到高性能IO绕不开Reactor模式,它是大多数IO相关组件如Netty、Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:
while(true){ 
socket = accept(); 
handle(socket) 

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。
之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:
while(true){ 
socket = accept(); 
new thread(socket); 

tomcat服务器的早期版本确实是这样实现的。多线程的方式确实一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。最开始对这句话很不理解,线程中创建多个socket不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。
缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。
线程池本身可以缓解线程创建-销毁的代价,这样优化确实会好很多,不过还是存在一些问题的,就是线程的粒度太大。每一个线程把一次交互的事情全部做了,包括读取和返回,甚至连接,表面上似乎连接不在线程里,但是如果线程不够,有了新的连接,也无法得到处理,所以,目前的方案线程里可以看成要做三件事,连接,读取和写入。
线程同步的粒度太大了,限制了吞吐量。应该把一次连接的操作分为更细的粒度或者过程,这些更细的粒度是更小的线程。整个线程池的数目会翻倍,但是线程更简单,任务更加单一。这其实就是Reactor出现的原因,在Reactor中,这些被拆分的小线程或者子过程对应的是handler,每一种handler会出处理一种event。这里会有一个全局的管理者selector,我们需要把channel注册感兴趣的事件,那么这个selector就会不断在channel上检测是否有该类型的事件发生,如果没有,那么主线程就会被阻塞,否则就会调用相应的事件处理函数即handler来处理。典型的事件有连接,读取和写入,当然我们就需要为这些事件分别提供处理器,每一个处理器可以采用线程的方式实现。一个连接来了,显示被读取线程或者handler处理了,然后再执行写入,那么之前的读取就可以被后面的请求复用,吞吐量就提高了。

【Java】Reactor模式

几乎所有的网络连接都会经过读请求内容——》解码——》计算处理——》编码回复——》回复的过程,Reactor模式的的演化过程如下:

这种模型由于IO在阻塞时会一直等待,因此在用户负载增加时,性能下降的非常快。

server导致阻塞的原因:

1、serversocket的accept方法,阻塞等待client连接,直到client连接成功。

2、线程从socket inputstream读入数据,会进入阻塞状态,直到全部数据读完。

3、线程向socket outputstream写入数据,会阻塞直到全部数据写完。

 

改进:采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。

Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理。

Handler:负责处理非阻塞的行为,标识系统管理的资源;同时将handler与事件绑定。

Reactor为单个线程,需要处理accept连接,同时发送请求到处理器中。

由于只有单个线程,所以处理器中的业务需要能够快速处理完。

改进:使用多线程处理业务逻辑。

将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程。

继续改进:对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。

Using Multiple Reactors

mainReactor负责监听连接,accept连接给subReactor处理,为什么要单独分一个Reactor来处理监听呢?因为像TCP这样需要经过3次握手才能建立连接,这个建立连接的过程也是要耗时间和资源的,单独分一个Reactor来处理,可以提高性能。

 

---

参考资料

https://www.cnblogs.com/dirt2/p/5590320.html

https://www.cnblogs.com/doit8791/p/7461479.html

学react前需掌握什么知识 相关内容

2019-02-16 12:08:58 qq_36191137 阅读数 91

在网上看了很多reactor 模式,每个都是各有千秋,这里我写一下自己对reactor 模式感悟。


1. Reactor模式是什么

反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

2.什么场景下使用Reactor模式?

对于高并发系统,常会使用Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量。

3.Reactor模式简介

Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

4. 多线程IO的致命缺陷

最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

while(true){

socket = accept();

handle(socket)

}

这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

package com.crazymakercircle.iodemo.base;

import com.crazymakercircle.config.SystemConfig;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class BasicModel implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            //创建新线程来handle
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[SystemConfig.INPUT_SIZE];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] input) {
            byte[] output=null;
            /* ... */
            return output;
        }
    }
}

对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

tomcat服务器的早期版本确实是这样实现的。

  • 多线程并发模式,一个连接一个线程的优点是:

一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

  • 多线程并发模式,一个连接一个线程的缺点是:

缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

改进方法是:

采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

学react前需掌握什么知识 相关内容

2017-08-25 19:49:01 Move_now 阅读数 768

Reactor模式(反应堆模式):

这便是libevent的中心思想。在常规的I/O多路复用中采用select和poll、epoll等来实现,而将这些机制封装而成的就是I/O多路复用模式,Reactor就是其中之一。通俗的来讲,它就是通过回调机制实现的。我们只需将事件的接口注册到Reactor上,当事件发生之后,会回调注册的接口。

比如你订闹钟明早六点半起床,那么在六点半之前你就可以安心睡觉,到了六点半,你的闹钟就会准时工作,叫你起床,而不是你一直等待你的闹钟响。订闹钟这个动作可以视为注册事件,而cpu(你)不用一直等它触发,而是可以处理其他任务,等到条件触发了,该事件才会被处理。

Reactor的组成:

Reactor模式需要有几个必要的组件:

  1. event demultiplexer(事件多路分发器):即一些I/O复用机制,如select、poll、epoll等。程序将事件源及事件注册到分发器上,等该事件触发时,就可以进行对应的处理了。拿epoll为例,假如采用的是epoll,那么其内部分别调用的接口其实就是epoll_create还有epoll_ctl之类的,不过做了层封装。
  2. handle(事件源):用于识别每一个事件,在linux上是文件描述符。
  3. reactor(反应器):它用于管理事件的调度以及注册删除事件等。当有激活的事件时,则调用其回调函数处理,如果没有的话,就运行事件主循环。在libevent中,对应event_base
  4. event handler(事件处理器):管理已经注册的事件和已经激活等待调度的事件,并分成了不同类型的事件(如读/写、信号),当事件发生时,会提供对应的处理程序。由Reactor调用。在libevent中,对应struct event

处理步骤

首先初始化Reactor管理器,然后注册事件并将事件的回调函数保存在事件处理器上,等待事件的发生,事件发生之后,便调度事件处理器,最后调用相应的回调函数处理事件。

大致的步骤如下:
1. 初始化一个Reactor管理器
2. 初始化事件处理器,设置事件源及回调函数
3. 将事件处理器注册到Reactor管理器上
4. 注册该事件
5. 进入循环等待事件发生并处理

其他

既然都提到了Reactor模式,那么Proactor模式也可以顺便一起了解下。Proactor模式是一种异步I/O模式,而Reactor模式是一种同步I/O模式,。
异步I/O是指从你发起I/O请求到完成I/O这些过程都是内核帮你完成了,你只用在调用了读/写操作之后,等着系统通知你I/O已经完成了然后你再做后序操作就行了,所以异步I/O是不会导致进程阻塞的。然而同步I/O即当I/O操作可以操作时,再提醒你进行操作。

接下来,我们先从大体上观摩一下libevent的源文件结构。

学react前需掌握什么知识 相关内容

2019-05-03 11:27:42 ndzjx 阅读数 185

 

Proactor和Reactor都是并发编程中的设计模式。他们都是用于派发/分离IO操作事件的。所谓的IO事件也就是诸如read/write的IO操作。"派发/分离"就是将单独的IO事件通知到上层模块。两个模式不同的地方在于,Proactor用于异步IO,而Reactor用于同步IO。

一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器对象可将来自事件源的I/O事件分离出来,并分发到对应的read/write事件处理器(Event Handler)。开发人员预先注册需要处理的事件及其事件处理器(或回调函数);事件分离器负责将请求事件传递给事件处理器。两个与事件分离器有关的模式是Reactor和Proactor。


Reactor反应器模式


Reactor模式又叫反应器或反应堆。平时接触的开源产品如libevent/libev/libuv/ZeroMQ/ACE,事件模型都使用的Reactor模式;
在Reactor中,事件分离器负责等待文件描述符或socket为读写操作准备就绪,然后将就绪事件传递给对应的处理器,最后由处理器负责完成实际的读写工作。Reactor模式已经被广泛使用,著名的开源事件库libevent、libev、libuv都是使用Reactor模式。
Reactor处理耗时长的操作(如文件I/O)会造成事件分发的阻塞,影响到后续事件的处理。因此涉及到文件I/O相关的操作,需要使用异步I/O,即使用Proactor模式效果更佳。

Reactor包含如下角色:“反应”器名字中”反应“的由来;“反应”即“倒置”,“控制逆转”


Proactor前摄器模式


Proactor模式又叫前摄器或主动器模式。它用于实现异步I/O模型
在Proactor模式中,处理器--或者兼任处理器的事件分离器,只负责发起异步读写操作。IO操作本身由操作系统来完成。传递给操作系统的参数需要包括用户定义的数据缓冲区地址和数据大小,操作系统才能从中得到写出操作所需数据,或写入从socket读到的数据。事件分离器捕获IO操作完成事件,然后将事件传递给对应处理器。比如,在windows上,处理器发起一个异步IO操作,再由事件分离器等待IOCompletion事件。典型的异步模式实现,都建立在操作系统支持异步API的基础之上,我们将这种实现称为“系统级”异步或“真”异步,因为应用程序完全依赖操作系统执行真正的IO工作。boost的asio是proactor。


Reactor模式和Proactor模式的主要区别


1. Reactor实现同步I/O多路分发,Proactor实现异步I/O分发。
如果只是处理网络I/O单线程的Reactor尚可处理,但如果涉及到文件I/O,单线程的Reactor可能被文件I/O阻塞而导致其他事件无法被分发。所以涉及到文件I/O最好还是使用Proactor模式,或者用多线程模拟实现异步I/O的方式。


2. Reactor模式注册的是文件描述符的就绪事件,而Proactor模式注册的是完成事件。
即Reactor模式有事件发生的时候要判断是读事件还是写事件,然后用再调用系统调用(read/write等)将数据从内核中拷贝到用户数据区继续其他业务处理。
而Proactor模式一般使用的是操作系统的异步I/O接口,发起异步调用(用户提供数据缓冲区)之后操作系统将在内核态完成I/O并拷贝数据到用户提供的缓冲区中,完成事件到达之后,用户只需要实现自己后续的业务处理即可。
3. 主动和被动
Reactor模式是一种被动的处理,即有事件发生时被动处理。而Proator模式则是主动发起异步调用,然后循环检测完成事件。


观察者模式和Recactor模式,Proactor模式的主要区别


观察者模式,也叫发布-订阅模式,主要是适用于对象间一对多的依赖关系,通常用作消息分发和处理。而Reactor模式和Proactor模式主要用于高效的io模式,明显的特征是“回调”思想的运用,提高效率,避免没有必要的耗时的等待,与对象间的依赖关系无关
---------------------  
作者:ouyangshima  
来源:CSDN  
原文:https://blog.csdn.net/shimazhuge/article/details/5384949  
版权声明:本文为博主原创文章,转载请附上博文链接!

学react前需掌握什么知识 相关内容

没有更多推荐了,返回首页