精华内容
下载资源
问答
  • tomcat nio

    2017-05-06 17:51:00
    tomcat conect 组件 可以选择aio io bio 处理nio 不管哪种 tomcat 转换成http协议。 nio是双向的 nio 组成 buffer buffer 有大小 位置 人工控制字节的大小。 buffer 有两种 直接内存,内存 socket 是自己处理 ...

    tomcat 类似 实现一套socket

    实现http协议。

    nio


    是io处理的一种

    tomcat conect 组件 可以选择aio io bio 处理nio

    不管哪种 tomcat 转换成http协议。

    nio
    是双向的


    nio 组成 buffer
    buffer
    有大小 位置 人工控制字节的大小。


    buffer 有两种 直接内存,内存

    socket 是自己处理

    tomct 按照http标准解析 处理。


    通过
    数据传输通过 优化传输过程。。

    file channel

    sockete channel
    serverscoketchannel


    转载于:https://www.cnblogs.com/itxuexiwang/p/6817492.html

    展开全文
  • Tomcat NIO

    千次阅读 2018-04-11 15:00:23
    说起TomcatNIO,不得不提的就是Connector这个Tomcat组件。Connector是Tomcat的连接器,其主要任务是负责处理收到的请求,并创建一个Request和Response的对象,然后用一个线程用于处理请求,Connector会把Request和...

    说起Tomcat的NIO,不得不提的就是Connector这个Tomcat组件。Connector是Tomcat的连接器,其主要任务是负责处理收到的请求,并创建一个Request和Response的对象,然后用一个线程用于处理请求,Connector会把Request和Response对象传递给该线程,该线程的具体的处理过程就是Container容器的事了。

    在tomcat启动过程中,会初始化Connector,并调用Connector的startInternal()方法开启Connector,开始监听、处理请求。


    想了解Tomcat NIO的工作方式,就得先了解一下Connector的实现原理。下面从三个方面来了解一下Connector组件:Connector的数据结构、Connector初始化以及Connector开启。


    Connector

    Connector的数据结构

    先了解一下Connector的数据结构。Connector的一个主要的属性:ProtocolHandler protocolHandler(协议)


    protocolHandler(协议)

    • 维护服务器使用的协议,如http1.1等。ProtocolHandler是接口,实现类有Http11Nio2Protocol 、Http11Nio2Protocol等
    • 维护服务提供的IO方式,负责EndPoint的初始化、启动。目前有BIO、NIO、AIO等IO方式,来实现监听端口、读写socket数据的功能。通过EndPoint封装实现不同的IO方式
    • EndPoint监听到IO读写,交给Tomcat线程池中的一个线程来处理,SocketProcessor会根据protocolHandler采用的协议,调用协议的process方法处理请求。
    • 维护adapter(适配器),可以将请求/响应数据进行适配


    protocolHandler会找到socket对应的处理器(如Http11Processor),然后进行数据读写、适配,处理。请求由adapter最终会交给servlet处理


    常说的BIO、NIO,主要的应用就在protocolHandler中。protocolHandler负责维护Connector使用的协议以及IO方式。在protocolHandler中,不同的IO方式,会使用不同的EndPoint,具体采用哪种IO方式,取决于采用哪个EndPoint,每一个EndPoint的实现类,都封装了一种IO策略。若采用NIO,则为NioEndpoint。


    Connector初始化


    创建Connector时,会拿到Tomcat目录下conf/server.xml中Connector的协议配置,利用反射创建ProtocolHandler:

    /**
     * Coyote Protocol handler class name.
     * Defaults to the Coyote HTTP/1.1 protocolHandler.
     */
    protected String protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol";
    
    
    public Connector(String protocol) {
    	//设置protocolHandlerClassName类名
        setProtocol(protocol);
        // Instantiate protocol handler
        ProtocolHandler p = null;
        try {
    		//根据server.xml中<connector/>标签的protocol属性值,获取到对应的http协议类
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }
    
        if (Globals.STRICT_SERVLET_COMPLIANCE) {
            uriCharset = StandardCharsets.ISO_8859_1;
        } else {
            uriCharset = StandardCharsets.UTF_8;
        }
    }
    
    //设置protocolHandlerClassName类名
    public void setProtocol(String protocol) {
    
        boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                AprLifecycleListener.getUseAprConnector();
    
    	//若配置了protocol="HTTP/1.1"或者没配,则默认是Http11NioProtocol或者Http11AprProtocol
        if ("HTTP/1.1".equals(protocol) || protocol == null) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
            }
        } else if ("AJP/1.3".equals(protocol)) {
            if (aprConnector) {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
            } else {
                setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
            }
        } else {
    		//直接取配置的类名
            setProtocolHandlerClassName(protocol);
        }
    }

    以Tomcat8.5.20为例,这里默认是http1.1的NIO。



    Connector.start()开启


    Connector初始化后,调用start方法开启。主要涉及一下几个方法:


    Connector的startInternal()方法,会调用protocolHandler.start();

    protocolHandler中会调用endpoint.start(),从而达到开启endpoint、监听端口、读写Socket的目的:

    //Connector开启
    protected void startInternal() throws LifecycleException {
    
        // 校验端口
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }
    	
    	//设置Connector的状态为开启
        setState(LifecycleState.STARTING);
    
        try {
    		//开启protocolHandler
            protocolHandler.start();
        } catch (Exception e) {
            String errPrefix = "";
            if(this.service != null) {
                errPrefix += "service.getName(): \"" + this.service.getName() + "\"; ";
            }
    
            throw new LifecycleException
                (errPrefix + " " + sm.getString
                 ("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }
    
    
    //protocolHandler开启
    public void start() throws Exception {
        if (getLog().isInfoEnabled())
            getLog().info(sm.getString("abstractProtocolHandler.start",
                    getName()));
        try {
    		//endpoint开启,初始化Processor缓存、event缓存、exector线程池,开启轮询线程、acceptor线程
            endpoint.start();
        } catch (Exception ex) {
            getLog().error(sm.getString("abstractProtocolHandler.startError",
                    getName()), ex);
            throw ex;
        }
    
        // Start async timeout thread
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }


    至此,Connector完成了开启的过程,开启监听端口、可以读写Socket了。


    总结一下,关于Connector:

    创建Connector时,会拿到Tomcat目录下conf/server.xml中Connector的协议配置,利用反射创建ProtocolHandler。

    ProtocolHandler负责维护Connector使用的协议以及IO方式,不同的IO方式如BIO、NIO、AIO封装在EndPoint中

    开启Connector时,会开启protocolHandler,从而达到EndPoint的开启,开始监听端口、读写socket数据了

    protocolHandler中将请求拿到的数据进行适配,通过adapter适配成Request和Response对象,最终交给Container去处理


    下面重点就来了,NIO。

    Tomcat NIO


    Tomcat在处理客户端请求时,读写socket数据是一种网络IO操作。目前Tomcat有几种IO方式,分别是BIO(同步阻塞),NIO(同步非阻塞)和AIO(异步非阻塞)。不同IO方式的读写机制,被封装在了Endpoint中。BIO、AIO不再赘述。这里主要看NIO。

    Tomcat NIO模型

    当然要了解一下Tomcat NIO的模型了。Tomcat NIO是基于Java NIO实现的,其基本原理如下:


    Tomcat NIO是对Java NIO的一种典型的应用方式:通过JDK提供的同步非阻塞的IO方式,实现了IO多路复用,即一个线程管理多个客户端的连接。了解Java NIO,可以看一下Java NIO

    Tomcat在NIO模式下,所有客户端的请求先由一个接收线程接收,然后由若干个(一般为CPU的个数)线程轮询读写事件,最后将具体的读写操作交由线程池处理。

    NioEndpoint


    要了解Tomcat的NIO实现,其实就是了解NioEndpoint的实现原理。


    数据结构

    它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5个部分


    • LimitLatch是连接控制器,它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。
    • Acceptor负责接收连接,默认是1个线程来执行,将请求的事件注册到事件列表
    • Poller来负责轮询上述产生的事件。Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller将就绪的事件生成SocketProcessor,然后交给Excutor去执行。
    • SocketProcessor继承了SocketProcessorBase,实现了Runnable接口,可以提交给线程池Excutor来执行。它里面的doRun()方法,封装了读写Socket、完成Container调用的逻辑
    • Excutor线程池是一个Tomcat线程池。用来执行Poller创建的SocketProcessor。Excutor线程池的大小就是我们在Connector节点配置的maxThreads的值。


    SocketProcessor被一个线程执行的时候,会完成从socket中读取http request,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。在从socket中读数据和往socket中写数据的过程,并没有像典型的非阻塞的NIO的那样,注册OP_READ或OP_WRITE事件到主Selector,而是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector,实现可见org.apache.coyote.http11.Http11InputBuffer#fill。


    了解了NioEndPoint的数据结构之后,可以看一下它们的关系图

    NioEndpoint组件关系图



    以上过程就以同步非阻塞的方式完成了网络IO。


    其实是一个Reactor模型:

    • 一个Acceptor(当然多个也行,不过一般场景一个够了)负责accept事件,把接收到SocketChannel注册到按某种算法从Reactor池中取出的一个Reactor上,注册的事件为读,写等,之后这个Socket Channel的所有IO事件都和Acceptor没关系,都由被注册到的那个Reactor来负责。
    • 每个Acceptor和每个Reactor都各自持有一个Selector
    • 当然每个Acceptor和Reactor都是一个线程


    这里的Poller池其实就是一个Reactor池,可以是多个线程。



    NioEndPoint实现


    工作原理简单了解了一下,接下来看一下具体的代码实现吧。先上一个NioEndpoint的UML图:


    NioEndPoint启动


    AbstractEndpoint里实现了一些EndPoint的抽象的通用的方法,其中主要的一个入口方法是org.apache.tomcat.util.net.AbstractEndpoint#start方法

    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }

    其中,bind()方法和startInternal()方法,由其子类具体实现。

    bind()方法用于初始化endpoint,绑定监听端口等、设置最大线程数、ssl等。

    startInternal()方法在EndPoint初始化完毕后,创建pollers轮询线程以及acceptors线程并开启。

    /**
     * 开启 NIO endpoint, 创建pollers轮询线程以及acceptors线程
     */
    @Override
    public void startInternal() throws Exception {
    
        if (!running) {
            running = true;
            paused = false;
    		//SocketProcessor缓存。若缓存没有,则创建新的SocketProcessor
            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
    		//poller事件缓存
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
    		//nioChannels缓存。
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());
    
            // Create worker collection 创建线程池
            if ( getExecutor() == null ) {
                createExecutor();
            }
    
            initializeConnectionLatch();
    
            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }
    
    		//start acceptor threads
            startAcceptorThreads();
        }
    }
    
    
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];
    
    
        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

    NioEndPoint时序图


    看完了开启EndPoint的过程,再来详细看一下NioEndpoint处理的的时序图:

    通过上面的时序图,结合代码来详细了解一下Acceptor和Poller的工作方式。


    Acceptor接收请求


    NioEndPoint中的Acceptor方法实现了Runnable接口,主要干的活就是上述图中的3,4,5,6,7


    @Override
    public void run() {
    
        int errorDelay = 0;
    
        // 循环,直到收到一个关闭的命令
        while (running) {
    
            // 如果EndPoint被暂停,则循环sleep
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }
    
            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;
    
            try {
                //如果达到了最大连接数,则等待
                countUpOrAwaitConnection();
    
                SocketChannel socket = null;
                try {
                    // 创建一个socketChannel,接收下一个从服务器进来的连接
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    // We didn't get a socket
                    countDownConnection();
                    if (running) {
                        // Introduce delay if necessary
                        errorDelay = handleExceptionWithDelay(errorDelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // 成功接收,重置error delay
                errorDelay = 0;
    
                // 如果处于EndPoint处于running状态并且没有没暂停,Configure the socket
                if (running && !paused) {
                    // setSocketOptions()将把socket传递给适当的处理器。如果成功,会关闭socket。
                    // 否则,在这里关闭socket
                    if (!setSocketOptions(socket)) {
                        closeSocket(socket);
                    }
                } else {
                    closeSocket(socket);
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }

    看的出来,Acceptor使用serverSock.accept()阻塞的监听端口,如果有连接进来,拿到了socket,并且EndPoint处于正常运行状态,则调用NioEndPoint的setSocketOptions方法,一顿操作。

    至于setSocketOptions做了什么,概括来说就是根据socket构建一个NioChannel,然后把这个的NioChannel注册到Poller的事件列表里面,等待poller轮询。


    看下setSocketOptions的代码:

    /**
     * 处理指定的连接
     * @param socket The socket channel
     * @return  
     *  如果socket配置正确,并且可能会继续处理,返回true 
     *  如果socket需要立即关闭,则返回false
     */
    protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        try {
            //非阻塞模式
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);
    		
    		//从缓存中拿一个nioChannel  若没有,则创建一个。将socket传进去
            NioChannel channel = nioChannels.pop();
            if (channel == null) {
                SocketBufferHandler bufhandler = new SocketBufferHandler(
                        socketProperties.getAppReadBufSize(),
                        socketProperties.getAppWriteBufSize(),
                        socketProperties.getDirectBuffer());
                if (isSSLEnabled()) {
                    channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
                } else {
                    channel = new NioChannel(socket, bufhandler);
                }
            } else {
                channel.setIOChannel(socket);
                channel.reset();
            }
    		//从pollers数组中获取一个Poller对象,注册这个nioChannel
            getPoller0().register(channel);
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            try {
                log.error("",t);
            } catch (Throwable tt) {
                ExceptionUtils.handleThrowable(tt);
            }
            // Tell to close the socket
            return false;
        }
        return true;
    }

    显然,下面的重点就是register这个方法了。这个方法是NioEndPoint中的Poller实现的,主要干的事就是在Poller注册新创建的套接字。
    /**
     * 使用轮询器注册新创建的socket
     *
     * @param socket    新创建的socket
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
    	//创建一个NioSocketWrapper,包装一下socket。然后一顿设置。
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
    
    
    	//从缓存中取出一个PollerEvent对象,若没有则创建一个。将socket和NioSocketWrapper设置进去
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
    
    
    	//添到到该Poller的事件列表
        addEvent(r);
    }

    总结一下,从Acceptor接收到请求,它做了这么些工作:

    • 如果达到了最大连接数,则等待。否则,阻塞监听端口。
    • 监听到有连接,则创建一个socketChannel。若服务正常运行,则把socket传递给适当的处理器。如果成功,会关闭socket。


    在这里,适当的处理是指调用NioEndPoint的setSocketOptions方法,处理指定的连接:

    • 将socket设置为非阻塞
    • 从缓存中拿一个nioChannel  若没有,则创建一个。将socket传进去。
    • 从pollers数组中获取一个Poller对象,把nioChannel注册到该Poller中。


    其中最后一步注册的过程,是调用Poller的register()方法:

    • 创建一个NioSocketWrapper,包装socket。然后配置相关属性,设置感兴趣的操作为SelectionKey.OP_READ
    • PollerEvent。PollerEvent可以是从缓存中取出来的,若没有则创建一个。初始化或者重置此Event对象,设置感兴趣的操作为OP_REGISTER (Poller轮询时会用到)
    • 将新的PollerEvent添加到这个Poller的事件列表events,等待Poller线程轮询。


    Poller轮询


    其实上面已经提到了Poller将一个事件注册到事件队列的过程。接下来便是Poller线程如何处理这些事件了,这就是Poller线程的工作机制。

    Poller作为一个线程,实现了Runnable接口的run方法,在run方法中会轮询事件队列events,将每个PollerEvent中的SocketChannel感兴趣的事件注册到Selector中,然后将PollerEvent从队列里移除。之后就是SocketChanel通过Selector调度来进行非阻塞的读写数据了。

    看下Poller.run()代码:

    /**
     * The background thread that adds sockets to the Poller, checks the
     * poller for triggered events and hands the associated socket off to an
     * appropriate processor as events occur.
     */
    @Override
    public void run() {
        // 循环直到 destroy() 被调用
        while (true) {
    
            boolean hasEvents = false;
    
            try {
                if (!close) {
    				//将events队列,将每个事件中的通道感兴趣的事件注册到Selector中
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //如果走到了这里,代表已经有就绪的IO通道
                        //调用非阻塞的select方法,直接返回就绪通道的数量
                        keyCount = selector.selectNow();
                    } else {
    					//阻塞等待操作系统返回 数据已经就绪的通道,然后被唤醒
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                if (close) {
                    events();
                    timeout(0, false);
                    try {
                        selector.close();
                    } catch (IOException ioe) {
                        log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                    }
                    break;
                }
            } catch (Throwable x) {
                ExceptionUtils.handleThrowable(x);
                log.error("",x);
                continue;
            }
            //如果上面select方法超时,或者被唤醒,先将events队列中的通道注册到Selector上。
            if ( keyCount == 0 ) hasEvents = (hasEvents | events());
    
            Iterator<SelectionKey> iterator =
                keyCount > 0 ? selector.selectedKeys().iterator() : null;
            // 遍历已就绪的通道,并调用processKey来处理该Socket的IO。
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // 如果其它线程已调用,则Attachment可能为空
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
    				//创建一个SocketProcessor,放入Tomcat线程池去执行
                    processKey(sk, attachment);
                }
            }//while
    
            //process timeouts
            timeout(keyCount,hasEvents);
        }//while
    
        getStopLatch().countDown();
    }

    读取已就绪通道的部分,是常见的Java NIO的用法,Selector调用selectedKeys(),获取IO数据已经就绪的通道,遍历并调用processKey方法来处理每一个通道就绪的事件。而processKey方法会创建一个SocketProcessor,然后丢到Tomcat线程池中去执行。

    其中需要注意的一个点是,events()方法,用来处理PollerEvent事件,执行PollerEvent.run(),然后将PollerEvent重置再次放入缓存中,以便对象复用。

    /**
     * Processes events in the event queue of the Poller.
     *
     * @return <code>true</code> if some events were processed,
     *   <code>false</code> if queue was empty
     */
    public boolean events() {
        boolean result = false;
    
        PollerEvent pe = null;
        while ( (pe = events.poll()) != null ) {
            result = true;
            try {
    			//把SocketChannel感兴趣的事件注册到Selector中
                pe.run();
                pe.reset();
                if (running && !paused) {
                    eventCache.push(pe);
                }
            } catch ( Throwable x ) {
                log.error("",x);
            }
        }
    
        return result;
    }
    可以看出,PollerEvent.run()方法才是重点:
    public void run() {
    	//Acceptor调用Poller.register()方法时,创建的PollerEvent感兴趣的事件为OP_REGISTER,因此走这个分支
        if (interestOps == OP_REGISTER) {
            try {
    			//将SocketChannel的读事件注册到Poller线程的Selector中,使用Selector来调度IO。
                socket.getIOChannel().register(
                        socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
            } catch (Exception x) {
                log.error(sm.getString("endpoint.nio.registerFail"), x);
            }
        } else {
            final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
            try {
                if (key == null) {
                    // The key was cancelled (e.g. due to socket closure)
                    // and removed from the selector while it was being
                    // processed. Count down the connections at this point
                    // since it won't have been counted down when the socket
                    // closed.
                    socket.socketWrapper.getEndpoint().countDownConnection();
                } else {
                    final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                    if (socketWrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestOps() | interestOps;
                        socketWrapper.interestOps(ops);
                        key.interestOps(ops);
                    } else {
                        socket.getPoller().cancelledKey(key);
                    }
                }
            } catch (CancelledKeyException ckx) {
                try {
                    socket.getPoller().cancelledKey(key);
                } catch (Exception ignore) {}
            }
        }
    }

    至此,可以看出Poller线程的作用

    • 将Acceptor接收到的请求注册到Poller的事件队列中
    • Poller轮询事件队列中,处理到达的事件,将PollerEvent中的通道注册到Poller的Selector中
    • 轮询已就绪的通道,对每个就绪通道创建一个SocketProcessor,交个Tomcat线程池去处理


    剩下的事情,就是SocketProcessor怎么适配客户端发来请求的数据、然后怎样交给Tomcat容器去处理了。


    SocketProcessor处理请求


    简单提一下SocketProcessor的处理过程,不是这篇文章的重点。通过上面可以知道,具体处理一个请求,是在SocketProcessor通过线程池去执行的。执行一次请求的时序图




    SocketProcessor中通过Http11ConnectionHandler,取到Htpp11Processor,Htpp11Processor调用prepareRequest方法,准备好请求数据。然后调用CoyoteAdapter的service方法进行request和response的适配,之后交给容器进行处理。


    在CoyoteAdapter的service方法中,主要干了2件事:

    • org.apache.coyote.Request -> org.apache.catalina.connector.Request extends HttpServletRequest,org.apache.coyote.Response -> org.apache.catalina.connector. Response extends HttpServletResponse
    • 将请求交给StandardEngineValue处理


    将请求交给Tomcat容器处理后,后将请求一层一层传递到Engin、Host、Context、Wrapper,最终经过一系列Filter,来到了Servlet,执行我们自己具体的代码逻辑。其中,容器之间数据的传递用到了管道流的机制。这里就不在赘述,以后有时间专门写一篇Tomcat容器的工作原理。


    参考文章:

    《Tomcat内核设计剖析》

    深度解读Tomcat中的NIO模型


    展开全文
  • tomcat:Java使用nio模式实现tomcat
  • tomcat NIO

    2019-04-19 11:13:09
    TomcatNIO是基于I/O复用来实现的。对这点一定要清楚,不然我们的讨论就不在一个逻辑线上。I/O模型一共有阻塞式I/O,非阻塞式I/O,I/O复用(select/poll/epoll),信号驱动式I/O和异步I/O。首先看下IO的五种模型。...

           Tomcat的NIO是基于I/O复用来实现的。对这点一定要清楚,不然我们的讨论就不在一个逻辑线上。I/O模型一共有阻塞式I/O,非阻塞式I/O,I/O复用(select/poll/epoll),信号驱动式I/O和异步I/O。首先看下IO的五种模型。下面是摘录前辈的文章 ,分析的不错,仅做收藏

          

    1.1 五种I/O模型  

    1)阻塞I/O

    2)非阻塞I/O

    3)I/O复用

    4)事件(信号)驱动I/O

    5)异步I/O

     

    1.2 为什么要发起系统调用? 

    因为进程想要获取磁盘中的数据,而能和磁盘打交道的只能是内核, 进程通知内核,说要磁盘中的数据

    此过程就是系统调用 

     

    1.3 一次I/O完成的步骤

    当进程发起系统调用时候,这个系统调用就进入内核模式, 然后开始I/O操作

    I/O操作分为俩个步骤: 

             1) 磁盘把数据装载进内核的内存空间

             2) 内核的内存空间的数据copy到用户的内存空间中(此过程才是真正I/O发生的地方) 

    注意: io调用大多数都是阻塞的

     过程分析

           整个过程:此进程需要对磁盘中的数据进行操作,则会向内核发起一个系统调用,然后此进程,将会被切换出去,

    此进程会被挂起或者进入睡眠状态,也叫不可中 断的睡眠,因为数据还没有得到,只有等到系统调用的结果完成后,

    则进程会被唤醒,继续接下来的操作,从系统调用的开始到系统调用结束经过的步骤:

    ①进程向内核发起一个系统调用,

    ②内核接收到系统调用,知道是对文件的请求,于是告诉磁盘,把文件读取出来

    ③磁盘接收到来着内核的命令后,把文件载入到内核的内存空间里面

    ④内核的内存空间接收到数据之后,把数据copy到用户进程的内存空间(此过程是I/O发生的地方)

    ⑤进程内存空间得到数据后,给内核发送通知

    ⑥内核把接收到的通知回复给进程,此过程为唤醒进程,然后进程得到数据,进行下一步操作

    2.1 阻塞

        是指调用结果返回之前,当前线程会被挂起(线程进入睡眠状态) 函数只有在得到结果之后,才会返回,才能继续执行

    阻塞I/O系统怎么通知进程? 

    I/O 完成后, 系统直接通知进程, 则进程被唤醒   

    第一阶段是指磁盘把数据装载到内核的内存中空间中

    第二阶段是指内核的内存空间的数据copy到用户的内存空间 (这个才是真实I/O操作)

     

    2.2 非阻塞

      非阻塞:进程发起I/O调用,I/O自己知道需过一段时间完成,就立即通知进程进行别的操作,则为非阻塞I/O

    非阻塞I/O,系统怎么通知进程?

    每隔一段时间,问内核数据是否准备完成,系统完成后,则进程获取数据,继续执行(此过程也称盲等待)

    缺点: 无法处理多个I/O,比如用户打开文件,ctrl+C想终止这个操作,是无法停掉的

     

                第一阶段是指磁盘把数据装载到内核的内存中空间中

     

     

                第二阶段是指内核的内存空间的数据copy到用户的内存空间 (这个才是真实I/O操作)

     

    2.3 I/O多路复用 select

    为什么要用I/O多路复用

      某个进程阻塞多个io上 ,一个进程即要等待从键盘输入信息, 另一个准备从硬盘装入信息

    比如通过read这样的命令, 调用了来个io操作,一个io完成了,一个io没有完成, 阻塞着键盘io,磁盘io完成了 ,

    这个进程也是不能响应, 因为键盘io还没有完成,还在阻塞着 , 这个进程还在睡眠状态 ,这个时候怎么办 ?

    由此需要I/O多路复用。

    执行过程

           以后进程在调用io的时候, 不是直接调用io的功能,在系统内核中, 新增了一个系统调用, 帮助进程监控多个io,

    一旦一个进程需要系统调用的时候, 向内核的一个特殊的系统调用,发起申请时,这个进程会被阻塞在这个复用器的调用上,

    所以复用这个功能会监控这些io操作,任何一个io完成了,它都会告诉进程,其中某个io完成,如果进程依赖某个io操作,

    那么这个时候,进程就可以继续后面的操作. 能够帮组进程监控这些io的工具叫做io复用器

    Linux中  I/O 复用器

    select: 就是一种实现,进程需要调用的时候,把请求发送给select ,可以发起多个,但是最多只能支持1024个,先天性的限制  

    poll: 没有限制,但是多余1024个性能会下降

    所以早期的apache 本身prefork mpm模型,主进程在接受多个用户请求的时候,在线请求数超过1024个,就不工作了.

    那么io复用会比前俩种好吗?

      本来进程和系统内核直接沟通的 ,在中间加一个i/o复用select, 如果是传话,找人传话,那么这个传话最后会是什么样的呢?

    虽然解决了多个系统调用的问题,多路io复用本身的后半段依然是阻塞的,阻塞在select 上, 而不是阻塞在系统调用上,

    但是他第二段仍然是阻塞的,由于要扫描所有多个io操作, 多了一个处理机制,性能未必上升, 性能上也许不会有太大的改观

     

            
                        第一阶段是指磁盘把数据装载到内核的内存中空间中

                                            第二阶段是指内核的内存空间的数据copy到用户的内存空间 (这个才是真实I/O操作) 

    2.4 事件驱动 

    进程发起调用,通过回调函数, 内核会记住是那个进程申请的,一旦第一段完成了,就可以向这个进程发起通知,

    这样第一段就是非阻塞的,进程不需要盲等了, 但是第二段依然是阻塞的

     

    事件驱动机制(event-driven)

    正是由于事件驱动机制 ,才能同时相应多个请求的

    比如: 一个web服务器. 一个进程响应多个用户请求

    缺陷: 第二段仍然是阻塞的

    俩种机制

    如果一个事件通知一个进程,进程正在忙, 进程没有听见, 这个怎么办?

    水平触发机制: 内核通知进程来读取数据,进程没来读取数据,内核需要一次一次的通知进程

    边缘触发机制: 内核只通知一次让进程来取数据,进程在超时时间内,随时可以来取数据, 把这个事件信息状态发给进程,好比发个短息给进程,

    nginx

         nginx默认采用了边缘触发驱动机制 

     

        

     

    第一阶段是指磁盘把数据装载到内核的内存中空间中

    第二阶段是指内核的内存空间的数据copy到用户的内存空间 (这个才是真实I/O操作)

     

     

    2.5 异步AIO

    无论第一第二段, 不再向系统调用提出任何反馈, 只有数据完全复制到服务进程内存中后, 才向服务进程返回ok的信息,其它时间,

    进程可以随意做自己的事情,直到内核通知ok信息

    注意: 只在文件中可以实现AIO, 网络异步IO 不可能实现

    nginx:

    nginxfile IO 文件异步请求的
    一个进程响应N个请求
    静态文件界别: 支持sendfile   
    避免浪费复制时间:  mmap 支持内存映射,内核内存复制到进程内存这个过程, 不需要复制了, 直接映射到进程内存中
    支持边缘触发
    支持异步io
    解决了c10k的问题
    c10k : 有一万个同时的并发连接
    c100k: 你懂得

     

     

    第一阶段是指磁盘把数据装载到内核的内存中空间中

    第二阶段是指内核的内存空间的数据copy到用户的内存空间 (这个才是真实I/O操作)

     

    前四种I/O模型属于同步操作,最后一个AIO则属于异步操作 

    2.6 五种模型比较

     

    同步阻塞

    俩段都是阻塞的,所有数据准备完成后,才响应

    同步非阻塞

    磁盘从磁盘复制到内核内存中的时候, 不停询问内核数据是否准备完成. 盲等

    性能有可能更差 ,看上去他可以做别的事情了, 但是其实他在不停的循环. 

    但还是有一定的灵活性的

    缺点: 无法处理多个I/O,比如用户打开文件,ctrl+C想终止这个操作,是无法停掉的

    同步IO

    如果第二段是阻塞的 ,代表是同步的

    第一种,第二种,io复用,事件驱动,都是同步的.  

    异步IO

    内核后台自己处理 ,把大量时间拿来处理用户请求。

    下面看下tomcat NIO的请求流程。

     

         这里先来说下用户态和内核态,直白来讲,如果线程执行的是用户代码,当前线程处在用户态,如果线程执行的是内核里面的代码,当前线程处在内核态。更深层来讲,操作系统为代码所处的特权级别分了4个级别。不过现代操作系统只用到了0和3两个级别。0和3的切换就是用户态和内核态的切换。更详细的可参照《深入理解计算机操作系统》。I/O复用模型,是同步非阻塞,这里的非阻塞是指I/O读写,对应的是recvfrom操作,因为数据报文已经准备好,无需阻塞。说它是同步,是因为,这个执行是在一个线程里面执行的。有时候,还会说它又是阻塞的,实际上是指阻塞在select上面,必须等到读就绪、写就绪等网络事件。有时候我们又说I/O复用是多路复用,这里的多路是指N个连接,每一个连接对应一个channel,或者说多路就是多个channel。复用,是指多个连接复用了一个线程或者少量线程(在Tomcat中是Math.min(2,Runtime.getRuntime().availableProcessors()))。

     

    上面提到的网络事件有连接就绪,接收就绪,读就绪,写就绪四个网络事件。I/O复用主要是通过Selector复用器来实现的,可以结合下面这个图理解上面的叙述。

    Selector图解.png

    二、TOMCAT对IO模型的支持

    tomcat支持IO类型图.png

    tomcat从6以后开始支持NIO模型,实现是基于JDK的java.nio包。这里可以看到对read body 和response body是Blocking的。关于这点在第6.3节源代码阅读有重点介绍。

    三、TOMCAT中NIO的配置与使用

    在Connector节点配置protocol="org.apache.coyote.http11.Http11NioProtocol",Http11NioProtocol协议下默认最大连接数是10000,也可以重新修改maxConnections的值,同时我们可以设置最大线程数maxThreads,这里设置的最大线程数就是Excutor的线程池的大小。在BIO模式下实际上是没有maxConnections,即使配置也不会生效,BIO模式下的maxConnections是保持跟maxThreads大小一致,因为它是一请求一线程模式。

    四、NioEndpoint组件关系图解读

    tomcatnio组成.png

    我们要理解tomcat的nio最主要就是对NioEndpoint的理解。它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5个部分。LimitLatch是连接控制器,它负责维护连接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝连接请求。Acceptor负责接收连接,默认是1个线程来执行,将请求的事件注册到事件列表。有Poller来负责轮询,Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller将就绪的事件生成SocketProcessor同时交给Excutor去执行。Excutor线程池的大小就是我们在Connector节点配置的maxThreads的值。在Excutor的线程中,会完成从socket中读取http request,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,然后将response通过socket发回client。在从socket中读数据和往socket中写数据的过程,并没有像典型的非阻塞的NIO的那样,注册OP_READ或OP_WRITE事件到主Selector,而是直接通过socket完成读写,这时是阻塞完成的,但是在timeout控制上,使用了NIO的Selector机制,但是这个Selector并不是Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。详细源代码可以参照 第6.3节。

    五、NioEndpoint执行序列图

    tomcatnio序列图.png

    在下一小节NioEndpoint源码解读中我们将对步骤1-步骤11依次找到对应的代码来说明。

    六、NioEndpoint源码解读

    6.1、初始化

    无论是BIO还是NIO,开始都会初始化连接限制,不可能无限增大,NIO模式下默认是10000。

    public void startInternal() throws Exception {
    
            if (!running) {
                //省略代码...
                initializeConnectionLatch();
                //省略代码...
            }
        }
    protected LimitLatch initializeConnectionLatch() {
            if (maxConnections==-1) return null;
            if (connectionLimitLatch==null) {
                connectionLimitLatch = new LimitLatch(getMaxConnections());
            }
            return connectionLimitLatch;
        }
    

    6.2、步骤解读

    下面我们着重叙述跟NIO相关的流程,共分为11个步骤,分别对应上面序列图中的步骤。
    步骤1:绑定IP地址及端口,将ServerSocketChannel设置为阻塞。
    这里为什么要设置成阻塞呢,我们一直都在说非阻塞。Tomcat的设计初衷主要是为了操作方便。这样这里就跟BIO模式下一样了。只不过在BIO下这里返回的是Socket,NIO下这里返回的是SocketChannel。

    public void bind() throws Exception {
    
            //省略代码...
            serverSock.socket().bind(addr,getBacklog());
            serverSock.configureBlocking(true); 
            //省略代码...
            selectorPool.open();
        }
    

    步骤2:启动接收线程

    public void startInternal() throws Exception {
    
            if (!running) {
                //省略代码...
                startAcceptorThreads();
            }
        }
    
    //这个方法实际是在它的超类AbstractEndpoint里面    
    protected final void startAcceptorThreads() {
            int count = getAcceptorThreadCount();
            acceptors = new Acceptor[count];
    
            for (int i = 0; i < count; i++) {
                acceptors[i] = createAcceptor();
                Thread t = new Thread(acceptors[i], getName() + "-Acceptor-" + i);
                t.setPriority(getAcceptorThreadPriority());
                t.setDaemon(getDaemon());
                t.start();
            }
        }   
    

    步骤3:ServerSocketChannel.accept()接收新连接

    protected class Acceptor extends AbstractEndpoint.Acceptor {
    
            @Override
            public void run() {
                while (running) {
                    
                    try {
                        //省略代码...
                        SocketChannel socket = null;
                        try {                        
                            socket = serverSock.accept();//接收新连接
                        } catch (IOException ioe) {
                            //省略代码...
                            throw ioe;
                        }
                        //省略代码...
                        if (running && !paused) {
                            if (!setSocketOptions(socket)) {
                                //省略代码...
                            }
                        } else {
                            //省略代码...
                        }
                    } catch (SocketTimeoutException sx) {
                        
                    } catch (IOException x) {
                        //省略代码...
                    } catch (OutOfMemoryError oom) {
                        //省略代码...
                    } catch (Throwable t) {
                        //省略代码...
                    }
                }
               
            }
        }
    

    步骤4:将接收到的链接通道设置为非阻塞
    步骤5:构造NioChannel对象
    步骤6:register注册到轮询线程

    protected boolean setSocketOptions(SocketChannel socket) {
            
            try {
                
                socket.configureBlocking(false);//将连接通道设置为非阻塞
                Socket sock = socket.socket();
                socketProperties.setProperties(sock);
    
                NioChannel channel = nioChannels.poll();//构造NioChannel对象
                //省略代码...
                getPoller0().register(channel);//register注册到轮询线程
            } catch (Throwable t) {
               //省略代码...
            }
            //省略代码...
        }
    

    步骤7:构造PollerEvent,并添加到事件队列

    protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();
    public void register(final NioChannel socket)
            {
                //省略代码...
                PollerEvent r = eventCache.poll();
                //省略代码...
                addEvent(r);
            }
    

    步骤8:启动轮询线程

    public void startInternal() throws Exception {
    
            if (!running) {
                //省略代码...
                // Start poller threads
                pollers = new Poller[getPollerThreadCount()];
                for (int i=0; i<pollers.length; i++) {
                    pollers[i] = new Poller();
                    Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                    pollerThread.setPriority(threadPriority);
                    pollerThread.setDaemon(true);
                    pollerThread.start();
                }
                //省略代码...
            }
        }
    

    步骤9:取出队列中新增的PollerEvent并注册到Selector

    public static class PollerEvent implements Runnable {
    
            //省略代码...
    
            @Override
            public void run() {
                if ( interestOps == OP_REGISTER ) {
                    try {
                        socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
                    } catch (Exception x) {
                        log.error("", x);
                    }
                } else {
                    //省略代码...
                }//end if
            }//run
            //省略代码...
        }
    

    步骤10:Selector.select()

    public void run() {
                // Loop until destroy() is called
                while (true) {
                    try {
                        //省略代码...
                        try {
                            if ( !close ) {
                                if (wakeupCounter.getAndSet(-1) > 0) {
                                    keyCount = selector.selectNow();
                                } else {
                                    keyCount = selector.select(selectorTimeout);
                                }
                                //省略代码...
                            }
                            //省略代码...
                        } catch ( NullPointerException x ) {
                            //省略代码...
                        } catch ( CancelledKeyException x ) {
                            //省略代码...
                        } catch (Throwable x) {
                            //省略代码...
                        }
                        //省略代码...
    
                        Iterator<SelectionKey> iterator =
                            keyCount > 0 ? selector.selectedKeys().iterator() : null;
                        
                        while (iterator != null && iterator.hasNext()) {
                            SelectionKey sk = iterator.next();
                            KeyAttachment attachment = (KeyAttachment)sk.attachment();
                            
                            if (attachment == null) {
                                iterator.remove();
                            } else {
                                attachment.access();
                                iterator.remove();
                                processKey(sk, attachment);//此方法跟下去就是把SocketProcessor交给Excutor去执行
                            }
                        }//while
    
                        //省略代码...
                    } catch (OutOfMemoryError oom) {
                        //省略代码...
                    }
                }//while
                //省略代码...
            }
    

    步骤11:根据选择的SelectionKey构造SocketProcessor提交到请求处理线程

    public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
            try {
                //省略代码...
                SocketProcessor sc = processorCache.poll();
                if ( sc == null ) sc = new SocketProcessor(socket,status);
                else sc.reset(socket,status);
                if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
                else sc.run();
            } catch (RejectedExecutionException rx) {
                //省略代码...
            } catch (Throwable t) {
                //省略代码...
            }
            //省略代码...
        }
    

    6.3、NioBlockingSelector和BlockPoller介绍

    上面的序列图有个地方我没有描述,就是NioSelectorPool这个内部类,是因为在整体理解tomcat的nio上面在序列图里面不包括它更好理解。在有了上面的基础后,我们在来说下NioSelectorPool这个类,对更深层了解Tomcat的NIO一定要知道它的作用。NioEndpoint对象中维护了一个NioSelecPool对象,这个NioSelectorPool中又维护了一个BlockPoller线程,这个线程就是基于辅Selector进行NIO的逻辑。以执行servlet后,得到response,往socket中写数据为例,最终写的过程调用NioBlockingSelector的write方法。代码如下:

    public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException {  
            SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());  
            if ( key == null ) throw new IOException("Key no longer registered");  
            KeyAttachment att = (KeyAttachment) key.attachment();  
            int written = 0;  
            boolean timedout = false;  
            int keycount = 1; //assume we can write  
            long time = System.currentTimeMillis(); //start the timeout timer  
            try {  
                while ( (!timedout) && buf.hasRemaining()) {  
                    if (keycount > 0) { //only write if we were registered for a write  
                        //直接往socket中写数据  
                        int cnt = socket.write(buf); //write the data  
                        lastWrite.set(cnt);  
                        if (cnt == -1)  
                            throw new EOFException();  
                        written += cnt;  
                        //写数据成功,直接进入下一次循环,继续写  
                        if (cnt > 0) {  
                            time = System.currentTimeMillis(); //reset our timeout timer  
                            continue; //we successfully wrote, try again without a selector  
                        }  
                    }  
                    //如果写数据返回值cnt等于0,通常是网络不稳定造成的写数据失败  
                    try {  
                        //开始一个倒数计数器   
                        if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1);  
                        //将socket注册到辅Selector,这里poller就是BlockSelector线程  
                        poller.add(att,SelectionKey.OP_WRITE);  
                        //阻塞,直至超时时间唤醒,或者在还没有达到超时时间,在BlockSelector中唤醒  
                        att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS);  
                    }catch (InterruptedException ignore) {  
                        Thread.interrupted();  
                    }  
                    if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) {  
                        keycount = 0;  
                    }else {  
                        //还没超时就唤醒,说明网络状态恢复,继续下一次循环,完成写socket  
                        keycount = 1;  
                        att.resetWriteLatch();  
                    }  
      
                    if (writeTimeout > 0 && (keycount == 0))  
                        timedout = (System.currentTimeMillis() - time) >= writeTimeout;  
                } //while  
                if (timedout)   
                    throw new SocketTimeoutException();  
            } finally {  
                poller.remove(att,SelectionKey.OP_WRITE);  
                if (timedout && key != null) {  
                    poller.cancelKey(socket, key);  
                }  
            }  
            return written;  
        }
    

    也就是说当socket.write()返回0时,说明网络状态不稳定,这时将socket注册OP_WRITE事件到辅Selector,由BlockPoller线程不断轮询这个辅Selector,直到发现这个socket的写状态恢复了,通过那个倒数计数器,通知Worker线程继续写socket动作。看一下BlockSelector线程的代码逻辑:

    public void run() {  
                while (run) {  
                    try {  
                        ......  
      
                        Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;  
                        while (run && iterator != null && iterator.hasNext()) {  
                            SelectionKey sk = (SelectionKey) iterator.next();  
                            KeyAttachment attachment = (KeyAttachment)sk.attachment();  
                            try {  
                                attachment.access();  
                                iterator.remove(); ;  
                                sk.interestOps(sk.interestOps() & (~sk.readyOps()));  
                                if ( sk.isReadable() ) {  
                                    countDown(attachment.getReadLatch());  
                                }  
                                //发现socket可写状态恢复,将倒数计数器置位,通知Worker线程继续  
                                if (sk.isWritable()) {  
                                    countDown(attachment.getWriteLatch());  
                                }  
                            }catch (CancelledKeyException ckx) {  
                                if (sk!=null) sk.cancel();  
                                countDown(attachment.getReadLatch());  
                                countDown(attachment.getWriteLatch());  
                            }  
                        }//while  
                    }catch ( Throwable t ) {  
                        log.error("",t);  
                    }  
                }  
                events.clear();  
                try {  
                    selector.selectNow();//cancel all remaining keys  
                }catch( Exception ignore ) {  
                    if (log.isDebugEnabled())log.debug("",ignore);  
                }  
            } 
    
    

    使用这个辅Selector主要是减少线程间的切换,同时还可减轻主Selector的负担。

    七、关于性能

    下面这份报告是我们压测的一个结果,跟想象的是不是不太一样?几乎没有差别,实际上NIO优化的是I/O的读写,如果瓶颈不在这里的话,比如传输字节数很小的情况下,BIO和NIO实际上是没有差别的。NIO的优势更在于用少量的线程hold住大量的连接。还有一点,我们在压测的过程中,遇到在NIO模式下刚开始的一小段时间内容,会有错误,这是因为一般的压测工具是基于一种长连接,也就是说比如模拟1000并发,那么同时建立1000个连接,下一时刻再发送请求就是基于先前的这1000个连接来发送,还有TOMCAT的NIO处理是有POLLER线程来接管的,它的线程数一般等于CPU的核数,如果一瞬间有大量并发过来,POLLER也会顿时处理不过来。

    压测1.jpeg

    压测2.jpeg

    八、总结

    NIO只是优化了网络IO的读写,如果系统的瓶颈不在这里,比如每次读取的字节说都是500b,那么BIO和NIO在性能上没有区别。NIO模式是最大化压榨CPU,把时间片都更好利用起来。对于操作系统来说,线程之间上下文切换的开销很大,而且每个线程都要占用系统的一些资源如内存,有关线程资源可参照这篇文章《一台java服务器可以跑多少个线程》。因此,使用的线程越少越好。而I/O复用模型正是利用少量的线程来管理大量的连接。在对于维护大量长连接的应用里面更适合用基于I/O复用模型NIO,比如web qq这样的应用。所以我们要清楚系统的瓶颈是I/O还是CPU的计算



    转载:https://www.jianshu.com/p/76ff17bc6dea

    展开全文
  • Tomcat nio

    2017-09-23 16:52:57
    tomcat的运行模式有3种.修改他们的运行模式.3种模式的运行是否成功,可以看他的启动控制台,或者启动日志.或者登录他们的默认页面http://localhost:8080/查看其中的服务器状态。 1)bio 默认的模式,性能非常低下,...

    tomcat的运行模式有3种.修改他们的运行模式.3种模式的运行是否成功,可以看他的启动控制台,或者启动日志.或者登录他们的默认页面http://localhost:8080/查看其中的服务器状态。

    1)bio

    默认的模式,性能非常低下,没有经过任何优化处理和支持.

    2)nio

    利用java的异步io护理技术,no blocking IO技术.

    想运行在该模式下,直接修改server.xml里的Connector节点,修改protocol为

     <Connector port="80" protocol="org.apache.coyote.http11.Http11NioProtocol" 
    	connectionTimeout="20000" 
    	URIEncoding="UTF-8" 
    	useBodyEncodingForURI="true" 
    	enableLookups="false" 
    	redirectPort="8443" /> 

    启动后,就可以生效。

    3)apr

    安装起来最困难,但是从操作系统级别来解决异步的IO问题,大幅度的提高性能.

    必须要安装apr和native,直接启动就支持apr。下面的修改纯属多余,仅供大家扩充知识,但仍然需要安装apr和native

    如nio修改模式,修改protocol为org.apache.coyote.http11.Http11AprProtocol

     

    Tomcat 6.X实现了JCP的Servlet 2.5和JSP2.1的规范,并且包括其它很多有用的功能,使它成为开发
    和部署web应用和web服务的坚实平台。
           NIO (No-blocking I/O)从JDK 1.4起,NIO API作为一个基于缓冲区,并能提供非阻塞I/O操作的API
    被引入。 


           作为开源web服务器的java实现,tomcat几乎就是web开发者开发、测试的首选,有很多其他商业服务
    器的开发者也会优先选择tomcat作为开发时候使用,而在部署的时候,把应用发布在商业服务器上。也有
    许多商业应用部署在tomcat上,tomcat承载着其核心的应用。但是很多开发者很迷惑,为什么在自己的应
    用里使用tomcat作为平台的时候,而并发用户超过一定数量,服务器就变的非常繁忙,而且很快就出现了
    connection refuse的错误。但是很多商业应用部署在tomcat上运行却安然无恙。

          其中有个很大的原因就是,配置良好的tomcat都会使用APR(Apache Portable Runtime),APR是
    Apache HTTP Server2.x的核心,它是高度可移植的本地库,它使用高性能的UXIN I/O操作,低性能的
    java io操作,但是APR对很多Java开发者而言可能稍稍有点难度,在很多OS平台上,你可能需要重新编
    译APR。但是从Tomcat6.0以后, Java开发者很容易就可以是用NIO的技术来提升tomcat的并发处理能力。
    但是为什么NIO可以提升tomcat的并发处理能力呢,我们先来看一下java 传统io与 java NIO的差别。
         
    Java 传统的IO操作都是阻塞式的(blocking I/O), 如果有socket的编程基础,你会接触过堵塞socket和
    非堵塞socket,堵塞socket就是在accept、read、write等IO操作的的时候,如果没有可用符合条件的资
    源,不马上返回,一直等待直到有资源为止。而非堵塞socket则是在执行select的时候,当没有资源的时
    候堵塞,当有符合资源的时候,返回一个信号,然后程序就可以执行accept、read、write等操作,一般来
    说,如果使用堵塞socket,通常我们通常开一个线程accept socket,当读完这次socket请求的时候,开一
    个单独的线程处理这个socket请求;如果使用非堵塞socket,通常是只有一个线程,一开始是select状,
    当有信号的时候可以通过 可以通过多路复用(Multiplexing)技术传递给一个指定的线程池来处理请求,然
    后原来的线程继续select状态。 最简单的多路复用技术可以通过java管道(Pipe)来实现。换句话说,如果
    客户端的并发请求很大的时候,我们可以使用少于客户端并发请求的线程数来处理这些请求,而这些来不
    及立即处理的请求会被阻塞在java管道或者队列里面,等待线程池的处理。请求 听起来很复杂,在这个架
    构当道的java 世界里,现在已经有很多优秀的NIO的架构方便开发者使用,比如Grizzly,Apache Mina等
    等,如果你对如何编写高性能的网络服务器有兴趣,你可以研读这些源代码。

          简单说一下,在web服务器上阻塞IO(BIO)与NIO一个比较重要的不同是,我们使用BIO的时候往往会
    为每一个web请求引入多线程,每个web请求一个单独的线程,所以并发量一旦上去了,线程数就上去
    了,CPU就忙着线程切换,所以BIO不合适高吞吐量、高可伸缩的web服务器;而NIO则是使用单线程(单
    个CPU)或者只使用少量的多线程(多CPU)来接受Socket,而由线程池来处理堵塞在pipe或者队列里的请
    求.这样的话,只要OS可以接受TCP的连接,web服务器就可以处理该请求。大大提高了web服务器的可
    伸缩性。

        我们来看一下配置,你只需要在server.xml里把 HTTP Connector做如下更改,

        <Connector port="8080" protocol="HTTP/1.1" 
                   connectionTimeout="20000" 
                   redirectPort="8443" />
        改为
        <Connector port="8080" protocol="org.apache.coyote.http11.Http11NioProtocol" 
                   connectionTimeout="20000" 
                   redirectPort="8443" />

    然后启动服务器,你会看到org.apache.coyote.http11.Http11NioProtocol start的信息,表示NIO已经启动。其他的配置请参考官方配置文档。

    Enjoy it.

    最后贴上官方文档上对tomcat的三种Connector的方式做一个简单比较,
        

    Java Blocking Connector       Java Nio Blocking Connector       APR Connector
    
    Classname         Http11Protocol                  Http11NioProtocol         Http11AprProtocol
    
    Tomcat Version   3.x 4.x 5.x 6.x                       6.x                     5.5.x 6.x
    
    Support Polling         NO                             YES                        YES
    
    Polling Size           N/A                   Unlimited - Restricted by mem        Unlimited
    
    Read HTTP Request     Blocking                     Blocking                       Blocking
    
    Read HTTP Body        Blocking                     Blocking                       Blocking
    
    Write HTTP Response   Blocking                     Blocking                       Blocking
    
    SSL Support           Java SSL                     Java SSL                       OpenSSL
    
    SSL Handshake         Blocking                     Non blocking                   Blocking
    
    Max Connections       maxThreads                   See polling size               See polling size
     
     
    如果读者有socket的编程基础,应该会接触过堵塞socket和非堵塞socket,堵塞socket就是在accept、read、write等IO操作的的时候,如果没有可用符合条件的资源,不马上返回,一直等待直到有资源为止。而非堵塞socket则是在执行select的时候,当没有资源的时候堵塞,当有符合资源的时候,返回一个信号,然后程序就可以执行accept、read、write等操作,这个时候,这些操作是马上完成,并且马上返回。而windows的winsock则有所不同,可以绑定到一个EventHandle里,也可以绑定到一个HWND里,当有资源到达时,发出事件,这时执行的io操作也是马上完成、马上返回的。一般来说,如果使用堵塞socket,通常我们时开一个线程accept socket,当有socket链接的时候,开一个单独的线程处理这个socket;如果使用非堵塞socket,通常是只有一个线程,一开始是select状态,当有信号的时候马上处理,然后继续select状态。 
       
      按照大多数人的说法,堵塞socket比非堵塞socket的性能要好。不过也有小部分人并不是这样认为的,例如Indy项目(Delphi一个比较出色的网络包),它就是使用多线程+堵塞socket模式的。另外,堵塞socket比非堵塞socket容易理解,符合一般人的思维,编程相对比较容易。 
       
      nio其实也是类似上面的情况。在JDK1.4,sun公司大范围提升Java的性能,其中NIO就是其中一项。Java的IO操作集中在java.io这个包中,是基于流的阻塞API(即BIO,Block IO)。对于大多数应用来说,这样的API使用很方便,然而,一些对性能要求较高的应用,尤其是服务端应用,往往需要一个更为有效的方式来处理IO。从JDK 1.4起,NIO API作为一个基于缓冲区,并能提供非阻塞O操作的API(即NIO,non-blocking IO)被引入。 
       
      BIO与NIO一个比较重要的不同,是我们使用BIO的时候往往会引入多线程,每个连接一个单独的线程;而NIO则是使用单线程或者只使用少量的多线程,每个连接共用一个线程。  
       
      这个时候,问题就出来了:我们非常多的java应用是使用ThreadLocal的,例如JSF的FaceContext、Hibernate的session管理、Struts2的Context的管理等等,几乎所有框架都或多或少地应用ThreadLocal。如果存在冲突,那岂不惊天动地? 
       
      后来终于在Tomcat6的文档(http://tomcat.apache.org/tomcat-6.0-doc/aio.html)找到答案。根据上面说明,应该Tomcat6应用nio只是用在处理发送、接收信息的时候用到,也就是说,tomcat6还是传统的多线程Servlet,我画了下面两个图来列出区别: 
       
      tomcat5:客户端连接到达 -> 传统的SeverSocket.accept接收连接 -> 从线程池取出一个线程 -> 在该线程读取文本并且解析HTTP协议 -> 在该线程生成ServletRequest、ServletResponse,取出请求的Servlet -> 在该线程执行这个Servlet -> 在该线程把ServletResponse的内容发送到客户端连接 -> 关闭连接。 
       
      我以前理解的使用nio后的tomcat6:客户端连接到达 -> nio接收连接 -> nio使用轮询方式读取文本并且解析HTTP协议(单线程) -> 生成ServletRequest、ServletResponse,取出请求的Servlet -> 直接在本线程执行这个Servlet -> 把ServletResponse的内容发送到客户端连接 -> 关闭连接。 
       
      实际的tomcat6:客户端连接到达 -> nio接收连接 -> nio使用轮询方式读取文本并且解析HTTP协议(单线程) -> 生成ServletRequest、ServletResponse,取出请求的Servlet -> 从线程池取出线程,并在该线程执行这个Servlet -> 把ServletResponse的内容发送到客户端连接 -> 关闭连接。   
       
       
      从上图可以看出,BIO与NIO的不同,也导致进入客户端处理线程的时刻有所不同:tomcat5在接受连接后马上进入客户端线程,在客户端线程里解析HTTP协议,而tomcat6则是解析完HTTP协议后才进入多线程,另外,tomcat6也比5早脱离客户端线程的环境。 
       
      实际的tomcat6与我之前猜想的差别主要集中在如何处理servlet的问题上。实际上即使抛开ThreadLocal的问题,我之前理解tomcat6只使用一个线程处理的想法其实是行不同的。大家都有经验:servlet是基于BIO的,执行期间会存在堵塞的,例如读取文件、数据库操作等等。tomcat6使用了nio,但不可能要求servlet里面要使用nio,而一旦存在堵塞,效率自然会锐降。 
        
       
      所以,最终的结论当然是tomcat6的servlet里面,ThreadLocal照样可以使用,不存在冲突。 
    http://www.cnblogs.com/sunwei2012/archive/2010/03/05/1679299.html
    展开全文
  • 本文使用jdk1.8.0_45和spring boot 2.1.4.RELEASE涉及源码都放在https://github.com/sabersword/Nio前因这周遇到一个连接断开的问题,便沿着这条线学习了一下Java NIO,顺便验证一下Tomcat作为spring boot默认的web...
  • 很早就听说tomcat6使用nio了,这几天突然想到一个问题,使用nio代替传统的bio,ThreadLocal岂不是会存在冲突?     首先,何谓nio?     如果读者有socket的编程基础,应该会接触过堵塞socket和非堵塞...
  • Tomcat NIO 基本架构

    2017-12-13 18:21:21
    Tomcat NIO 基本架构图示
  • tomcat NIO配置

    2018-03-23 17:39:00
    1.tomcat NIO配置 今天在查看日志时发现tomcat的Socket连接方式为bio,于是我想既然有bio那肯定有nio。果然,一查就发现tomcat在6.0之后就可以配置nio的方式。nio方式比bio具有更好的并发性,如果Web应用需要更好的...
  • NIOTomcat中的应用

    2019-11-09 12:20:43
    NIO的理解 个人单方面认为,NIO与BIO的最大区别在于主动和被动,使用BIO的方式需要等待被调用方返回数据,很明显此时调用者是被动的。 举个例子 阻塞IO 假设你是一个胆小又害羞的男孩子,你约了隔壁测试的妹子...
  • 浅析tomcat nio 配置

    2019-02-22 13:39:52
    浅析tomcat nio 配置
  • Java NIOTomcat 原理理解

    千次阅读 2015-12-11 11:47:17
    tomcat的运行模式有3种.修改他们的运行模式.3种模式的运行是否成功,可以看他的启动控制台,或者启动日志.或者登录他们的默认页面http://localhost:8080/查看其中的服务器状态。 1)bio 默认的模式,性能非常低下,没有...
  • NIO (No-blocking I/O)从JDK 1.4起,NIO API作为一个基于缓冲区,并能提供非阻塞I/O操作的API被引入。作为开源web服务器的java实现,tomcat几乎就是web开发者开发、测试的***,有很多其他商业服务...
  • Nio Bio Netty TomcatNIO

    2017-12-04 12:18:00
    上面说了,nio的ServerSocketAdaptor是对bio的ServerSocket的封装,nio的ServerSocketChannelImpl和bio的SocketImpl是差不多的东西,都是对文件的被描述。nio的channel是对selector的适配 而netty的nio...
  • 1、Java NIO 基本介绍 Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的 输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的 NIO 相关类都被放在 ...
  • nio不太好理解,tomcat6在nio方面做了不小的支持,从那里可以学习到如何用好nio,下面是几张在网上找到的关于nio的架构图,可以帮助理解 BIO与NIO的区别: tomcat 5与tomcat 6 的区别: tomcat5: ...
  • Tomcat配置NIO

    2019-09-29 22:31:46
    tomcat的运行模式有3种.修改他们的运行模式.3种模式的运行是否成功,可以看他的启动控制台,或者启动日志.或者登录他们的默认页面http://localhost:8080/查看其中的服务器状态。 1)bio 默认的模式,性能非常低下,没有...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,218
精华内容 1,287
关键字:

niotomcat