精华内容
下载资源
问答
  • reactor

    2020-09-02 15:43:32
    在学习muduo之前,有必要先搞清楚reactor模式的含义。我觉得有一篇文章写的特别好,我就不复制粘贴了。Reactor模式详解。这边文章过后,我觉得基本能够理解reactor的基本流程了,以及工作线程池和Reactor线程池的...

    在学习muduo之前,有必要先搞清楚reactor模式的含义。我觉得有一篇文章写的特别好,我就不复制粘贴了。Reactor模式详解。这边文章过后,我觉得基本能够理解reactor的基本流程了,以及工作线程池和Reactor线程池的作用了,有助于接下来理解muduo源码。

    展开全文
  • Reactor

    2020-01-15 18:13:32
    Reactor模式 疯狂创客圈,一个Java 高并发研习社群【博客园 总入口】 疯狂创客圈,倾力推出:面试必备 + 面试必备 + 面试必备的基础原理+实战 书籍 《Netty Zookeeper Redis 高并发实战》 写在前面 ​大家...

    Reactor模式

     

    疯狂创客圈,一个Java 高并发研习社群 【博客园 总入口 】

    疯狂创客圈,倾力推出:面试必备 + 面试必备 + 面试必备 的基础原理+实战 书籍 《Netty Zookeeper Redis 高并发实战

     


    写在前面 

     

     大家好,我是 高并发的实战社群【疯狂创客圈】尼恩。Reactor模式非常重要,无论开发、还是面试。

    本文的内容,在《Netty Zookeeper Redis 高并发实战》一书时,进行内容的完善和更新,并且进行的源码的升级。 博客和书不一样,书更加层层升入、层次分明,请大家以书的内容为准。 具体请参考书的第四章 —— 鼎鼎大名的Reactor反应器模式 。

     

    基础篇:netty源码  死磕3-

    传说中神一样的Reactor反应器模式

    本文目录

    1. 为什么是Reactor模式
    2. Reactor模式简介
    3. 多线程IO的致命缺陷
    4. 单线程Reactor模型
    4.1. 什么是单线程Reactor呢?
    4.2. 单线程Reactor的参考代码
    4.3. 单线程模式的缺点:
    5. 多线程的Reactor
    5.1. 基于线程池的改进
    5.2. 改进后的完整示意图
    5.3. 多线程Reactor的参考代码
    6. Reactor持续改进
    7. Reactor编程的优点和缺点
    7.1. 优点
    7.2. 缺点

     

    1. 为什么是Reactor模式

    写多了代码的兄弟们都知道,JAVA代码由于到处面向接口及高度抽象,用到继承多态和设计模式,程序的组织不是按照正常的理解顺序来的,对代码跟踪很是个问题。所以,在阅读别人的源码时,如果不了解代码的组织方式,往往是晕头转向,不知在何处。尤其是阅读经典代码的时候,更是如此。

    反过来,如果先了解代码的设计模式,再来去代码,就会阅读的很轻松,不会那么难懂。

    像netty这样的精品中的极品,肯定也是需要先从设计模式入手的。netty的整体架构,基于了一个著名的模式——Reactor模式。Reactor模式,是高性能网络编程的必知必会模式。

    首先熟悉Reactor模式,一定是磨刀不误砍柴工。

    2. Reactor模式简介

    Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,本文站在巨人的肩膀上,借助 Doug Lea(就是那位让人无限景仰的大爷)的“Scalable IO in Java”中讲述的Reactor模式。

    “Scalable IO in Java”的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

    Reactor模式也叫反应器模式,大多数IO相关组件如Netty、Redis在使用的IO模式,为什么需要这种模式,它是如何设计来解决高性能并发的呢?

    3. 多线程IO的致命缺陷

    最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:

    复制代码

    while(true){
    
    socket = accept();
    
    handle(socket)
    
    }
    

    复制代码

    这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。

    之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:

    复制代码

    package com.crazymakercircle.iodemo.base;
    
    import com.crazymakercircle.config.SystemConfig;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    class BasicModel implements Runnable {
        public void run() {
            try {
                ServerSocket ss =
                        new ServerSocket(SystemConfig.SOCKET_SERVER_PORT);
                while (!Thread.interrupted())
                    new Thread(new Handler(ss.accept())).start();
                //创建新线程来handle
                // or, single-threaded, or a thread pool
            } catch (IOException ex) { /* ... */ }
        }
    
        static class Handler implements Runnable {
            final Socket socket;
            Handler(Socket s) { socket = s; }
            public void run() {
                try {
                    byte[] input = new byte[SystemConfig.INPUT_SIZE];
                    socket.getInputStream().read(input);
                    byte[] output = process(input);
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /* ... */ }
            }
            private byte[] process(byte[] input) {
                byte[] output=null;
                /* ... */
                return output;
            }
        }
    }

    复制代码

    对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。

    tomcat服务器的早期版本确实是这样实现的。

    多线程并发模式,一个连接一个线程的优点是:

    一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。

    多线程并发模式,一个连接一个线程的缺点是:

    缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。

    改进方法是:

    采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。

    4. 单线程Reactor模型

     

    Reactor模型的朴素原型

    Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。

    复制代码

     static class Server
        {
    
            public static void testServer() throws IOException
            {
    
                // 1、获取Selector选择器
                Selector selector = Selector.open();
    
                // 2、获取通道
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                // 3.设置为非阻塞
                serverSocketChannel.configureBlocking(false);
                // 4、绑定连接
                serverSocketChannel.bind(new InetSocketAddress(SystemConfig.SOCKET_SERVER_PORT));
    
                // 5、将通道注册到选择器上,并注册的操作为:“接收”操作
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                // 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
                while (selector.select() > 0)
                {
                    // 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
                    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                    while (selectedKeys.hasNext())
                    {
                        // 8、获取“准备就绪”的时间
                        SelectionKey selectedKey = selectedKeys.next();
    
                        // 9、判断key是具体的什么事件
                        if (selectedKey.isAcceptable())
                        {
                            // 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            // 11、切换为非阻塞模式
                            socketChannel.configureBlocking(false);
                            // 12、将该通道注册到selector选择器上
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        else if (selectedKey.isReadable())
                        {
                            // 13、获取该选择器上的“读就绪”状态的通道
                            SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
    
                            // 14、读取数据
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int length = 0;
                            while ((length = socketChannel.read(byteBuffer)) != -1)
                            {
                                byteBuffer.flip();
                                System.out.println(new String(byteBuffer.array(), 0, length));
                                byteBuffer.clear();
                            }
                            socketChannel.close();
                        }
    
                        // 15、移除选择键
                        selectedKeys.remove();
                    }
                }
    
                // 7、关闭连接
                serverSocketChannel.close();
            }
    
            public static void main(String[] args) throws IOException
            {
                testServer();
            }
        }

    复制代码

     

    实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:

    (1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。

    (2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。

     

    4.1. 什么是单线程Reactor呢?

     

    如下图所示:

    wpsC334.tmp

    这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。

    下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。

    wpsC345.tmp

    顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

     

    4.2. 单线程Reactor的参考代码

    “Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

     

    复制代码

    package com.crazymakercircle.ReactorModel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    class Reactor implements Runnable
    {
        final Selector selector;
        final ServerSocketChannel serverSocket;
    
        Reactor(int port) throws IOException
        { //Reactor初始化
            selector = Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverSocket.configureBlocking(false);
    
            //分步处理,第一步,接收accept事件
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            sk.attach(new Acceptor());
        }
    
        public void run()
        {
            try
            {
                while (!Thread.interrupted())
                {
                    selector.select();
                    Set selected = selector.selectedKeys();
                    Iterator it = selected.iterator();
                    while (it.hasNext())
                    {
                        //Reactor负责dispatch收到的事件
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    
        void dispatch(SelectionKey k)
        {
            Runnable r = (Runnable) (k.attachment());
            //调用之前注册的callback对象
            if (r != null)
            {
                r.run();
            }
        }
    
        // inner class
        class Acceptor implements Runnable
        {
            public void run()
            {
                try
                {
                    SocketChannel channel = serverSocket.accept();
                    if (channel != null)
                        new Handler(selector, channel);
                } catch (IOException ex)
                { /* ... */ }
            }
        }
    }
    

    复制代码

     

    Handler的代码如下:

     

    复制代码

    package com.crazymakercircle.ReactorModel;
    
    
    import com.crazymakercircle.config.SystemConfig;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    class Handler implements Runnable
    {
        final SocketChannel channel;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
        ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
        Handler(Selector selector, SocketChannel c) throws IOException
        {
            channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            sk = channel.register(selector, 0);
    
            //将Handler作为callback对象
            sk.attach(this);
    
            //第二步,注册Read就绪事件
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        boolean inputIsComplete()
        {
            /* ... */
            return false;
        }
    
        boolean outputIsComplete()
        {
    
            /* ... */
            return false;
        }
    
        void process()
        {
            /* ... */
            return;
        }
    
        public void run()
        {
            try
            {
                if (state == READING)
                {
                    read();
                }
                else if (state == SENDING)
                {
                    send();
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    
        void read() throws IOException
        {
            channel.read(input);
            if (inputIsComplete())
            {
    
                process();
    
                state = SENDING;
                // Normally also do first write now
    
                //第三步,接收write就绪事件
                sk.interestOps(SelectionKey.OP_WRITE);
            }
        }
    
        void send() throws IOException
        {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete())
            {
                sk.cancel();
            }
        }
    }
    
    

    复制代码

     

    这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。

    如果对NIO的Seletor不完全了解,影响到上面的代码阅读,请阅读疯狂创客圈的Java NIO死磕 文章。

     

    4.3. 单线程模式的缺点:

    1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。

    2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

     

    5. 多线程的Reactor

    5.1. 基于线程池的改进

    在线程Reactor模式基础上,做如下改进:

    (1)将Handler处理器的执行放入线程池,多线程进行业务处理。

    (2)而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

    一个简单的图如下:

    image

    5.2. 改进后的完整示意图

    下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

    wpsC376.tmp

     

    5.3. 多线程Reactor的参考代码

    “Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

     

    复制代码

    package com.crazymakercircle.ReactorModel;
    
    
    import com.crazymakercircle.config.SystemConfig;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    class MthreadHandler implements Runnable
    {
        final SocketChannel channel;
        final SelectionKey selectionKey;
        ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE);
        ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
    
        ExecutorService pool = Executors.newFixedThreadPool(2);
        static final int PROCESSING = 3;
    
        MthreadHandler(Selector selector, SocketChannel c) throws IOException
        {
            channel = c;
            c.configureBlocking(false);
            // Optionally try first read now
            selectionKey = channel.register(selector, 0);
    
            //将Handler作为callback对象
            selectionKey.attach(this);
    
            //第二步,注册Read就绪事件
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        boolean inputIsComplete()
        {
           /* ... */
            return false;
        }
    
        boolean outputIsComplete()
        {
    
           /* ... */
            return false;
        }
    
        void process()
        {
           /* ... */
            return;
        }
    
        public void run()
        {
            try
            {
                if (state == READING)
                {
                    read();
                }
                else if (state == SENDING)
                {
                    send();
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    
    
        synchronized void read() throws IOException
        {
            // ...
            channel.read(input);
            if (inputIsComplete())
            {
                state = PROCESSING;
                //使用线程pool异步执行
                pool.execute(new Processer());
            }
        }
    
        void send() throws IOException
        {
            channel.write(output);
    
            //write完就结束了, 关闭select key
            if (outputIsComplete())
            {
                selectionKey.cancel();
            }
        }
    
        synchronized void processAndHandOff()
        {
            process();
            state = SENDING;
            // or rebind attachment
            //process完,开始等待write就绪
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    
        class Processer implements Runnable
        {
            public void run()
            {
                processAndHandOff();
            }
        }
    
    }
    
    

    复制代码

     

    Reactor 类没有大的变化,参考前面的代码。

    6. Reactor持续改进

    对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

     

    复制代码

    package com.crazymakercircle.ReactorModel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    class MthreadReactor implements Runnable
    {
    
        //subReactors集合, 一个selector代表一个subReactor
        Selector[] selectors=new Selector[2];
        int next = 0;
        final ServerSocketChannel serverSocket;
    
        MthreadReactor(int port) throws IOException
        { //Reactor初始化
            selectors[0]=Selector.open();
            selectors[1]= Selector.open();
            serverSocket = ServerSocketChannel.open();
            serverSocket.socket().bind(new InetSocketAddress(port));
            //非阻塞
            serverSocket.configureBlocking(false);
    
    
            //分步处理,第一步,接收accept事件
            SelectionKey sk =
                    serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            sk.attach(new Acceptor());
        }
    
        public void run()
        {
            try
            {
                while (!Thread.interrupted())
                {
                    for (int i = 0; i <2 ; i++)
                    {
                        selectors[i].select();
                        Set selected =  selectors[i].selectedKeys();
                        Iterator it = selected.iterator();
                        while (it.hasNext())
                        {
                            //Reactor负责dispatch收到的事件
                            dispatch((SelectionKey) (it.next()));
                        }
                        selected.clear();
                    }
    
                }
            } catch (IOException ex)
            { /* ... */ }
        }
    
        void dispatch(SelectionKey k)
        {
            Runnable r = (Runnable) (k.attachment());
            //调用之前注册的callback对象
            if (r != null)
            {
                r.run();
            }
        }
    
    
        class Acceptor { // ...
            public synchronized void run() throws IOException
            {
                SocketChannel connection =
                        serverSocket.accept(); //主selector负责accept
                if (connection != null)
                {
                    new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection
                }
                if (++next == selectors.length) next = 0;
            }
        }
    }
    

    复制代码

     

    7. Reactor编程的优点和缺点

    6.1. 优点

    1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;

    2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;

    3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;

    4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

    6.2. 缺点

    1)相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。

    2)Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。

    3) Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。

     

    在开启Netty源码前,上面的经典代码,一定要看懂哦!

     

     

    展开全文
  • Reactor详解

    万次阅读 多人点赞 2019-05-06 18:27:32
    reactor 是什么 为何要用,能解决什么问题 如何用,更好的方式 其他事件处理模式 一、Reactor 是什么 关于reactor 是什么,我们先从wiki上看下: The reactor design pattern is an event ...
    1. reactor 是什么

    2. 为何要用,能解决什么问题

    3. 如何用,更好的方式

    4. 其他事件处理模式

    一、Reactor 是什么

    关于reactor 是什么,我们先从wiki上看下:

    The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

    从上述文字中我们可以看出以下关键点 :

    1. 事件驱动(event handling)

    2. 可以处理一个或多个输入源(one or more inputs)

    3. 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理

    自POSA2 中的关于Reactor Pattern 介绍中,我们了解了Reactor 的处理方式:

    1. 同步的等待多个事件源到达(采用select()实现)

    2. 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)

    3. 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)

     

    关于Reactor Pattern 的OMT 类图设计:

     

    二、为何要用Reactor

    常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。

    1、BIO

    比如我们采用BIO的方式来维护和客户端的连接:

    // 主线程维护连接
      public void run() {
          try {
              while (true) {
                  Socket socket = serverSocket.accept();
                  //提交线程池处理
                  executorService.submit(new Handler(socket));
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
    ​
      // 处理读写服务
      class Handler implements Runnable {
          public void run() {
              try {
                  //获取Socket的输入流,接收数据
                  BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                  String readData = buf.readLine();
                  while (readData != null) {
                      readData = buf.readLine();
                      System.out.println(readData);
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }

     

    很明显,为了避免资源耗尽,我们采用线程池的方式来处理读写服务。但是这么做依然有很明显的弊端:

    1. 同步阻塞IO,读写阻塞,线程等待时间过长

    2. 在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。

    3. 多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展

    4. 状态数据以及其他需要保持一致的数据,需要采用并发同步控制

    2、NIO

    那么可以有其他方式来更好的处理么,我们可以采用NIO来处理,NIO中支持的基本机制:

    1. 非阻塞的IO读写

    2. 基于IO事件进行分发任务,同时支持对多个fd的监听

    我们看下NIO 中实现相关方式:

    public NIOServer(int port) throws Exception {
          selector = Selector.open();
          serverSocket = ServerSocketChannel.open();
          serverSocket.socket().bind(new InetSocketAddress(port));
          serverSocket.configureBlocking(false);
          serverSocket.register(selector, SelectionKey.OP_ACCEPT);
      }
    ​
      @Override
      public void run() {
          while (!Thread.interrupted()) {
              try {
                  //阻塞等待事件
                  selector.select();
                  // 事件列表
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分发事件
                      dispatch((SelectionKey) (it.next()));
                  }
              } catch (Exception e) {
    ​
              }
          }
      }
    ​
      private void dispatch(SelectionKey key) throws Exception {
          if (key.isAcceptable()) {
              register(key);//新链接建立,注册
          } else if (key.isReadable()) {
              read(key);//读事件处理
          } else if (key.isWritable()) {
              wirete(key);//写事件处理
          }
      }
    ​
      private void register(SelectionKey key) throws Exception {
          ServerSocketChannel server = (ServerSocketChannel) key
                  .channel();
          // 获得和客户端连接的通道
          SocketChannel channel = server.accept();
          channel.configureBlocking(false);
          //客户端通道注册到selector 上
          channel.register(this.selector, SelectionKey.OP_READ);
      }

     

    我们可以看到上述的NIO例子已经差不多拥有reactor的影子了

    1. 基于事件驱动-> selector(支持对多个socketChannel的监听)

    2. 统一的事件分派中心-> dispatch

    3. 事件处理服务-> read & write

     

    事实上NIO已经解决了上述BIO暴露的1&2问题了,服务器的并发客户端有了量的提升,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端(selector 支持对多个socketChannel 监听)。

    但这依然不是一个完善的Reactor Pattern ,首先Reactor 是一种设计模式,好的模式应该是支持更好的扩展性,显然以上的并不支持,另外好的Reactor Pattern 必须有以下特点:

    1. 更少的资源利用,通常不需要一个客户端一个线程

    2. 更少的开销,更少的上下文切换以及locking

    3. 能够跟踪服务器状态

    4. 能够管理handler 对event的绑定

    那么好的Reactor Pattern应该是怎样的?

    三、Reactor

    在应用Java NIO构建Reactor Pattern中,大神 Doug Lea(让人无限景仰的java 大神)在“Scalable IO in Java”中给了很好的阐述。我们采用大神介绍的3种Reactor 来分别介绍。

    首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

    • Reactor 将I/O事件分派给对应的Handler

    • Acceptor 处理客户端新连接,并分派请求到处理器链中

    • Handlers 执行非阻塞读/写 任务

     

    1、单Reactor单线程模型

    我们看代码的实现方式:

      /**
        * 等待事件到来,分发事件处理
        */
      class Reactor implements Runnable {
    ​
          private Reactor() throws Exception {
    ​
              SelectionKey sk =
                      serverSocket.register(selector,
                              SelectionKey.OP_ACCEPT);
              // attach Acceptor 处理新连接
              sk.attach(new Acceptor());
          }
    ​
          public void run() {
              try {
                  while (!Thread.interrupted()) {
                      selector.select();
                      Set selected = selector.selectedKeys();
                      Iterator it = selected.iterator();
                      while (it.hasNext()) {
                          it.remove();
                          //分发事件处理
                          dispatch((SelectionKey) (it.next()));
                      }
                  }
              } catch (IOException ex) {
                  //do something
              }
          }
    ​
          void dispatch(SelectionKey k) {
              // 若是连接事件获取是acceptor
              // 若是IO读写事件获取是handler
              Runnable runnable = (Runnable) (k.attachment());
              if (runnable != null) {
                  runnable.run();
              }
          }
    ​
      }
      
      /**
        * 连接事件就绪,处理连接事件
        */
      class Acceptor implements Runnable {
          @Override
          public void run() {
              try {
                  SocketChannel c = serverSocket.accept();
                  if (c != null) {// 注册读写
                      new Handler(c, selector);
                  }
              } catch (Exception e) {
    ​
              }
          }
      }

    这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。

    Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于racotor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。

    该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。

     

    2、单Reactor多线程模型

    相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

    我们看下实现方式:

      /**
        * 多线程处理读写业务逻辑
        */
      class MultiThreadHandler implements Runnable {
          public static final int READING = 0, WRITING = 1;
          int state;
          final SocketChannel socket;
          final SelectionKey sk;
    ​
          //多线程处理业务逻辑
          ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ​
    ​
          public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
              this.state = READING;
              this.socket = socket;
              sk = socket.register(selector, SelectionKey.OP_READ);
              sk.attach(this);
              socket.configureBlocking(false);
          }
    ​
          @Override
          public void run() {
              if (state == READING) {
                  read();
              } else if (state == WRITING) {
                  write();
              }
          }
    ​
          private void read() {
              //任务异步处理
              executorService.submit(() -> process());
    ​
              //下一步处理写事件
              sk.interestOps(SelectionKey.OP_WRITE);
              this.state = WRITING;
          }
    ​
          private void write() {
              //任务异步处理
              executorService.submit(() -> process());
    ​
              //下一步处理读事件
              sk.interestOps(SelectionKey.OP_READ);
              this.state = READING;
          }
    ​
          /**
            * task 业务处理
            */
          public void process() {
              //do IO ,task,queue something
          }
      }

     

    3、多Reactor多线程模型

     

    第三种模型比起第二种模型,是将Reactor分成两部分,

    1. mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。

    2. subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。

     

    我们看下实现方式:

      /**
        * 多work 连接事件Acceptor,处理连接事件
        */
      class MultiWorkThreadAcceptor implements Runnable {
    ​
          // cpu线程数相同多work线程
          int workCount =Runtime.getRuntime().availableProcessors();
          SubReactor[] workThreadHandlers = new SubReactor[workCount];
          volatile int nextHandler = 0;
    ​
          public MultiWorkThreadAcceptor() {
              this.init();
          }
    ​
          public void init() {
              nextHandler = 0;
              for (int i = 0; i < workThreadHandlers.length; i++) {
                  try {
                      workThreadHandlers[i] = new SubReactor();
                  } catch (Exception e) {
                  }
    ​
              }
          }
    ​
          @Override
          public void run() {
              try {
                  SocketChannel c = serverSocket.accept();
                  if (c != null) {// 注册读写
                      synchronized (c) {
                          // 顺序获取SubReactor,然后注册channel 
                          SubReactor work = workThreadHandlers[nextHandler];
                          work.registerChannel(c);
                          nextHandler++;
                          if (nextHandler >= workThreadHandlers.length) {
                              nextHandler = 0;
                          }
                      }
                  }
              } catch (Exception e) {
              }
          }
      }
     
      /**
        * 多work线程处理读写业务逻辑
        */
      class SubReactor implements Runnable {
          final Selector mySelector;
    ​
          //多线程处理业务逻辑
          int workCount =Runtime.getRuntime().availableProcessors();
          ExecutorService executorService = Executors.newFixedThreadPool(workCount);
    ​
    ​
          public SubReactor() throws Exception {
              // 每个SubReactor 一个selector 
              this.mySelector = SelectorProvider.provider().openSelector();
          }
    ​
          /**
            * 注册chanel
            *
            * @param sc
            * @throws Exception
            */
          public void registerChannel(SocketChannel sc) throws Exception {
              sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
          }
    ​
          @Override
          public void run() {
              while (true) {
                  try {
                  //每个SubReactor 自己做事件分派处理读写事件
                      selector.select();
                      Set<SelectionKey> keys = selector.selectedKeys();
                      Iterator<SelectionKey> iterator = keys.iterator();
                      while (iterator.hasNext()) {
                          SelectionKey key = iterator.next();
                          iterator.remove();
                          if (key.isReadable()) {
                              read();
                          } else if (key.isWritable()) {
                              write();
                          }
                      }
    ​
                  } catch (Exception e) {
    ​
                  }
              }
          }
    ​
          private void read() {
              //任务异步处理
              executorService.submit(() -> process());
          }
    ​
          private void write() {
              //任务异步处理
              executorService.submit(() -> process());
          }
    ​
          /**
            * task 业务处理
            */
          public void process() {
              //do IO ,task,queue something
          }
      }
    ​

    第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。

    此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。

    关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。

     

    四、事件处理模式

    在 Douglas Schmidt 的大作《POSA2》中有关于事件处理模式的介绍,其中有四种事件处理模式:

    1. Reactor  

    2. Proactor  

    3. Asynchronous Completion Token  

    4. Acceptor-Connector  

    1. Proactor

    本文介绍的Reactor就是其中一种,而Proactor的整体结构和reacotor的处理方式大同小异,不同的是Proactor采用的是异步非阻塞IO的方式实现,对数据的读写由异步处理,无需用户线程来处理,服务程序更专注于业务事件的处理,而非IO阻塞。

    2. Asynchronous Completion Token

    简单来说,ACT就是应对应用程序异步调用服务操作,并处理相应的服务完成事件。从token这个字面意思,我们大概就能了解到,它是一种状态的保持和传递。

    比如,通常应用程序会有调用第三方服务的需求,一般是业务线程请求都到,需要第三方资源的时候,去同步的发起第三方请求,而为了提升应用性能,需要异步的方式发起请求,但异步请求的话,等数据到达之后,此时的我方应用程序的语境以及上下文信息已经发生了变化,你没办法去处理。

    ACT 解决的就是这个问题,采用了一个token的方式记录异步发送前的信息,发送给接受方,接受方回复的时候再带上这个token,此时就能恢复业务的调用场景。

     

    上图中我们可以看到在client processing 这个阶段,客户端是可以继续处理其他业务逻辑的,不是阻塞状态。service 返回期间会带上token信息。  

    3. Acceptor-Connector

    Acceptor-Connector是于Reactor的结合,也可以看成是一种变种,它看起来很像上面介绍的Reactor第三种实现方式,但又有本质的不同。

    Acceptor-Connector模式是将网络中对等服务的连接和初始化分开处理,使系统中的连接建立及服务一旦服务初始化后就分开解除耦合。连接器主动地建立到远地接受器组件的连接,并初始化服务处理器来处理在连接上交换的数据。同样地,接受器被动地等待来自远地连接器的连接请求,在这样的请求到达时建立连接,并初始化服务处理器来处理在连接上交换的数据。随后已初始化的服务处理器执行应用特有的处理,并通过连接器和接受器组件建立的连接来进行通信。

    这么处理的好处是:

    1. 一般而言,用于连接建立和服务初始化的策略变动的频度要远小于应用服务实现和通信协议。

    2. 容易增加新类型的服务、新的服务实现和新的通信协议,而又不影响现有的连接建立和服务初始化软件。比如采用IPX/SPX通信协议或者TCP协议。

    3. 连接角色和通信角色的去耦合,连接角色只管发起连接 vs. 接受连接。通信角色只管数据交互。

    4. 将程序员与低级网络编程API(像socket或TLI)类型安全性的缺乏屏蔽开来。业务开发关系底层通信

    转载:https://my.oschina.net/u/1859679/blog/1844109 

    展开全文
  • Reactor:深入理解reactor core

    万次阅读 2020-11-09 09:26:26
    上篇文章我们简单的介绍了Reactor的发展史和基本的Flux和Mono的使用,本文将会进一步挖掘Reactor的高级用法,一起来看看吧。

    简介

    上篇文章我们简单的介绍了Reactor的发展史和基本的Flux和Mono的使用,本文将会进一步挖掘Reactor的高级用法,一起来看看吧。

    自定义Subscriber

    之前的文章我们提到了4个Flux的subscribe的方法:

    Disposable subscribe(); 
    
    Disposable subscribe(Consumer<? super T> consumer); 
    
    Disposable subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer); 
    
    Disposable subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer,
              Runnable completeConsumer); 
    
    Disposable subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer,
              Runnable completeConsumer,
              Consumer<? super Subscription> subscriptionConsumer);
    

    这四个方法,需要我们使用lambda表达式来自定义consumer,errorConsumer,completeSonsumer和subscriptionConsumer这四个Consumer。

    写起来比较复杂,看起来也不太方便,我们考虑一下,这四个Consumer是不是和Subscriber接口中定义的4个方法是一一对应的呢?

        public static interface Subscriber<T> {
    
            public void onSubscribe(Subscription subscription);
    
            public void onNext(T item);
    
            public void onError(Throwable throwable);
    
            public void onComplete();
        }
    

    对的,所以我们有一个更加简单点的subscribe方法:

    public final void subscribe(Subscriber<? super T> actual) 
    

    这个subscribe方法直接接收一个Subscriber类。从而实现了所有的功能。

    自己写Subscriber太麻烦了,Reactor为我们提供了一个BaseSubscriber的类,它实现了Subscriber中的所有功能,还附带了一些其他的方法。

    我们看下BaseSubscriber的定义:

    public abstract class BaseSubscriber<T> implements CoreSubscriber<T>, Subscription,
                                                       Disposable
    

    注意,BaseSubscriber是单次使用的,这就意味着,如果它首先subscription到Publisher1,然后subscription到Publisher2,那么将会取消对第一个Publisher的订阅。

    因为BaseSubscriber是一个抽象类,所以我们需要继承它,并且重写我们需要自己实现的方法。

    下面看一个自定义的Subscriber:

    public class CustSubscriber<T> extends BaseSubscriber<T> {
    
        public void hookOnSubscribe(Subscription subscription) {
            System.out.println("Subscribed");
            request(1);
        }
    
        public void hookOnNext(T value) {
            System.out.println(value);
            request(1);
        }
    }
    

    BaseSubscriber中有很多以hook开头的方法,这些方法都是我们可以重写的,而Subscriber原生定义的on开头的方法,在BaseSubscriber中都是final的,都是不能重写的。

    我们看一个定义:

    	@Override
    	public final void onSubscribe(Subscription s) {
    		if (Operators.setOnce(S, this, s)) {
    			try {
    				hookOnSubscribe(s);
    			}
    			catch (Throwable throwable) {
    				onError(Operators.onOperatorError(s, throwable, currentContext()));
    			}
    		}
    	}
    

    可以看到,它内部实际上调用了hook的方法。

    上面的CustSubscriber中,我们重写了两个方法,一个是hookOnSubscribe,在建立订阅的时候调用,一个是hookOnNext,在收到onNext信号的时候调用。

    在这些方法中,给了我们足够的自定义空间,上面的例子中我们调用了request(1),表示再请求一个元素。

    其他的hook方法还有: hookOnComplete, hookOnError, hookOnCancel 和 hookFinally。

    Backpressure处理

    我们之前讲过了,reactive stream的最大特征就是可以处理Backpressure。

    什么是Backpressure呢?就是当consumer处理过不来的时候,可以通知producer来减少生产速度。

    我们看下BaseSubscriber中默认的hookOnSubscribe实现:

    	protected void hookOnSubscribe(Subscription subscription){
    		subscription.request(Long.MAX_VALUE);
    	}
    

    可以看到默认是request无限数目的值。 也就是说默认情况下没有Backpressure。

    通过重写hookOnSubscribe方法,我们可以自定义处理速度。

    除了request之外,我们还可以在publisher中限制subscriber的速度。

    	public final Flux<T> limitRate(int prefetchRate) {
    		return onAssembly(this.publishOn(Schedulers.immediate(), prefetchRate));
    	}
    

    在Flux中,我们有一个limitRate方法,可以设定publisher的速度。

    比如subscriber request(100),然后我们设置limitRate(10),那么最多producer一次只会产生10个元素。

    创建Flux

    接下来,我们要讲解一下怎么创建Flux,通常来讲有4种方法来创建Flux。

    使用generate

    第一种方法就是最简单的同步创建的generate.

    先看一个例子:

        public void useGenerate(){
            Flux<String> flux = Flux.generate(
                    () -> 0,
                    (state, sink) -> {
                        sink.next("3 x " + state + " = " + 3*state);
                        if (state == 10) sink.complete();
                        return state + 1;
                    });
    
            flux.subscribe(System.out::println);
        }
    

    输出结果:

    3 x 0 = 0
    3 x 1 = 3
    3 x 2 = 6
    3 x 3 = 9
    3 x 4 = 12
    3 x 5 = 15
    3 x 6 = 18
    3 x 7 = 21
    3 x 8 = 24
    3 x 9 = 27
    3 x 10 = 30
    

    上面的例子中,我们使用generate方法来同步的生成元素。

    generate接收两个参数:

    	public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator) 
    

    第一个参数是stateSupplier,用来指定初始化的状态。

    第二个参数是一个generator,用来消费SynchronousSink,并生成新的状态。

    上面的例子中,我们每次将state+1,一直加到10。

    然后使用subscribe来将所有的生成元素输出。

    使用create

    Flux也提供了一个create方法来创建Flux,create可以是同步也可以是异步的,并且支持多线程操作。

    因为create没有初始的state状态,所以可以用在多线程中。

    create的一个非常有用的地方就是可以将第三方的异步API和Flux关联起来,举个例子,我们有一个自定义的EventProcessor,当处理相应的事件的时候,会去调用注册到Processor中的listener的一些方法。

        interface MyEventListener<T> {
            void onDataChunk(List<T> chunk);
            void processComplete();
        }
    

    我们怎么把这个Listener的响应行为和Flux关联起来呢?

       public void useCreate(){
            EventProcessor myEventProcessor = new EventProcessor();
            Flux<String> bridge = Flux.create(sink -> {
                myEventProcessor.register(
                        new MyEventListener<String>() {
                            public void onDataChunk(List<String> chunk) {
                                for(String s : chunk) {
                                    sink.next(s);
                                }
                            }
                            public void processComplete() {
                                sink.complete();
                            }
                        });
            });
        }
    

    使用create就够了,create接收一个consumer参数:

        public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)
    

    这个consumer的本质是去消费FluxSink对象。

    上面的例子在MyEventListener的事件中对FluxSink对象进行消费。

    使用push

    push和create一样,也支持异步操作,但是同时只能有一个线程来调用next, complete 或者 error方法,所以它是单线程的。

    使用Handle

    Handle和上面的三个方法不同,它是一个实例方法。

    它和generate很类似,也是消费SynchronousSink对象。

    Flux<R> handle(BiConsumer<T, SynchronousSink<R>>);
    

    不同的是它的参数是一个BiConsumer,是没有返回值的。

    看一个使用的例子:

        public void useHandle(){
            Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
                    .handle((i, sink) -> {
                        String letter = alphabet(i);
                        if (letter != null)
                            sink.next(letter);
                    });
    
            alphabet.subscribe(System.out::println);
        }
    
        public String alphabet(int letterNumber) {
            if (letterNumber < 1 || letterNumber > 26) {
                return null;
            }
            int letterIndexAscii = 'A' + letterNumber - 1;
            return "" + (char) letterIndexAscii;
        }
    

    本文的例子learn-reactive

    本文作者:flydean程序那些事

    本文链接:http://www.flydean.com/reactor-core-in-depth/

    本文来源:flydean的博客

    欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

    展开全文
  • Reactor.CHM

    2020-08-31 11:10:42
    Reactor API。...包括:Reactor Adapter、Reactor Core、Reactor Extra、Reactor Kafka、Reactor Kotlin Extensions、Reactor Netty、Reactor Pool、Reactor Project、Reactor RabbitMQ、Reactor Test
  • 抽丝剥茧Reactor模式

    万次阅读 2016-11-03 10:31:53
    今天在看书的时候看到了一个新的设计模式——Reactor模式,这个模式是出现在NIO中,至于这到底是个什么模式,今天我们来细说一下。 reactor模式是javaNIO非堵塞技术的实现原理,我们不仅要知道其原理流程,还要知道...
  • .Net Reactor

    2019-02-15 12:29:20
    .Net Reactor混淆工具, 混淆代码,加密代码,无法通过反编译工具查看到源码
  • .NET Reactor

    2018-09-03 16:26:30
    .NET Reactor 5.0 DLL加密工具,破解版。发布项目需要使用的工具
  • Reactor example

    2021-01-08 19:28:05
    <div><p>Add a reactor example <p>https://github.com/reactor/reactor</p>该提问来源于开源项目:jhalterman/failsafe</p></div>
  • 文章目录单 Reactor 单线程工作原理示意图方案说明方案优缺点分析优点缺点使用场景单 Reactor 多线程工作原理示意图方案说明方案优缺点分析优点缺点主从 Reactor 多线程工作原理示意图方案说明方案优缺点分析优点...
  • Reactor Bindings

    2020-12-09 12:59:41
    <div><p>Is it possible to produce similar reactor bindings? <p>https://github.com/reactor/reactor</p>该提问来源于开源项目:vert-x/mod-rxvertx</p></div>
  • Reactor模式

    2021-02-01 01:01:26
    Reactor的模式 Reactor的模式-单reactor单线程 问题是还是会阻塞 Reactor的模式-主从Reactor多线程
  • fusion reactor

    2020-12-09 03:01:05
    in the helium plasma production, submit americium and naquadria, the reactor after finishing work on helium plasma will begin to produce neutrons, regardless of what the reactor</p><p>该提问来源于...
  • dotNET Reactor6.3

    2020-11-24 21:16:58
    dotNET Reactor6.3版本,相较于之前6.0以下的版本,dotNET Reactor6.3有了更大的改进,更难,已和谐
  • java reactor notify_Reactor

    2021-03-08 21:31:56
    软件简介Reactor 是一个基于 JVM 之上的异步应用基础库。为 Java 、Groovy 和其他 JVM语言提供了构建基于事件和数据驱动应用的抽象库。Reactor 性能相当高,在最新的硬件平台上,使用无堵塞分发器每秒钟可处理 1500 ...
  • reactor rabbitmq

    2020-12-31 09:57:26
    Reactor RabbitMQ is a reactive API for RabbitMQ based on Reactor and RabbitMQ Java Client. Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using func
  • reactor讲解

    2020-05-29 00:40:23
    1. 为什么是Reactor模式 2. Reactor模式简介 3. 多线程IO的致命缺陷 4. 单线程Reactor模型 4.1. 什么是单线程Reactor呢? 4.2. 单线程Reactor的参考代码 4.3. 单线程模式的缺点: 5. 多线程的Reactor 5.1. 基于线程池...
  • Reactor rabbitMQ

    2020-12-26 00:48:13
    <div><p>Do you plan support for reactor rabbitMQ? https://projectreactor.io/docs/rabbitmq/snapshot/reference/</p><p>该提问来源于开源项目:spring-cloud/spring-cloud-contract</p></div>
  • Reactor.pdf

    2020-03-09 20:28:59
    这个是c++多线程开发中的Reactor模式技术技术讲解,Reactor模式是网络编程中极为重要,许多网络库都以它为基础,适合初学网络编程的同学阅读。
  • Reactor多线程: 针对单Reactor多线程模型中,Reactor在单线程中运行。高并发场景下容易成为性能瓶颈,可以让Reactor在多线程中运行 线程模型: 方法说明: 1.Reactor 对象通过select监控客户端请求事件,收到...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 16,620
精华内容 6,648
热门标签
关键字:

reactor