精华内容
下载资源
问答
  • 主要介绍了Java Reactor反应器模式使用方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • React堆 带webFlux的示例JavaReact器
  • Reactor API 简单使用(Flux和Mono)1. 常用创建Flux及Mono的方式1.1. 使用just从现有的已知内容和大小的数据创建Flux或Mono//使用数组创建一个被观察者(生产者,Flux)Flux.just(new String[]{"hello",",","nihao","!...

    一. Reactor API 简单使用(Flux和Mono)

    1. 常用创建Flux及Mono的方式

    1.1. 使用just从现有的已知内容和大小的数据创建Flux或Mono

    //使用数组创建一个被观察者(生产者,Flux)

    Flux.just(new String[]{"hello",",","nihao","!"})

    //观察者监听被观察者(消费者)

    .subscribe(System.out::print);

    //使用可变参数创建Flux

    Flux.just("你","好","啊","!")

    .subscribe(System.out::print);

    //使用just创建Mo

    Mono.just("asd").subscribe(System.out::println);

    1.2. 使用fromIterable从可迭代对象中创建Flux

    //从可迭代的对象中创建Flux

    Flux.fromIterable(Arrays.asList("你好",",","fromIter","!"))

    .subscribe(System.out::print);

    var list = new ArrayList(List.of("你","好"));

    Flux flux = Flux.fromIterable(list);

    list.add("啊");//在创建Flux后追加元素

    flux.subscribe(System.out::print);//这里输出: 你好啊

    1.3. 使用fromStream从集合流中创建Flux

    //流也可以是Arrays.asList("a", "b").stream()等方式返回的流

    Flux.fromStream(Stream.of("从","流","中创建","Flux!"))

    .subscribe(System.out::println);

    1.4. 使用range中创建一个范围内迭代的Flux

    Flux.range(0,10).subscribe(System.out::print);

    1.5. 使用interval创建间隔某一时间异步执行的Flux

    Flux.interval(Duration.ofMillis(100))

    //限制执行10次

    .take(10)

    .subscribe(System.out::print);

    //避免主线程提前结束

    Thread.sleep(1100);

    1.6. 从Mono转化而来的Flux

    Mono.just("asd").flux().subscribe(System.out::print);

    1.7. 从多个Mono组合而来的Flux

    Mono.just("Mono1").concatWith(Mono.just("---Mono2"))

    .subscribe(System.out::println);

    1.8. 使用generate动态创建Flux只能执行一次的Flux

    // 同步动态创建,next 只能被调用一次

    Flux.generate(sink -> {

    sink.next("第一次");

    //第二次会报错:

    //java.lang.IllegalStateException: More than one call to onNext

    //sink.next("第二次");

    sink.complete();

    }).subscribe(System.out::print);

    1.9. 使用create动态创建Flux可以执行多次的Flux,及Mono

    // 同步动态创建,next 能被调用多次

    Flux.create(sink -> {

    for (int i = 0; i < 10; i++) {

    sink.next("现在的次数:" + i);

    }

    sink.complete();

    }).subscribe(System.out::println);

    // 同步动态创建Mono

    Mono.create(sink->{

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    sink.success("by create");

    }).subscribe(System.out::println);

    1.10. 使用fromCallable动态创建Mono

    Mono.fromCallable(() -> {

    Thread.sleep(1000);

    return "asd";

    }).subscribe(System.out::println);

    2. 异常处理

    //直接创建一个包含异常的Flux

    Flux.error(new Exception());

    //直接创建一个包含异常的Mono

    Mono.error(new Exception());

    Mono.just("mono")

    //连接一个包含异常的Mono

    .concatWith(Mono.error(new Exception("myExc")))

    //异常监听

    .doOnError(error -> System.out.println("错误: "+ error))

    //在发生异常时将其入参传递给订阅者

    .onErrorReturn("-excRet")

    .subscribe(System.out::println);

    /*最终输出:

    mono

    错误: java.lang.Exception: myExc

    -excRet

    */

    3. 常用方法

    3.1. 使用concatWith合并及concatWithValues追加

    //合并多个Mono为一个Flux

    Mono.just("Mono1").concatWith(Mono.just("---Mono2"))

    .subscribe(System.out::print);

    //连接多个Flux

    Flux.just("连接")

    //连接两个Flux

    .concatWith(Flux.just("两个"))

    //将元素追加到Flux

    .concatWithValues("或追加")

    .subscribe(System.out::print);

    3.2. 使用zipWith组合为元素

    // 结合为元祖,两个取其端的那个,长的那个多余的被舍弃

    Flux s1 = Flux.just("s1-0", "s1-1","s1-2");

    Flux s2 = Flux.just("s2-0", "s2-1");

    s1.zipWith(s2)

    .subscribe(tuple -> System.out.println(tuple.getT1() + " -> " + tuple.getT2()));

    3.3. 使用skip跳过元素

    Flux.just(1,2,3,4,5)

    //跳过前2两个

    .skip(2)

    //输出: 345

    .subscribe(System.out::print);

    3.4. 使用take截取元素

    Flux just = Flux.just("截取", "前几个", "元素");

    //截取前两个元素组成新的flux,不改变原flux

    Flux take = just.take(2);

    //输出: 截取前几个

    take.subscribe(System.out::print);

    System.out.println("\n=====");

    //输出: 截取前几个元素

    just.subscribe(System.out::print);

    3.5. 使用filter过滤元素

    Flux.just(1,2,3,4,5,6,7,8,9)

    //过滤偶数

    .filter(i->i%2==0)

    //输出: 2468

    .subscribe(System.out::print);

    3.6. 使用distinct去重元素

    //默认去重

    Flux.just(1,1,2,2,3,3)

    //去重

    .distinct()

    //输出: 123

    .subscribe(System.out::print);

    //将要去重的自定义的类

    class MyClass{

    public int key;

    public String val;

    MyClass(int k, String v){

    key=k;val=v;

    }

    public String toString(){

    return String.format("{%d, %s} ",key,val);

    }

    }

    Flux.just(new MyClass(1,"asd"),new MyClass(1,"asdf"),new MyClass(2,"asd"))

    //自定义对象的比较键(参与比较的字段)

    .distinct(s->s.key)

    //输出: {1, asd} {2, asd}

    .subscribe(System.out::print);

    3.7. 延迟执行(异步)

    Flux.just("这是","延迟","执行")

    //在一秒后输出: 这是延迟执行

    .delayElements(Duration.ofSeconds(1)).subscribe(System.out::print);

    Thread.sleep(1100);

    3.8. 从Flux获取首个元素

    Flux just = Flux.just("这是", "next", "执行");

    //获取第一个元素为Mono,原Flux中的元素不变

    Mono next = just.next();

    //输出: 这是

    next.subscribe(System.out::println);

    System.out.println("=========");

    //输出: 这是next执行

    just.subscribe(System.out::print);

    3.9. 从Flux阻塞式取一个元素

    Flux flux = Flux.create(skin -> {

    for(int i=0;i<2;++i){

    try {

    Thread.sleep(100);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    skin.next("这是第"+i+"个元素");

    }

    skin.complete();

    });

    //flux订阅者所有操作都是无副作用的,即不会改变原flux对象数据

    //阻塞式订阅,只要有一个元素进入Flux

    String first = flux.blockFirst();

    //输出: 这是第0个元素

    System.out.println(first);

    //还是输出: 这是第0个元素

    System.out.println(flux.blockFirst());

    //输出: 这是第1个元素

    System.out.println(flux.blockLast());

    //还是输出: 这是第1个元素

    System.out.println(flux.blockLast());

    3.10. Flux与Mono之间的相互转换

    Flux flux = Flux.just("asd","asd");

    //自定义收集器转换为Mono

    Mono> collect = flux.collect(Collectors.toList());

    //或使用默认收集器转换为Mono

    Mono> collect2 = flux.collectList();

    //将Mono转换为仅有一个元素的Flux

    Flux> flux2 = collect.flux();

    //将只有一个元素的Flux转换为Mono

    Flux.just("1").single().subscribe(System.out::println);

    3.11. 最终始终会执行的函数

    Flux.just("asd", "qwe").concatWith(Flux.error(new Exception()))

    //当流程完成后的第一件事,由于由异常这里不执行

    .doOnComplete(()-> System.out.println("数据组装完成"))

    .doFinally(t-> System.out.println("最后执行的:"+t))

    .subscribe(System.out::println);

    /*最终输出:

    asd

    qwe

    最后执行的:onError

    */

    4. 常用监听

    每次创建监听都会返回一个新的Flux对象,监听也在新的Flux对象

    4.1. 监听每次消费

    Flux flux = Flux.just("asd", "qwe");

    //每次消费前做什么,入参是将要消费的元素

    flux = flux.doOnNext(s -> System.out.println("当前消费:" + s));

    flux.subscribe(System.out::println);

    /*输出为:

    当前消费:asd

    asd

    当前消费:qwe

    qwe

    */

    4.2. 监听流程完成或错误

    //监听正常流程完成

    Flux.just("1","2").doOnComplete(()-> System.out.println("ok"))

    .subscribe(System.out::println);

    /*最终输出:

    1

    2

    ok

    */

    4.3. 监听消费者

    //消费者参与前的最后一件事,入参为消费者对象

    Flux.just("1","2").doOnSubscribe(System.out::println).subscribe(System.out::println);

    /*最终输出:

    reactor.core.publisher.FluxArray$ArraySubscription@66d18979

    1

    2

    */

    5. 背压简单使用

    使用 Subscription::request 主动控制订阅量

    5.1 原始的 Subscriber::onNext

    //生产者每10毫秒生产一个

    Flux.interval(Duration.ofMillis(10))

    //消费者每50毫秒消费一个

    .subscribe(new Subscriber<>() {

    Subscription subscription;

    AtomicInteger count = new AtomicInteger(0);

    @Override

    public void onSubscribe(Subscription subscription) {

    this.subscription = subscription;

    subscription.request(5);//首先请求5个

    count.set(5);

    }

    @Override

    public void onNext(Long val) {

    System.out.print(" val:"+val);

    try {

    //消费者每100毫秒消费一个

    Thread.sleep(100);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    if(count.decrementAndGet()<=0){

    System.out.println(" 搞完,重新请求5个");

    subscription.request(5);

    count.set(5);

    }

    }

    @Override

    public void onError(Throwable throwable) {

    }

    @Override

    public void onComplete() {

    }

    });

    Thread.sleep(5000);

    /*输出:

    val:0 val:1 val:2 val:3 val:4 搞完,重新请求5个

    val:5 val:6 val:7 val:8 val:9 搞完,重新请求5个

    val:10 val:11 val:12 val:13 val:14 搞完,重新请求5个

    val:15 val:16 val:17 val:18 val:19 搞完,重新请求5个

    val:20 val:21 val:22 val:23 val:24 搞完,重新请求5个

    ......

    */

    5.2. 使用 BaseSubscriber 简化操作

    Flux.range(1,5).log().subscribe(new BaseSubscriber<>() {

    private int count = 0;

    private final int limit = 2;

    @Override

    protected void hookOnSubscribe(Subscription subscription) {

    request(limit);

    }

    @Override

    protected void hookOnNext(Integer value) {

    if (++count == limit) {

    try {

    Thread.sleep(1000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    request(count);

    count = 0;

    }

    }

    });

    /*日志输出:

    16:42:39.401 [main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

    16:42:39.404 [main] INFO reactor.Flux.Range.1 - | request(2)

    16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(1)

    16:42:39.404 [main] INFO reactor.Flux.Range.1 - | onNext(2)

    16:42:40.418 [main] INFO reactor.Flux.Range.1 - | request(2)

    16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(3)

    16:42:40.418 [main] INFO reactor.Flux.Range.1 - | onNext(4)

    16:42:41.419 [main] INFO reactor.Flux.Range.1 - | request(2)

    16:42:41.419 [main] INFO reactor.Flux.Range.1 - | onNext(5)

    16:42:41.420 [main] INFO reactor.Flux.Range.1 - | onComplete()

    */

    5.3. 通过 limitRate 进一步简化并链式操作

    Flux.interval(Duration.ofMillis(100)).take(5)

    .log()

    //每次取2个

    .limitRate(2)

    .subscribe();

    Thread.sleep(1000);

    /*日志输出:

    16:46:30.627 [main] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)

    16:46:30.630 [main] INFO reactor.Flux.Take.1 - request(2)

    16:46:30.747 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0)

    16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - onNext(1)

    16:46:30.840 [parallel-1] INFO reactor.Flux.Take.1 - request(2)

    16:46:30.936 [parallel-1] INFO reactor.Flux.Take.1 - onNext(2)

    16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - onNext(3)

    16:46:31.048 [parallel-1] INFO reactor.Flux.Take.1 - request(2)

    16:46:31.143 [parallel-1] INFO reactor.Flux.Take.1 - onNext(4)

    16:46:31.144 [parallel-1] INFO reactor.Flux.Take.1 - onComplete()

    */

    二. Spring中webflux的应用

    1. 传统Controller的方式应用

    后端 JAVA 代码

    package com.example.wefluxdemo.web;

    import org.springframework.web.bind.annotation.GetMapping;

    import org.springframework.web.bind.annotation.PathVariable;

    import org.springframework.web.bind.annotation.RequestMapping;

    import org.springframework.web.bind.annotation.RestController;

    import reactor.core.publisher.Mono;

    import java.util.HashMap;

    @RestController

    @RequestMapping("/hello")

    public class TestHelloController {

    @GetMapping("/1")

    public Mono hello1(){

    return Mono.create(sink -> {

    var map = new HashMap();

    map.put("haha","hehe");

    map.put("wuwu","yingying");

    sink.success(map);

    });

    }

    @GetMapping("/2")

    public Mono hello2(String s){

    return Mono.just(String.format("{\"s\":\"%s\"}",s));

    }

    @GetMapping("/3/{s}")

    public Mono hello3(@PathVariable String s){

    var map = new HashMap();

    map.put("txt",s);

    return Mono.just(map);

    }

    }

    前端 js 调用服务代码

    async function get(url){

    const resp = await fetch(url)

    if(resp.ok){

    console.log(await resp.json())

    }

    }

    //测试请求

    get("/hello/1");//{"haha":"hehe","wuwu":"yingying"}

    get("/hello/2?s=qwe");//{"s":"qwe"}

    get("/hello/3/asd");//{"txt":"asd"}

    2. Router和Handler的方式应用

    后端 JAVA 代码

    package com.example.wefluxdemo.web;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

    import org.springframework.web.reactive.function.server.*;

    import reactor.core.publisher.Mono;

    import java.util.HashMap;

    @Configuration

    public class TestRouterAndHandler {

    //处理方法正常应该和路由方法分在不同的类中

    public Mono test1(ServerRequest request){

    var map = new HashMap();

    map.put("返回值","Mono");

    map.put("形参","约束必须为ServerRequest");

    map.put("获取查询参数s",request.queryParam("s").get());

    //输出: 当前的SessionId:4fa0eea8-3f44-4e85-b501-2c05970876c2

    request.session().subscribe(s-> System.out.println("当前的SessionId:"+s.getId()));

    return ServerResponse.ok().bodyValue(map);

    }

    @Bean

    public RouterFunction test1Router(){

    //路由/test/1的处理方法为test1

    return RouterFunctions.route(RequestPredicates.GET("/test/1"),this::test1);

    }

    }

    前端 js 调用服务代码

    get("/test/1?s=zxc");

    /*控制台输出:

    {

    "返回值": "Mono",

    "获取查询参数s": "zxc",

    "形参": "约束必须为ServerRequest"

    }

    */

    展开全文
  • java reactor与NIO

    2021-02-28 06:36:31
    reactor什么是Reactor模式Reactor 模式是一种事件驱动架构的实现技术The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one...

    reactor

    什么是Reactor模式

    Reactor 模式是一种事件驱动架构的实现技术

    The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

    我们知道Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的 Request Handler。

    1ef1dc37d81c?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

    标准reactor模式

    Handle

    Handle代表操作系统管理的资源,包括:网络链接,打开的文件,计时器,同步对象等等。在我们的web系统中,Handle代表与客户端连接的套接字,Synchronous Event Demultiplexer在这些套接字上等待事件的发生。

    Synchronous Event Demultiplexer

    在一个Handle集合上等待事件的发生。这里常用系统调用select[1],UNIX和WIN32平台都支持这个系统调用。select的返回结果说明handle上发生情况,需要被处理。

    Initiation Dispatcher

    提供接口:注册,删除和派发Event Handler。上面的Synchronous Event Demultiplexer等待事件的发生,当检测到新的事件,就把事件交给Initiation Dispatcher,它去回调Event Handler。事件种类一般有:接受到连接,数据输入,数据输出,超时。

    Event Handler

    定义一个抽象接口,包含一个钩子方法,实现特定服务的派发操作。这个方法实现了与特定应用相关的服务。

    Concrete Event Handler

    继承上面的类,实现钩子方法。应用把Concrete Event Handler注册到Initiation Dispatcher,等待被处理的事件。当事件发生,这些方法被回调。

    Reactor pattern in Java NIO

    我们将会基于java nio I/O 复用模型 实现reactor模式的demo

    Java NIO 是为了弥补传统 I/O 工作模式的不足而研发的,NIO 的工具包提出了基于 Selector(选择器)、Buffer(缓冲区)、Channel(通道)的新模式;Selector、Channel和 SelectionKey(选择键)配合起来使用,可以实现并发的非阻塞型 I/O 能力。

    1ef1dc37d81c?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

    java reactor实现

    让我们将java reactor的实现与标准意义对应起来:

    Selector (demultiplexer)

    Selector is the Java building block, which is analogous to the demultiplexer in the Reactor pattern. Selector is where you register your interest in various I/O events and the objects tell you when those events occur.

    Reactor/initiation dispatcher

    We should use the Java NIO Selector in the Dispatcher/Reactor. For this, we can introduce our own Dispatcher/Reactor implementation called ‘Reactor’. The reactor comprises java.nio.channels.Selector and a map of registered handlers. As per the definition of the Dispatcher/Reactor, ‘Reactor’ will call the Selector.select() while waiting for the IO event to occur.

    Handle

    对应 SelectionKey.

    Event

    不同的IO events 如 SlectionKey.OP_READ , SelectionKey.OP_ACCEPT,SelectionKey.OP_WRITE,SelectionKey.OP_CONNECT

    Handler

    事件处理器,A handler is often implemented as runnable or callable in Java.

    code time

    https://github.com/kasun04/rnd/tree/master/nio-reactor

    在我们的代码里通过一个map来管理我们关心的事件和对应的事件handler。

    当我们通过Selector.select来轮询到来的事件。并遍历Set.

    while (true) { // Loop indefinitely

    demultiplexer.select();// 是阻塞的

    Set readyHandles =

    demultiplexer.selectedKeys();

    Iterator handleIterator =

    readyHandles.iterator();

    while (handleIterator.hasNext()) {

    SelectionKey handle = handleIterator.next();

    if (handle.isAcceptable()) {

    EventHandler handler =

    registeredHandlers.get(SelectionKey.OP_ACCEPT);

    handler.handleEvent(handle);

    // 需要将此次事件key移除,避免重复处理

    handleIterator.remove();

    }

    if (handle.isReadable()) {

    EventHandler handler =

    registeredHandlers.get(SelectionKey.OP_READ);

    handler.handleEvent(handle);

    handleIterator.remove();

    }

    if (handle.isWritable()) {

    EventHandler handler =

    registeredHandlers.get(SelectionKey.OP_WRITE);

    handler.handleEvent(handle);

    handleIterator.remove();

    }

    }

    }

    我们在handleEvent里处理事件,并且注册新的关心的事件,比如在AcceptEventHandler处理完后,注册SelectionKey.OP_READ,将事件抛给下一个handler。并可以在

    public void handleEvent(SelectionKey handle) throws Exception {

    System.out.println("===== Accept Event Handler =====");

    ServerSocketChannel serverSocketChannel =

    (ServerSocketChannel) handle.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();

    if (socketChannel != null) {

    socketChannel.configureBlocking(false);

    socketChannel.register(

    demultiplexer, SelectionKey.OP_READ);

    }

    }

    我们看到上面轮询的代码其实是单线程的每一个handler的handleEvent都是在Reactor这个一个里的,为什么不做成多线程呢?

    NIO由原来的BIO的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。

    我们看read事件的handler,为了提高效率,业务代码开线程,将io处理与业务代码处理分离。

    public void handleEvent(SelectionKey handle) throws Exception {

    System.out.println("===== Read Event Handler =====");

    SocketChannel socketChannel =

    (SocketChannel) handle.channel();

    // 从socket读取数据

    byte[] buffer=read(socketChannel);

    // 此处执行业务操作,最好单线程或者多线程,将io处理与业务代码处理分离

    doBusiness(buffer);

    // Rewind the buffer to start reading from the beginning

    // Register the interest for writable readiness event for

    // this channel in order to echo back the message

    socketChannel.register(

    demultiplexer, SelectionKey.OP_WRITE, inputBuffer);

    }

    BIO与NIO的对比

    美团技术博客--Java NIO浅析

    关键差别:

    BIO accept,read,write 都是阻塞的。

    NIO select 阻塞,read,write是非阻塞的。

    bio 经典模型

    {

    ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池

    ServerSocket serverSocket = new ServerSocket();

    serverSocket.bind(8088);

    while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来

    Socket socket = serverSocket.accept();//阻塞

    //为新的连接创建新的线程

    executor.submit(new ConnectIOnHandler(socket));

    }

    class ConnectIOnHandler extends Thread{

    private Socket socket;

    public ConnectIOnHandler(Socket socket){

    this.socket = socket;

    }

    public void run(){

    while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件

    String someThing = socket.read()....//读取数据

    if(someThing!=null){

    ......//处理数据

    socket.write()....//写数据

    }

    }

    }

    }

    这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里(比如阻塞在read,那么就无法accept到其他请求了);但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。

    NIO reactor模型

    如上设计和分析:

    由于read,write的非阻塞,所以可以不用多线程,并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。

    单线程处理I/O的效率确实非常高,没有线程切换,只是拼命的读、写、选择事件。但现在的服务器,一般都是多核处理器,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。

    限制:

    Java的Selector对于Linux系统来说,有一个致命限制:同一个channel的select不能被并发的调用。因此,如果有多个I/O线程,必须保证:一个socket只能属于一个IoThread,而一个IoThread可以管理多个socket。

    展开全文
  • java-simple-reactor 目的 理解netty中的reactor模型细节 reactor相关的概念 1.什么是阻塞和非阻塞 同步和异步是针对应用程序和内核的交互而言的,同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否...
  • ChannelPool 设计实现解读 回到Connection的设定这节最初,我们主要是通过 ChannelPool 来解决与多个服务端交互以及与单 个服务端建立多个连接的问题。那么这里就来对 ChannelPool 其中的设计与实现进行探索一番。...
  • 1 RingBufferDispather  机制: ... 使用RingBuffer作为任务的存放容器,等待策略使用阻塞模式。... 由于采用单线程执行,因此任务的执行必须快,不能阻塞,否则当大批量的...reactor的消费者有阻塞操作。

    1 RingBufferDispather

       机制:

       使用RingBuffer作为任务的存放容器,等待策略使用阻塞模式。

       使用单线程执行任务。

       使用场景:

             由于采用单线程执行,因此任务的执行必须快,不能阻塞,否则当大批量的任务提交时,必然阻塞提交任务的线程。

            BlockingWaitStrategy策略机制采用ReentrantLock重入锁和显示条件队进行等待。当大量的任务阻塞提交时,必然导致线程的上线文切换频繁,导致过高的cpu。

         

      避免使用场景:

          生产者生产的速度远高于消费者消费的速度。reactor的消费者有阻塞操作。

    展开全文
  • Reactor 3 参考文档.pdf

    2019-12-10 12:58:57
    Reactor 3参考文档,reactor 3是一个围绕Reactive Streams规范构建的库,它在JVM上引入了响应式编程的一个范例。目前Spring5 引入的Webflux就是reactor 3实现的一个响应式web框架
  • 1. 前言最近写关于响应式编程的东西有点多,不少同窗反映对Flux和Mono这两个Reactor中的概念有点懵逼。可是目前Java响应式编程中咱们对这两个对象的接触又最多,诸如Spring WebFlux、RSocket、R2DBC。我开始也对这两...

    dcc4226873bebfac9e4a06cd74eafb0a.png

    1. 前言

    最近写关于响应式编程的东西有点多,不少同窗反映对Flux和Mono这两个Reactor中的概念有点懵逼。可是目前Java响应式编程中咱们对这两个对象的接触又最多,诸如Spring WebFlux、RSocket、R2DBC。我开始也对这两个对象头疼,因此今天咱们就简单来探讨一下它们。html

    2. 响应流的特色

    要搞清楚这两个概念,必须说一下响应流规范。它是响应式编程的基石。他具备如下特色:java

    响应流必须是无阻塞的。

    响应流必须是一个数据流。

    它必须能够异步执行。

    而且它也应该可以处理背压。

    背压是反应流中的一个重要概念,能够理解为,生产者能够感觉到消费者反馈的消费压力,并根据压力进行动态调整生产速率。形象点能够按照下面理解:react

    c809bb55da38d44ee24d904f6f415b24.png

    3. Publisher

    因为响应流的特色,咱们不能再返回一个简单的POJO对象来表示结果了。必须返回一个相似Java中的Future的概念,在有结果可用时通知消费者进行消费响应。编程

    Reactive Stream规范中这种被定义为Publisher ,Publisher是一个能够提供0-N个序列元素的提供者,并根据其订阅者Subscriber super T>的需求推送元素。一个Publisher能够支持多个订阅者,并能够根据订阅者的逻辑进行推送序列元素。下面这个Excel计算就能说明一些Publisher的特色。api

    7666ad4c2dbcd25f4b1176d9a3156788.gif

    A1-A9就能够看作Publisher及其提供的元素序列。A10-A13分别是求和函数SUM(A1:A9)、平均函数AVERAGE(A1:A9)、最大值函数MAX(A1:A9)、最小值函数MIN(A1:A9),能够看做订阅者Subscriber。假如说咱们没有A10-A13,那么A1-A9就没有实际意义,它们并不产生计算。这也是响应式的一个重要特色:当没有订阅时发布者什么也不作。异步

    而Flux和Mono都是Publisher在Reactor 3实现。Publisher提供了subscribe方法,容许消费者在有结果可用时进行消费。若是没有消费者Publisher不会作任何事情,他根据消费状况进行响应。 Publisher可能返回零或者多个,甚至多是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono和Flux。函数

    4. Flux

    Flux 是一个发出(emit)0-N个元素组成的异步序列的Publisher,能够被onComplete信号或者onError信号所终止。在响应流规范中存在三种给下游消费者调用的方法 onNext, onComplete, 和onError。下面这张图表示了Flux的抽象模型:翻译

    dbe2c6bb530984bab31bfc7d03f39d21.png

    以上的的讲解对于初次接触反应式编程的依然是难以理解的,因此这里有一个按部就班的理解过程。3d

    有些类比并非很稳当,可是对于你按部就班的理解这些新概念仍是有帮助的。code

    传统数据处理

    咱们在日常是这么写的:

    public List allUsers() {

    return Arrays.asList(new ClientUser("felord.cn", "reactive"),

    new ClientUser("Felordcn", "Reactor"));

    }

    咱们经过迭代返回值List来get这些元素进行再处理(消费),这种方式有点相似厨师作了不少菜,吃不吃在于食客。须要食客主动去来吃就好了(pull的方式),至于喜欢吃什么不喜欢吃什么本身随意,怎么吃也本身随意。

    流式数据处理

    在Java 8中咱们能够改写为流的表示:

    public Stream allUsers() {

    return Stream.of(new ClientUser("felord.cn", "reactive"),

    new ClientUser("Felordcn", "Reactor"));

    }

    依然是厨师作了不少菜,可是这种就更加高级了一些,提供了菜品的搭配方式(不包含具体细节),食客能够按照说明根据本身的习惯搭配着去吃,一但开始概不退换,吃完为止,过时不候。

    反应式数据处理

    在Reactor中咱们又能够改写为Flux表示:

    public Flux allUsers(){

    return Flux.just(new ClientUser("felord.cn", "reactive"),

    new ClientUser("Felordcn", "Reactor"));

    }

    这时候食客只须要订餐就好了,作好了天然就呈上来,并且能够随时根据食客的饭量进行调整。若是没有食客订餐那么厨师就什么都不用作。固然不止有这么点特性,不过对于方便咱们理解来讲这就够了。

    5. Mono

    Mono 是一个发出(emit)0-1个元素的Publisher,能够被onComplete信号或者onError信号所终止。

    c02409d0c4a4c31d2a5d3673f29c9636.png

    这里就不翻译了,总体和Flux差很少,只不过这里只会发出0-1个元素。也就是说不是有就是没有。象Flux同样,咱们来看看Mono的演化过程以帮助理解。

    传统数据处理

    public ClientUser currentUser () {

    return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;

    }

    直接返回符合条件的对象或者null。

    Optional的处理方式

    public Optional currentUser () {

    return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))

    : Optional.empty();

    }

    这个Optional我以为就有反应式的那种味儿了,固然它并非反应式。当咱们不从返回值Optional取其中具体的对象时,咱们不清楚里面到底有没有,可是Optional是必定客观存在的,不会出现NPE问题。

    反应式数据处理

    public Mono currentUser () {

    return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))

    : Mono.empty();

    }

    和Optional有点相似的机制,固然Mono不是为了解决NPE问题的,它是为了处理响应流中单个值(也多是Void)而存在的。

    6. 总结

    Flux和Mono是Java反应式中的重要概念,可是不少同窗包括我在开始都难以理解它们。这实际上是规定了两种流式范式,这种范式让数据具备一些新的特性,好比基于发布订阅的事件驱动,异步流、背压等等。另外数据是推送(Push)给消费者的以区别于平时咱们的拉(Pull)模式。同时咱们能够像Stream Api同样使用相似map、flatmap等操做符(operator)来操做它们。对Flux和Mono这两个概念须要花一些时间去理解它们,不能操之过急。若是你对个人这种见解有不一样的观点能够留言讨论,多多关注:码农小胖哥 获取更多干货知识。

    关注公众号:Felordcn 获取更多资讯

    展开全文
  • Reactor模式详解原文 前记 第一次听到Reactor模式是三年前的某个晚上,一个室友突然跑过来问我什么是Reactor模式?我上网查了一下,很多人都是给出NIO中的 Selector的例子,而且就是NIO里Selector多路复用模型,...
  • 回顾 Reactor 模式

    2021-03-08 21:32:53
    Reactor作为网络编程库的核心模式的 Reactor 模式是网络编程中的最常用的模式,反应器 Reactor 又名分派器 Dispatcher, 或通知器 Notifier, 重温一下 POSA2 是对这个模式的描述语境An event-driven application that...
  • 1. 前言最近写对于响应式编程的货色有点多,很多同学反映对Flux和Mono这两个Reactor中的概念有点懵逼。然而目前Java响应式编程中咱们对这两个对象的接触又最多,诸如Spring WebFlux、RSocket、R2DBC。我开始也对这两...
  • Java中的Reactor是什么

    2021-02-28 07:51:51
    Java中的Reactor是什么发布时间:2020-06-28 10:14:43来源:亿速云阅读:212作者:Leah今天就跟大家聊聊有关Java中的Reactor,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这...
  • 1.Reactor模式介绍Reactor模式是事件驱动模型,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。...
  • JavaNIO与Reactor模式

    2021-02-12 23:06:36
    一、NIO的简单介绍Java NIO(New IO)是一个可以替代标准Java IO API的IO API(从Java 1.4开始),Java NIO提供了与标准IO不同的IO工作方式。NIO中的核心内容有Channel、Buffer、Selector,其他组件如Pipe和FileLock只...
  • Reactor 3 参考文档

    2019-12-11 14:39:24
    Reactor 3 参考文档
  • 以前用C写的evnet,类似于libev的NIO Reactor框架,主要用于Android、IOS的游戏网络模块。随着它的完善,也开始用于服务端的服务编写。对于一些对资源要求苛刻的服务,用它来写,非常的轻量级,并且已经比较稳定了。...
  • Java高并发教程:Reactor反应器模式Reactor反应器模式到目前为止,高性能网络编程都绕不开反应器模式。很多著名的服务器软件或者中间件都是基于反应器模式实现的,如Nginx、Redis、Netty。反应器模式是高性能网络...
  • 标签:Reactor 模式简单实现在网上有部分文章在描述Netty时,会提到Reactor。这个Reactor到底是什么呢?为了搞清楚Reactor到底是什么鬼,我写了一个简单的Demo,来帮助大家理解他。网上是这么描述Reactor的:The ...
  • 单线程的reactor模式并没有解决IO和CPU处理速度不匹配问题,所以多线程的reactor模式引入线程池的概念,把耗时的IO操作交由线程池处理,处理完了之后再同步到selectionkey中,服务器架构图如下上文(reactor模式:单...
  • reactor设计模式

    2021-03-08 21:32:00
    reactor介绍reactor的工作模式就像它的名字一样,是一种反射模式,当事件发生时,根据发生的事件调用注册的处理器。Reactor的优点和应用Reactor最常用于非阻塞的socket传统的设计是一种同步的停等协议,读写操作执行...
  • Java的高性能IO——Reactor模式

    千次阅读 2018-12-29 14:50:08
    Reactor是一种处理客户端和服务端网络通信的IO模式。在Netty中被使用。 传统Java的网络通信模式: (1)BIO(同步阻塞IO):一个acceptor线程负责监听客户端的连接,一个请求一个应答,缺乏弹性伸缩能力。 (2)伪...
  • 1. Reactor简介Reactor 是 Spring 社区发布的基于事件驱动的异步框架,不仅解耦了程序之间的强调用关系,而且有效提升了系统的多线程并发处理能力。2. Spring Boot集成Reactor的pom.xml1 2 3 xsi:schemaLocation=...
  • JAVA IO模型演进及Reactor模式

    千次阅读 2018-06-19 18:39:21
    a、采用多个Reactor,每个Reactor在自己单独线程中执行,可以并行响应多个客户端的请求事件; b、Netty采用类似这种模式,boss线程池就是多个mainReactor,worker线程池就是多个subReactor。 注:以上内容参照...
  • java SpringBoot Reactor整合

    千次阅读 2020-06-20 15:33:28
    SpringBoot——Spring Reactor Web https://blog.csdn.net/No_Game_No_Life_/article/details/101450810 为了应对高并发的服务器端开发,在2009年的时候,微软提出了一个更优雅地实现异步编程的方式 —— R...
  • spring reactor 事件异步驱动. 使用了spring的mvc 使用了spring的 @Configuration annotation配置没有写xml文件.步骤1:声明 Reactor bean@Bean(name="reactor")public Reactor getReactor(){return new Reactor();}2...
  • 尽管 Reactive Streams 规范并未指定任何运算符(Operators),但 Reactor 的核心价值之一就是提供了丰富的运算符。从简单的转换、过滤到复杂的编排和错误处理,涉及方方面面。 推荐通过参考文档而不是 JavaDoc 来...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,450
精华内容 12,580
关键字:

javareactor

java 订阅