精华内容
下载资源
问答
  • Netty原理

    2021-04-26 16:59:24
    Netty原理1 概述1.1 netty是什么?1.2 特点2 原理2.1 IO模型2.1.1 阻塞IO2.1.2 IO多路复用2.1.3 异步IO2.2 netty模型3 架构4 实现reactor线程模型Reactor 多线程单线程 reactorFuture 和 Promise事件模型Pipleline零...

    1 概述

    netty的基本了解

    1.1 netty是什么?

    快速简单的网络编程框架,极大的简化了基于TCP UDP协议的网络编程,简单,灵活,高性能,稳定

    1.2 特点

    • 设计

      支持多种IO模型,阻塞或非阻塞
      基于灵活、可扩展的事件模型,更专注于业务实现
      定制化的线程模型,单线程,线程池或多线程池,如SEDA
      真正的无连接数据报,socket支持

    • 性能

      更高的吞吐量,更小的延迟
      消耗少量系统资源
      很少的内存Copy

    • 安全

      支持SSL/TLS,StartTLS

    2 原理

    2.1 IO模型

    先了解下操作系统相关IO通信模型

    2.1.1 阻塞IO

    同步阻塞IO
    线程在读取网络数据时,分为两个阶段,等待数据和复制数据阶段,等待数据阶段等待网卡读到数据并存入网卡对应的内存缓存区,复制数据阶段是将网卡缓冲区的的数据复制到用户空间,即用户程序可以访问的内存区域。
    同步阻塞IO线程在等待数据和复制数据两个阶段都阻塞,具体是在系统调用recvfrom的过程中阻塞,等待数据复制到用户空间后,才处理数据。如下图:
    在这里插入图片描述
    同步非阻塞IO
    线程在等待数据期间不阻塞,而是通过持续的发送recvfrom向内核请求数据状态,内核立即返回而不阻塞,这样线程灵活一些,线程不阻塞,但是耗费资源,没有数据时要持续的调用。而在等待数据复制到内核空间这个时间段阻塞,复制完成后处理数据。
    在这里插入图片描述

    2.1.2 IO多路复用

    也称作NIO,一个线程监听多个socket,处理多个socket连接的IO事件,IO多路复用也是特殊的阻塞IO
    select
    等待数据阶段,线程阻塞,用户程序调用select系统调用,操作系统监听所有select负责的socket,一旦有一个socket数据准备好了,操作系统即返回数据可用,用户再去调用recvfrom系统调用,将数据从内核空间读到用户空间。
    复制数据阶段,线程阻塞,等待复制完成后处理数据。select一个线程处理多个socket连接,所以socket连接少时性能没有阻塞线程好,但是一个线程可以处理多个socket连接的IO事件,适合处理大量并发连接的场景。

    在这里插入图片描述

    poll
    和select,区别在于监听的socket对应的文件描述符,使用链表存储,没有数量限制。
    epoll
    等待数据阶段,线程不阻塞,用户程序注册一个信号handler来处理对应的socket事件,然后线程返回继续做后续的事情,当内核数据准备好了会发送一个信号,程序调用recvfrom系统调用,将数据从内核空间拷贝到用户空间。
    复制数据阶段,线程阻塞,等待数据复制到用户空间后,处理数据。
    在这里插入图片描述
    select,poll,epoll比较

    • 单个进程能够监视的文件描述符的数量存在最大限制1024。select处理连接时,用户进程每次把所有负责的socket连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,再让用户应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll能处理的并发连接有限。select模型下socket连接越多性能越差。
    • poll有select同样的问题
    • epoll没有循环处理文件描述符的问题,通过回调的方式实现,不需要将所有句柄复制到内核态去轮询。

    2.1.3 异步IO

    异步IO也称作AIO,等待数据阶段和数据复制阶段都不阻塞,系统发起aio_read调用,立即返回,等待数据复制完成之后,向用户程序发出信号,进行信号处理,用户程序处理数据。整过过程线程不阻塞,Java1.7以后提供了AIO支持。
    在这里插入图片描述

    2.2 netty模型

    netty提供了select,epoll,kqueue的IO多路复用模型NIO的实现,通过提供NioEventLoopGroup,EpollEventLoopGroup,KqueueEventLoopGroup线程模型来实现。

    3 架构

    3.1 功能架构

    通过官方文档提供的架构图来了解

    在这里插入图片描述
    netty在底层具有零拷贝能力的丰富字节缓冲的基础上,提供通用的通信API,可扩展的事件模型,支持TCP,UDP通信,HTTP tunnel传输,In-VM pipe方式的传输服务。支持http、websocket通信协议,SSL和StartTLS通信协议,Google protobuf数据编解码协议,zlib、gzip压缩协议,大文件传输,RTSP协议等传输协议。

    3.2 reactor模型

    以NioEventLoopGroup为例,来学习reactor线程模型流程。

    在这里插入图片描述
    客户端连接时,Boss线程组处理连接事件,处理方式为EventLoopGroup轮询处理,step1先select,拿到就绪的socket列表,接下来step2处理就绪的channel key,将channel注册到work EventLoopGroup,step3执行队列中任务,完成后继续轮询处理连接事件

    通信通道channel注册到work组中的某一个NioEventLoop后,NioEventLoop来处理channel的读写事件,同样是轮询处理,step1先select,拿到就绪的socket列表,接下来step2处理就绪的channel key,执行流水线处理程序,通常在这里执行业务数据处理程序,step3执行任务,完成后继续轮询处理IO事件
    这个流程图简单描述了netty的核心流程。

    3.3 高性能

    netty的一下特点来支撑Netty的高性能

    • 基于I/O多路复用模型
    • 零拷贝
    • 基于NIO的Buffer
    • 基于内存池的方式,循环重用缓冲区,避免缓冲区的重建销毁损耗性能。
    • 无锁化的串行设计理念
    • I/O操作的异步处理
    • 提供对protobuf等高性能序列化协议支持

    4 实现

    通过源码,学习netty如何实现的。
    先大体了解Netty所涉及的各个组件:

    • NioEventLoopGroup,多线程模式处理基于NIO Selector的channel的时间,通常Netty需要两个NioEventLoopGroup对象bossGroup和- -workGroup,分别处理不同的事件,select模型的具体实现。同时netty提供了epoll,kqueue和本地IO模型的实现
    • NioEventLoopGroup,基于select实现核心的reactor线程模型,轮询select事件,处理事件,并执行其他任务。netty特提供了其他模型的实现。
    • Channel,与socket联系,或者与能够进行I / O操作(例如读取,写入,连接和绑定)的组件的联系,IO操作的通道。
    • Selector,多路复用器,执行select操作,由jdk底层实现。支持select操作的Channel注册到Selector,selector进行select操作,找到发生IO事件的channel,通过selectedKey的方式返回给调用程序。
    • Pipieline,IO事件处理流水线,多个处理程序按照顺序进行数据、业务逻辑处理。
    • ChannelHandler,数据或业务逻辑处理程序。
    • BootStrap,客户端启动程序,通过简单的方式启动一个客户端。
    • ServerBootStrap,服务器端启动程序,通过简单的方式启动一个服务端。
    • FastThreadLocalThread,netty执行任务的线程。
      先看一下线程模型相关核心接口和类的关系图如下:

    在这里插入图片描述
    顶级接口定义的方法能力包括:

    • Executor顶级接口定义了execute处理
    • Iterable定义了轮询遍历
    • ExecutorService定义了多任务场景下,任务提交,调用
    • ScheduleExecutorService定义了任务定时、延迟调度
    • EventExecutorGroup,定义了一组EventExecutor场景下,通过next()选择一个EventExecutor
    • EventExecutor,通过parent()选择父group,
    • EventLoopGroup,定义了注册Channel,ChannelPromise
    • EventLoop,处理注册到EventLoop的Channel对应的IO操作,子类实现reactor线程模型

    4.1 多线程模型

    EventLoopGroup的子类MultithreadEventLoopGroup抽象类定义了多线程的线程模型,MultithreadEventLoopGroup有5个实现类分别是:

    • NioEventLoopGroup,处理基于NIO IO模型的通道
    • DefaultEventLoopGroup,用于本地传输
    • EpollEventLoopGroup,使用epoll IO模型,运行在linux上
    • KQueueEventLoopGroup,使用kqueue IO模型,运行在mac上
    • LocalEventLoopGroup,已过期,被DefaultEventLoopGroup代替

    新版本中LocalEventLoopGroup被DefaultEventLoopGroup,也就是共4中实现,4种实现区别是处理事件的EventExecutor执行器不同,但是多线程模型是相同的,下面以NioEventLoopGroup为例来学习一下多线程模型

    NioEventLoopGroup
    NioEventLoopGroup实现了Execute,ExecuteService,ScheduledExecuteService顶级接口,实现了execute,submit,schuedule等处理Runable、Callable任务的能力,通过代码发现,NioEventLoopGroup内部管理多个EventExecutor,NioEventLoopGroup处理任务是委托给内部的一个EventExecutor来处理的,直接上代码,next()方法返回了NioEventLoopGroup管理的一个EventExecutor对象

    public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
        @Override
        public Future<?> submit(Runnable task) {
            return next().submit(task);
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return next().submit(task);
        }
    
        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return next().schedule(command, delay, unit);
        }
        ... ...
    }
    

    MultithreadEventExecutorGroup中定义了一个名叫children的数组,数组中存放EventExecutor对象,在构造MultithreadEventExecutorGroup时,根据线程数量,初始化对应数量的EventExecutor存在children中,这个操作在MultithreadEventExecutorGroup的构造器中完成

    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    

    同时NioEventLoopGroup的newChild()方法中实现了EventExecutor的创建,可以看到newChild方法返回了EventLoop,EventLoop继承于EventExecutor,四种实现的区别在于返回的EventLoop不同,也就是事件处理器不同,对于NioEventLoop的创建我们在NioEventLoop时再深入学习。

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
    

    next()方法中实现了选择EventExecutor的算法。
    NioEventLoopGroup实现了EventExecutorGroup,EventExecutorGroup定义了next()方法,next()方法返回一个EventExecutorGroup管理的EventExecutor,然后EventExecutor用来处理事件,在NioEventLoopGroup的父类MultithreadEventExecutorGroup中完成了next()实现

    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    

    chooser是EventExecutorChooser的对象,在MultithreadEventExecutorGroup的构造方法中初始化,使用DefaultEventExecutorChooserFactory工厂模式进行创建并初始化

    // 初始化chooser
    chooser = chooserFactory.newChooser(children);
    // 初始化chooserFactory
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    

    DefaultEventExecutoryChooserFactory在初始化chooser是,根据EventExecutor的数量进行不同,使用不同的EventExecutorChooser算法选择具体的EventExecutor,如果EventExecutor的数量是2的幂次方,使用位运算,否则使用算数取模的运算方式,最终实现轮询选择多个EventExecutor的算法

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        //executor的数量 是2的幂
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }
    

    PowerOfTwoEventExecutorChooser选择方式,数量是2的幂,按位与,最终按照executors的顺序轮询返回一个EventExecutor

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
    

    GenericEventExecutorChooser选择方式,算数取模,最终按照executors的顺序轮询返回一个EventExecutor

    @Override
    public EventExecutor next() {
        return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    }
    

    线程相关

    1,线程数量

    NioEventLoopGroup在初始化时,调用父类了MultithreadEventLoopGroup的构造方法
    线程数量在MultithreadEventLoopGroup构造方法中指定,如果是0就使用默认常量DEFAULT_EVENT_LOOP_THREADS

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    并在static代码块中初始化了DEFAULT_EVENT_LOOP_THRADS的数量,如果系统启动参数指定了,就是用系统启动参数指定的线程数量,否则使用cpu核心数*2作为线程数量

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    

    2,线程对象

    NioEventLoopGroup初始化在确定线程数量后,又调用了父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup的构造方法,初始化执行任务的executor为ThreadPerTaskExecutor,在此处指定了初始化执行任务的executor为ThreadPerTaskExecutor创建线程使用的ThreadFactory

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
        ...
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        ...
    }
    

    ThreadPerTaskExecutor执行任务的方式是,为每个任务创建新的线程来执行任务,如下:

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start(); // 创建新线程来执行Runnable任务
        }
    }
    

    创建ThreadFactory过程,可以看到创建并返回了DefaultThreadFactory

    protected ThreadFactory newDefaultThreadFactory() {
       return new DefaultThreadFactory(getClass());
    }
    

    DefaultThreadFactory的创建过程, 初始化了线程的标识,是否后台线程,线程优先级

    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }
    
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
       this(toPoolName(poolType), daemon, priority);
    }
    
    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
        this(poolName, daemon, priority, null);
    }
     
    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        ObjectUtil.checkNotNull(poolName, "poolName");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }
    

    下面看一下,DefaultThreadFactory创建线程的过程,可以看到创建了FastThreadLocalRunnable的线程

    @Override
    public Thread newThread(Runnable r) {
        // 创建线程,FastThreadLocalRunnable对普通Runnable任务进行了包装
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
    
    protected Thread newThread(Runnable r, String name) {
        return new FastThreadLocalThread(threadGroup, r, name);
    }
    

    至此我们了解了NioEventLoopGroup创建时的线程数量和线程类型。

    4.2 reactor线程模型

    在了解EventLoopGroup了解到EventLoopGroup有多个线程,每个线程对应EventExecutor,也就是EventLoop,实现了select轮询的流程,EventLoop同样有多个实现,也就是SingleThreadEventLoop抽象类的多个实现类,包括:

    1. DefaultEventLoop
    2. EpollEventLoop
    3. KQueueEventLoop
    4. NioEventLoop
    5. ThreadPerChannelEventLoop

    同样,我们先以NioEventLoop为例学习,NioEventLoopGroup创建执行器时,创建的是NioEventLoop对象,也就是newChild方法的逻辑

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
    

    SelectorProvider

    SelectorProvider是一个Java SPI顶级抽象类,定义了Selector和可选择Channel,SelectProvider的具体实现都继承自SelectorProvider,同过SelectorProvider调用jdk底层初始化具体的Selector对象。

    SelectStrategy
    select策略,不指定时使用使用默认的的DefaultSelectStrategy,默认DefaultSelectStrategy实现如下,

    final class DefaultSelectStrategy implements SelectStrategy {
        static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
    
        private DefaultSelectStrategy() { }
    
        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    }
    

    NioEventLoopGroup在执行任务是,先调用next()找到EventExecotur也就是NioEventLoop,通过NioEventLoop的execute方法执行任务

        @Override
        public void execute(Runnable command) {
            next().execute(command);
        }
    

    调用NioEventLoop的execute方法后,先启动线程,启动线程时会调用NioEventLoop的run方法

    private void doStartThread() {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    ......
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        ... ...
                    }
                }
            });
        }
    

    NioEventLoop中的run方法实现reactor轮询,核心源码如下:

    @Override
        protected void run() {
            int selectCnt = 0;
            for (;;) {
                try {
                    int strategy;
                    try {
    						... ... select策略
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                        }
                    } catch (IOException e) {
                        ... ...
                    }
                        try {
                            if (strategy > 0) {
                                processSelectedKeys();
                            }
                        } finally {
                            // Ensure we always run tasks.
                            ranTasks = runAllTasks();
                        }
                } catch (CancelledKeyException e) {
                    ... ...
                }
            }
        }
    

    这段代码实现了NioEventLoop的核心,即select事件、处理事件、执行任务的流程,即:
    在这里插入图片描述
    下面深入看一下源码实现
    select
    select之前是select策略,前面看到默认使用DefaultSelectStrategy,首先判断队列中有没有任务task,如果没有任务task,则执行正常select逻辑;如果有任务,则执行selectNow()方法对应不阻塞立即返回select操作,返回准备进行IO操作的channel数量,这种情况下,不进入正常的阻塞select逻辑。

    select之前,先拿到下次调度任务的调度时间,设置selector的wakeup时间(selector的wakeup()方法可以使正在阻塞的select操作立即返回),并且队列没有任务时,进入select(),select如下:

        private int select(long deadlineNanos) throws IOException {
            if (deadlineNanos == NONE) {
                return selector.select();
            }
            // Timeout will only be 0 if deadline is within 5 microsecs
            long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
            return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
        }
    

    如果没有需要调度的任务,直接调用selector.select(),如果有任务,计算select timeout时间,如果调度剩余时间小于5微妙则timeout时间为0,调度剩余时间大于等于5微秒则向上转为毫秒取整,如6微秒,则超时时间为1毫秒,100微秒也为1毫秒。

    计算好timeout时间后,如果timeout时间小于等于0,执行selectNow()非阻塞select,否则执行select(timeout)阻塞select,阻塞timeout毫秒

    processSelectedKeys
    在select()或者selectNow()之后,NioEventLoop通过processSelectedKeys处理select的结果,首先根据设置的IO所占时间比即ioRatio(默认50%)来确定执行策略。

    • 如果ioRatio 100%,直接按照顺序执行processSelectedKeys和runAllTasks,
    • 否则如果select或selectNow的结果大于零时先执行processSelectedKeys,完成后根据ioRatio计算runAllTasks的时间,指定runAllTask的超时时间,执行runAllTasks
    • 如果select为0,直接执行runAllTasks,指定超时时间为0
      代码如下
    if (ioRatio == 100) {
        try {
            if (strategy > 0) {
                processSelectedKeys();
            }
        } finally {
            // 保证每次执行任务.
            ranTasks = runAllTasks();
        }
    } else if (strategy > 0) {
        final long ioStartTime = System.nanoTime();
        try {
            processSelectedKeys();
        } finally {
            // Ensure we always run tasks.
            final long ioTime = System.nanoTime() - ioStartTime;
            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    } else {
        ranTasks = runAllTasks(0); // This will run the minimum number of tasks
    }
    

    processSelectedKeys的逻辑如下,当selectedKeys不为空时,执行netty对selectedKeySet做过优化的处理逻辑processSelectedKeysOptimized(),否则执行processSelectedKeysPlain()。

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    selectedKeys 优化
    netty对selectedKeySet的优化,在NioEventLoop的构造方法中调用的openSelector()方法初始化Selector时开始,openSelector()中的关键代码如下:

    private SelectorTuple openSelector() {
            ......
            // 1,调用jdk底层初始化selector,且不需要优化时,直接返回构建的SelectorTuple
            ......
    		// 2, 加载SelectorImpl
    		......
    		// 3,加载异常处理
    		......
    		// 4,初始化selectedKeySet 
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            // selectedKeySet为SelectedSelectionKeySet类型的对象
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                    	// 5,反射的方式,设置给unwrappedSelector的selectedKeysField和publicSelectedKeysField字段selectedKeySet对象
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    					......
    					// java9适配,设置反射属性可访问
                    	......
    					// 通过反射将selectedKeySet设置到unwrappedSelector,代替原来的selectedKeysField,publicSelectedKeysField
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
           	...... //异常处理,返回
            selectedKeys = selectedKeySet;
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
    

    看来优化在于SelectedSelectionKeySet,再跟进SelectedSelectionKeySet,发现SelectedSelectionKeySet继承AbstractSet,替换掉了原来的HashSet,使用数组存储key,和原来的HashSet相比,极端情况下的On复杂度变为O1,性能提升。

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
        SelectionKey[] keys;
        int size;
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
        ... ...
    }
    

    继续processSelectedKeys,所以,默认配置下,NioEventLoop时轮询时selectedKeys 不为空,所以代码再跟进processSelectedKeysOptimized()中,看到遍历数组处理selectedKey,通过SelectionKey 拿到附件,也就是最终要处理k的对象,是AbstractNioChannel或者NioTask对象,最终在processSelectedKey()方法中,附件a来处理IO事件。

        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    			// 需要重新select时,重新select
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    再看processSelectedKey中的逻辑,首先校验k有效性,如果k无效关闭channel后返回,否则继续处理如下逻辑:

    1. 获取key的就绪操作代码
    2. 首先处理connect事件,如果没有处理connect事件就进行读写操作,JDK会抛出异常
    3. 处理IO写事件
    4. 处理IO读、accept事件
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            // .... 省略k异常逻辑
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    在processSelectedKeysOptimized中,处理完成后,判断是否needsToSelectAgain,来重新select,什么情况下需要重新select?通过查询发吗发现,在Channel取消从EventLoop注册时,EventLoop取消Channel的key时,判断取消的key数量大于256时,将needsToSelectAgain置为true,触发重新select。如下:

        void cancel(SelectionKey key) {
            key.cancel();
            cancelledKeys ++;
            if (cancelledKeys >= CLEANUP_INTERVAL) {//常量256
                cancelledKeys = 0;
                needsToSelectAgain = true;
            }
        }
    

    runAllTasks
    在processSelectedKeys之后,执行runAllTask执行队列中的任务和需要调度执行的任务,完成后再通过afterRunningAllTasks来执行tailTask队列中的任务。

        protected boolean runAllTasks() {
            assert inEventLoop();
            boolean fetchedAll;
            boolean ranAtLeastOne = false;
    
            do {
            	// 从scheduledTaskQueue中取任务,放到taskQueue中
                fetchedAll = fetchFromScheduledTaskQueue();
                // 执行taskQueue中的任务
                if (runAllTasksFrom(taskQueue)) {
                    ranAtLeastOne = true;
                }
                // 循环从scheduledTaskQueue中取任务
            } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
            if (ranAtLeastOne) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
            }
            // 执行完成后,执行tailTask队列中的任务
            afterRunningAllTasks();
            return ranAtLeastOne;
        
    

    上面了解了,NioEventLoop的核心轮询流程

    4.3 Future 和 Promise

    Future
    JDK Future代表一个异步处理的结果,Future顶级接口提供了一些方法,包括异步是否处理完成的检测方法,等待异步处理完成方法,取回异步处理结果的的方法。

    • 提供个get()方法得到异步处理结果,结果只有在异步处理完成时通过get()方法取回,调用get()方法时,在处理完成之前get()方法阻塞。
    • 提供了cancel()取消方法,来指定异步处理正常执行完成还是取消,但是当一个异步处理已经完成时,就不能取消,
    • 同时JMM的happens-before规则,规定了一下顺序,异步处理的结果happens-before另一个线程执行get()后的所有操作。

    JDK FutureTask,Future的一个实现,实现了了Future定义的各个方法。同时实现了Runable,可以提交到线程池执行
    Netty Future, 继承自JDK Future,在JDK Future的基础上进行了扩展,增加了非阻塞getNow()方法,await等待异步执行完成,可超时的await()方法,同步等待sync()方法,添加Listener删除listner的addListener()方法和removeListener()方法

    ChannelFuture

    ChannelFuture,继承了Netty Future,定义了异步Channel IO操作的结果
    Netty中的所有IO操作都是异步的,这意味着,Netty中,IO请求会立即返回,这样在请求返回时就不能保证IO操作已经完成,在这种情况下,Netty提供了ChannelFuture,在IO请求时返回ChannelFuture的实例,包含了IO操作的状态或者IO操作的结果信息。
    一个ChannelFuture对象,是完成或者未完成状态,当一个IO操作开始时,创建ChannelFuture对象,初始化为未完成状态,当IO操作完成时(完成的结果可能成功、失败、或者取消,都称为完成状态),ChannelFuture变为完成状态。

    • 提供了多个方法来检测IO操作是否完成,等待完成,获取IO操作结果
    • 可以添加ChannelFutureListener来在IO操作完成时,得到通知。Listener不阻塞,可以达到最大的性能和资源利用率,基于事件的变成模式。
    • await()是一个阻塞操作,调用await时,调用线程阻塞,直到IO操作完成,可以很简单的实现串行逻辑。但是调用线程在IO操作完成之前其实没有必要阻塞,并且线程间的通知开销比较高,而且在特定情况下还会出现死锁。
    • ChannelFuture中包含当前IO操作的Channel对象引用,

    ChannelFuture死锁问题
    在ChannelHandler中不要调用await方法
    EventHandler中的事件处理方法,通常被IO线程调用,如果await被EventHandler中的方法调用,根据调用关系的传递性,那么await也是被IO线程调用,那这样,IO线程调用永远也不会结束了,导致死锁。如:

    // 死锁
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        future = ctx.channel.close();
        future.awaitUninterruptibly()
        // ... ... 其他逻辑
    }
    

    通常使用listener来完成这种操作,避免出现死锁,如下:

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        future = ctx.channel.close();
        future.addListener(new ChannelFutureListener(){
           public void operationComplete(ChannelFuture future) {
               // ... ... 其他逻辑
           }
        });
    }
    

    Promise
    继承Netty Future,指定Future是可写的,关键定义方法如下:

    方法描述
    Promise setSuccess(V result)标记Future为成功,并通知所有listener
    boolean trySuccess(V result)标记Future为成功,并通知所有listener,因为Future或许已经被标记,所以可能返回false
    Promise setFailure(Throwable e)标记Future为失败,并通知所有listener
    boolean tryFailure(Throwable e)标记Future为失败,通知所有listener,因为Future或许已经被标记,可能返回false
    boolean setUncancellable()标记此Future不能被取消

    ChannelPromise
    指定ChannelFuture是可写的,并且ChannelPromise中包含Channel对象。

    netty借用Future Promise实现了异步操作等待、通过listener异步回调、通知

    4.4 Pipleline

    ChannelPipiline,一个ChannelHandler的集合,ChannelHandler是处理或拦截Channel读入数据事件、写出数据操作,ChannelPipiline实现了先进的拦截过滤模式,用户可以控制事件的处理方式,并自定义ChannelHandler之间的协作。
    事件流转
    通常情况下,I/O事件在ChannelPipiline中流转并被各个ChannelHandler处理的过程如下图:
    在这里插入图片描述
    构建一个ChannelPipeline
    可以通过如下的方式来构建一个pipeline

    ChannelPipeline} p = ...
    p.addLast("1", new InboundHandlerA());
    p.addLast("2", new InboundHandlerB());
    p.addLast("3", new OutboundHandlerA());
    p.addLast("4", new OutboundHandlerB());
    p.addLast("5", new InboundOutboundHandlerX());
    

    构建后,InboundHandler的顺序为125,OutboundHandler的顺序为543,OutboundHandler addLast添加的Hander最后一个限制性,而InboundHandler顺序相反。
    通常要一个或多个ChannelHanlder收到读、写、关闭I/O事件,例如一个传统的Server在每个pipiline有以下handler:
    - 协议解码-----Decoder,将传输的二进制数据转为Java对象
    - 协议编码------Encoder,将Java对象转为二进制数据进行传输
    - 业务逻辑Handler-----执行业务逻辑
    Pipeline通过fireXXX方法,将时间传递到下一个handher
    线程安全
    ChannelHandler可以在任何时间添加和删除,因为ChannelPipiline是线程安全的,例如:当敏感信息传输交换时,可以添加一个加密作用的Handler,在传输交换完成后再删除掉加密Handler
    Pipeline的创建
    ServerSocketChannel,SocketChannel在创建时就创建了流水线,如NioServerSocketChannel在通过反射创建是,pipeline的创建方式为:

        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    	// 调用到abstact channel
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId();
            unsafe = newUnsafe();
            pipeline = newChannelPipeline();
        }
    
    	// 创建pipline
        protected DefaultChannelPipeline newChannelPipeline() {
            return new DefaultChannelPipeline(this);
        }
    

    这样,在ServerSocketChannel或者SocketChannel创建时,就完成Pipeline的创建,返回DefaultChannelPipeline类型的对象
    DefaultChannelPipeline的结构
    DefaultChannelPipeline中包括一个TailContext和HeadContext,一个双向链表head.next = tail;tail.prev = head;
    在通过addLast()增加Hander时,将Handler包装为AbstractChannelHandlerContext,加入双向链表:

    	// 包装为AbstractChannelHandlerContext
    	newCtx = newContext(group, filterName(name, handler), handler);
    	// 调用addLast0方法
    	addLast0(newCtx);
    	// addLast0() 将handler添加到双向链表中
        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    对于读事件,从head开始调用,对于写事件从tail开始调用执行,通过这样的方式完成事件流转
    ChannelPipeline调用
    首先,在processSelectedKey时,以读IO事件为例,读数据byteBuf 后,通过pipeline.fireChannelReadComplete(),传递读完成事件到pipeline的各个InboundHandler,如下:(省略了非不不要代码)

    		@Override
            public final void read() {
                // .... 获取配置,需要中断时中断读
                // 从channel或者ChannelPipeline事件处理流水线
                final ChannelPipeline pipeline = pipeline();
                // 获取数据缓冲区分配器
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                // 轮询读数据,直到读完,即lastBytesRead为-1
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    					// 当前读数据次数+1
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        // 传递读到的数据到 pipeline 中的各个InboundHandler
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    				// 读完成
                    allocHandle.readComplete();
                    // 传递读完成事件到pipeline的各个InboundHandler
                    pipeline.fireChannelReadComplete();
    				
    				// 读完成 关闭流水线
                    if (close) {
                        closeOnRead(pipeline);
                    }
                // 异常处理
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // 修改selectedkey状态
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    

    pipeline处理事件
    对pipeline的结构了解,对于被动的read,connect事件从head开始执行,对于主动发起的bind,connect,write事件从tail开始执行。

    • 从tail开始执行的主动事件,从pipeline开始调用(如:pipeline.write),pipeline最终调用到unsafe来完成事件
    • 被动事件,如read,finish connect事件,由eventloop来检测,交给unsafe处理,再fire事件到pileline,pipeline中的多个Hander再按照顺序进行业务处理和数据处理

    读数据上面已经拆解过,下面以write数据为例,看一下netty如何通过pipeline来写数据,从channel.writeAndFlush()定位到如下代码:

        @Override
        public ChannelFuture write(Object msg) {
        	// 直接调用pipeline的write
            return pipeline.write(msg);
        }
    

    pipeline从tail开始执行

        @Override
        public final ChannelFuture write(Object msg) {
        	// 从tail开始执行
            return tail.write(msg);
        }
    

    调用到AbstractChannelHandlerContext的write方法

    	private void write(Object msg, boolean flush, ChannelPromise promise) {
    		// 找到第一个outBoundHandler
    	    AbstractChannelHandlerContext next = findContextOutbound();
    	    final Object m = pipeline.touch(msg, next);
    	    EventExecutor executor = next.executor();
    	    if (executor.inEventLoop()) {
    	    // 同步执行
    	        if (flush) {
    	            next.invokeWriteAndFlush(m, promise);
    	        } else {
    	            next.invokeWrite(m, promise);
    	        }
    	    } else {
    	    // 当前线程不是EventExecutor线程,异步执行
    	        AbstractWriteTask task;
    	        if (flush) {
    	            task = WriteAndFlushTask.newInstance(next, m, promise);
    	        }  else {
    	            task = WriteTask.newInstance(next, m, promise);
    	        }
    	        safeExecute(executor, task, promise, m);
    	    }
    	}
    

    invokeWrite()方法中调用了invokeWrite0(),再看看invokeWrite0()方法,一般的hander中,写数据又会调用ctx.write()方法,完成事件在pipeline()中的传播

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
            // 通过调用ctx.write(),又会找到下一个handler,以此完成事件在pipeline中的流转
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    

    最终会调用到HeadContext的write()方法,再调用unsafe.write()完成写数据操作。

            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
    

    PS:channel.writeAndFlush()和channelHandlerContext.write()的实现是不一样的
    channel.writeAndFlush(),从头调用pipeline(),保证所有的OutBoundHandler都能处理数据
    channelHandlerContext.write()是查找下一个channelHandlerContext来处理,不是所有OutBoundHandler都可以处理数据,用来实现pipeline流水线中的写事件的流转。

    4.5 ServerBootstrap

    通过一段简单服务器启动代码,来看服务器是怎么启动起来的。如下

        public void run () {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        channel.pipeline().addLast(new SimpleServerHandler());
                    }
                }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture f = serverBootstrap.bind(this.port).sync();
    
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
            }
        }
    
    • EventLoopGroup ,多线程,每个线程对应EventLoop,不停检测bind accept read write connect disconnect IO事件,处理IO事件,执行任务。
    • serverBootstrap.group(bossGroup, workerGroup),指定bossGroup和workGroup,boss和worker功能不同,boss负责accept新的连接,并丢给worker线程去处理,work处理连接的IO读写事件
    • channel(NioServerSocketChannel.class),指定socket服务端为NioServerSocketChannel
    • .childHandler(),指定流水线中的数据业务处理Handler
    • option,serverBootstrap,channel的配置参数
    • bind(xxx),监听xxx端口
    • sync(),等待bind完成
    • f.channel().closeFuture().sync(),bind完成后等待channel关闭
    • shutdownGracefully(), EventLoopGroup停止轮询检测事件
      netty怎么通过这段代码启动并且accept客户端连接的?从上面代码看,bind()之前的代码是创建组件,指定配置,不在赘述。直接从bind开始入手,根据调用关系,直接找到了doBind()方法,如下:(省略掉了非关键代码)
        private ChannelFuture doBind(final SocketAddress localAddress) {
        	// 创建ServerSocketChannel,并注册到bossGroup
            final ChannelFuture regFuture = initAndRegister();
            // ...注册异常处理,省略...
            if (regFuture.isDone()) {
                // 创建ServerSocketChannel并注册成功.
                ChannelPromise promise = channel.newPromise();
                // 继续bind逻辑
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {
                // ServerSocketChannel没有注册成功.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                // 注册监听事件
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    						// 继续bind逻辑
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    

    doBind为主流程,继续。在initAndRegister()中创建了ServerSocketChannel,初始化channel,并注册到bossGroup处理线程组

        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
            	// 创建channel对象,创建流水线
                channel = channelFactory.newChannel();
                // 设置配置参数,创建事件处理流水线,配置流水线等。
                init(channel);
            } catch (Throwable t) {
                // ...省略异常处理...
            }
            // 注册到boss group
            ChannelFuture regFuture = config().group().register(channel);
            // ...省略异常处理...
            return regFuture;
        }
    

    channelFactory.newChannel(),通过反射的方式创建ServerSocketChannel对象,如下:(省略了非关键代码)

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Constructor<? extends T> constructor;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            this.constructor = clazz.getConstructor();
        }
    
        @Override
        public T newChannel() {
            return constructor.newInstance();
        }
    }
    

    init(channel),初始化channel,如下:

        @Override
        void init(Channel channel) {
        	// 设置配置参数到channel
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, newAttributesArray());
    		// 创建流水线
            ChannelPipeline p = channel.pipeline();
    		
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            // 包装socketChannel创建的参数,也就是accept创建新连接的参数
            final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    		// 流水线添加Handher,
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    // 添加config handler
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    				// 异步添加 ServerBootstrapAcceptor handler,任务添加到channel的eventLoop任务队列中,select之前执行
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    继续绑定逻辑dobind0(),

        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // 在触发channelRegistered()之前调用方法。 使用户处理程序有机会在其channelRegistered()实现中设置管道。
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                    	// 执行最终的bind操作,调用jdk bind绑定端口,并产生channel active事件
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    

    这样Nio Socket Server就启动并绑定了指定端口,但是如何accept处理新的连接,并交给worker线程组处理呢?从processSelectedKey入手,根据调用关系,定位到AbstractNioMessageChannel中的NioMessageUnsafe类的read方法,如下:(删除了非关键代码)

    @Override
    public void read() {
    	// 读到需要accept的channel
    	do {
    	    int localRead = doReadMessages(readBuf);
    	    if (localRead == 0) {
    	        break;
    	    }
    	    if (localRead < 0) {
    	        closed = true;
    	        break;
    	    }
    	
    	    allocHandle.incMessagesRead(localRead);
    	} while (continueReading(allocHandle));
    	
    	int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
        	// 流水线循环发出读到channel事件
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        // 流水线发出杜万成事件
        pipeline.fireChannelReadComplete();
    }
    

    流水线发出读到新channel的事件后,就到了流水线中ServerBootstrap启动时注册的ServerBootstrapAcceptor Handler来处理了,关键代码下:

            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
            	// 获取拿到 channel
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
    			// channel 设置配置
                setChannelOptions(child, childOptions, logger);
                setAttributes(child, childAttrs);
    
                try {
                // 注册到worker group 轮询处理IO事件
                    childGroup.register(child).addListener(new ChannelFutureListener() {
                    	//注册监听事件
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
                } catch (Throwable t) {
               		// 异常时关闭 channel
                    forceClose(child, t);
                }
            }
    

    至此,ServerBootstarp启动,并可以接受客户端的连接,处理连接,交给worker 线程组处理连接的IO事件。

    4.6 Bootstrap

    我们再解剖客户端的启动过程,下面是一个简单的netty客户端启动代码:

        public static void main(String[] args) throws Exception {
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap(); // (1)
                b.group(workerGroup); // (2)
                b.channel(NioSocketChannel.class); // (3)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new SimpleClientHandler());
                    }
                });
                
                // Start the client.
                ChannelFuture f = b.connect(host, port).sync(); // (5)
    
                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    

    关于组件的初始化、配置,和Socket socket类似,这里不在描述,直接从connect开始读代码,追踪到doResolveAndConnect()方法,如下:

        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        	// 创建NioSocketChannel,创建相关组件,并初始化,注册到worker group,和Server启动类似
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
       		// 继续connect
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    

    继续往下追踪,追踪到下面的代码,调用channel来进行connect

    	private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    

    最终调用AbstrackNioUnsafe完成connect:

          @Override
            public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
           			... ...
           			// 调用jdk底层 connect
                    if (doConnect(remoteAddress, localAddress)) {
                        fulfillConnectPromise(promise, wasActive);
                    } else {
                    //超时处理,关闭
                            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    if (connectPromise != null && !connectPromise.isDone()
                                            && connectPromise.tryFailure(new ConnectTimeoutException(
                                                    "connection timed out: " + remoteAddress))) {
                                        close(voidPromise());
                                    }
                                }
                            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
                    }
                } catch (Throwable t) {
                    ......异常处理
                }
            }
    

    4.7 zero-copy

    4.7.1 操作系统zero-copy

    以读取磁盘文件,通过socket发送远程为例,如果不是零拷贝,通过传统的read文件,write文件的方式,逻辑需要四步copy:

    1. 将磁盘文件读到内核空间.
    2. 文件数据从内核空间copy到用户空间.
    3. 从用户空间copy到socket缓冲区。
    4. 从socket缓冲区copy到网卡缓冲区。
      四个步骤的拷贝,如下图:
      在这里插入图片描述
      zero-copy零拷贝技术,是在linux2.1引入操作系统sendFile()系统调用的支持下,分为以下三步拷贝:
    • 1 文件读入到内核缓冲区,不再将文件复制到用户态,用户态程序不参与文件传输
    • 2 数据复制到socket缓冲区。
    • 3 数据在复制到网卡缓冲区。
      这样就少了一次数据copy,提高性能。如下图:
      在这里插入图片描述

    上图对Read Write过程做了很好的优化,但是看起来第二步有些多余,linux2.4优化了sendFile()系统调用,进一步优化:

    • 1 文件数据copy到kernel buffer
    • 2 kernel buffer中的数据的起始位置和偏移量写到socket buffer
    • 3 根据socket buffer中的起始位置和偏移量,直接将kernel buffer中的数据复制到网卡缓冲区
      如下图:
      在这里插入图片描述
      过上述过程,数据只经过了2次copy就从磁盘传送出去了。在操作系统的支持下,Java和netty也实现了各自的Zero-copy。

    4.7.2 Java zero-copy

    FileChannel提供了transferTo()方法调用,将文件通过零copy的方式传输到另一个Channel,FileChannel或者SocketChannel,如传输到socketChannel

    //使用sendfile:读取磁盘文件,并网络发送
    FileChannel sourceChannel = new RandomAccessFile(source, "rw").getChannel();
    SocketChannel socketChannel = SocketChannel.open(.....);
    sourceChannel.transferTo(0, sourceChannel.size(), socketChannel);
    

    基于零copy实现文件copy

        public void copy(String source, String target) throws Exception {
            FileChannel sourceFile = new FileInputStream(source).getChannel();
            FileChannel targetFile = new FileInputStream(target).getChannel();
            sourceFile.transferTo(0, sourceFile.size(), targetFile);
        }
    

    4.7.3 netty zero-copy

    netty的零拷贝,既有应用层面的zero-copy,也有操作系统zero-copy的应用,前者是广义的zero-copy,后者是狭义的zero-copy,都避免了数据不必要的copy,netty的zero-copy体现在:

    • Netty提供了CompositeByteBuf类,它可以将多个ByteBuf合并为一个逻辑上的ByteBuf,避免了各个ByteBuf之间的拷贝。
    • 通过wrap操作,我们可以将byte[]数组、ByteBuf、 ByteBuffer 等包装成一个 Netty ByteBuf对象,进而避免了拷贝操作。
    • ByteBuf支持slice 操作,因此可以将ByteBuf分解为多个共享同一个存储区域的ByteBuf,避免了内存的拷贝。
    • 通过FileRegion包装的FileChannel.tranferTo实现文件传输,可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题。
      可以看到FileRegion是netty对linux操作系统zero-copy系统调用sendFile()的应用,其余是netty在应用层面对数据读写的优化,避免不必要的复制。
      CompositeByteBuf
      CompositeByteBuf是一个虚拟的buf缓冲区,将多个独立的缓冲区合成一个虚拟的缓冲区,一般使用ByteBufAllocator的compositeBuffer()的方式或者Unpooled.wrappedBuffer(ByteBuf…)的方式创建,一般不适用构造函数创建。

    将多个ByteBuffer合并,一般做法是将多个ByteBuffer复制到一个新的大缓冲区中,而netty是使用CompositeByteBuf将多个ByteBuffer进行逻辑上的合并,并不真实的复制合并到一个大ByteBuf中,而是维护一个ByteBuf数组,数组中存储合并的多个ByteBuf,对外作为一个整体的ByteBuf,从而避免了数据copy,提高性能。
    Unpooled wrap
    对于将一个byte[]数组转为ByteBuf的需求,一般做法是是将byte[] copy到新的ByteBuf中,需要copy 数据,netty的做法是使用ByteBuf对byte[]进行包装,不copy数据,直接返回包装byte[]的Bytebuf。

    //传统方式,byte[]转为ByteBuf
    byte[] bytes = ......;
    ByteBuf byteBuf = Unpooled.buffer();
    byteBuf.write(bytes);
    //netty,byte[]转为ByteBuf
    byte[] bytes = ......;
    ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
    

    ByteBuf slice
    netty使用自己的NIO ByteBuf,netty ByteBuf支持slice操作,即返回ByteBuf的某一段数据,而不需要copy,如byteBuf.slice(int index, int length)返回byteBuf读数据段从index开始到length长度的数据,返回的数据是直接共享byteBuf数据,而不是copy一份返回,避免了数据copy数据带来的消耗。

        /**
         * 返回此缓冲区的子区域的一部分。 修改返回的缓冲区或父缓冲区的内容会影响彼此的内容,同时它们会维护单独的索引和标记。 此方法不会修改父缓冲区的{@code readerIndex}或{@code writerIndex}
         */
    public abstract ByteBuf slice(int index, int length);
    

    4.8 其他设计

    4.8.1 ReferenceCounted

    引用计数对象,需要显式的回收分配内存。

    实例化新的ReferenceCounted时,引用计数值为1,开始计数。 retain()增加引用计数,而release()减少引用计数。 如果引用计数减少到0,则将显式释放对象,访问已释放对象通常会导致访问异常。

    如果实现ReferenceCounted的对象是容器,其中包含多个其他实现ReferenceCounted的对象,则当容器的引用计数变为0时,包含的对象也将通过release()释放。

    通过引用计数来支持显式的内存回收。

    4.8.2 ByteBuf

    netty的数据缓冲,ByteBuf实现了ReferenceCounted,支持引用计数,可以显示的回收内存,提高GC成效,防止内存泄漏。

    Netty ByteBuf 对比JDK ByteBuffer

    • ByteBuffer长度固定,扩展ByteBuffer长度,会导致重新创建更大的ByteBuffer,并发生数据copy,性能低
    • ByteBuffer只用了一个position指针来标识位置,读写模式切换时需要调用flip()函数和rewind()函数
    • 存储字节的数组是动态的,最大是Integer.MAX_VALUE。这里的动态性存在write操作中,write时得知buffer不够时,会自动扩容。
    • netty ByteBuf的读写索引分离。
    • netty ByteBuf支持引用计数

    HeapByteBuffer
    NIO 在 JDK 1.4 中引入的 ByteBuffer 类允许 JVM 实现通过本地调用来分配内存。这主要是为了避免在每次调用本地 I/O 操作之前(或者之后)将缓冲区的内容复 制到一个中间缓冲区(或者从中间缓冲区把内容复制到缓冲区),省略一次数据copy。
    DirectByteBuffer
    1,在使用Socket传递数据时性能很好,由于数据直接在直接缓冲区中,不需要从用户空间copy数据到直接缓冲区的过程,性能好。
    2,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。不支持业务处理,需要业务处理时,不得不进行一 次复制。

    再看看几个ByteBuf的具体实现:
    UnpooledHeapByteBuf,非池化的基于堆的字节缓冲区
    PooledHeapByteBuf,池化的基于对的字节缓冲区
    UnpooledDirectByteBuf ,非池化的直接字节缓冲区
    PooledDirectByteBuf,池化的直接字节缓冲区

    内存分配可参考jemalloc,后续学习补充。

    ObjectPool,轻量级的内存池,定义了get方法获取池中对象。
    RecyclerObjectPool,ObjectPool的实现,通过Recycler实现存储在ThreadLocal的栈,FILO,完成对象池的基本操作。

    4.8.3 Unpooled

    工具类,通过1,分配内存、2,包装已有的byte[]数组,bytebuffer,3,copy已有的byte[]数组,bytebuffer,String三种方式来创建ByteBuf
    1,直接分配内存
    buffer(int ),分配指定容量的堆 buffer
    directBuffer(int),分配指定容量的直接buffer
    2,创建包装buffer
    通过零拷贝的方式,将byte arrays 和 byte buffers包装为一个逻辑的ByteBuf,而不发生数据copy。
    3,copy
    将一个或多个存在的bytes[],bytebuffer,String深拷贝为一个物理存在的新bytebuf

    4.8.4 FastThreadLocalThread

    EventLoop的执行线程是FastThreadLocalThread,对Thread的扩展,根据字面意思就可以了解,支持FastThreadLocal,FastThreadLocal是不是比传统ThreadLocal更快一些呢?从代码角度分析一下。
    FastThreadLocalThread继承Thread,使用内部的InternalThreadLocalMap存储数据

    public class FastThreadLocalThread extends Thread {
        // This will be set to true if we have a chance to wrap the Runnable.
        private final boolean cleanupFastThreadLocals;
    
        private InternalThreadLocalMap threadLocalMap;
        ... ...
    }
    

    在创建FastThreadLocal时,返回InternalThreadLocalMap生成的index,代码如下

        public FastThreadLocal() {
            index = InternalThreadLocalMap.nextVariableIndex();
        }
    

    下面通过get,set,remove方法来了解FastThreadLocal到底快在哪里,get时:

        public final V get() {
        	// 从InternalThreadLocalMap获取对象存储容器InternalThreadLocalMap 
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 根据初始化时返回的index,获取对象
            Object v = threadLocalMap.indexedVariable(index);
            // 如果对象不为空
            if (v != InternalThreadLocalMap.UNSET) {
            	// 返回对象
                return (V) v;
            }
    		// 如果对象为空,初始化对象
            return initialize(threadLocalMap);
        }
    

    set时:

        public final void set(V value) {
        	// value不为空
            if (value != InternalThreadLocalMap.UNSET) {
            	// 从InternalThreadLocalMap获取对象存储容器InternalThreadLocalMap
                InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
                // 见setKnownNotUnset方法,对象存储到一个set中,以此来支持removeAll操作
                setKnownNotUnset(threadLocalMap, value);
            } else {
            	// value等于UNSET,删除,下次get时,会自动重新调用initialValue()方法初始化值
                remove();
            }
        }
       
        private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        	// 根据index将value存入threadLocalMap
            if (threadLocalMap.setIndexedVariable(index, value)) {
            // 对象存储到一个set中,以此来支持removeAll操作
                addToVariablesToRemove(threadLocalMap, this);
            }
        }
    

    remove时:

        public final void remove(InternalThreadLocalMap threadLocalMap) {
            ...省略空判断
    		// 根据index从removeIndexedVariable中删除
            Object v = threadLocalMap.removeIndexedVariable(index);
            // 从待删除set中删除this,避免removeALL时重复删除
            removeFromVariablesToRemove(threadLocalMap, this);
    
            if (v != InternalThreadLocalMap.UNSET) {
                try {
                	// remove时v对象时回调
                    onRemoval((V) v);
                } catch (Exception e) {
                    PlatformDependent.throwException(e);
                }
            }
        }
    

    由以上可见,FastThreadLocal是由InternalThreadLocalMap来进行对象管理,从InternalThreadLocalMap再深入了解。在FastThreadLocal中获取InternalThreadLocalMap时,通过getIfSet或者get方法来获取,代码如下:

        public static InternalThreadLocalMap get() {
            Thread thread = Thread.currentThread();
            // 当前线程是FastThreadLocalThread
            if (thread instanceof FastThreadLocalThread) {
            	// 初始化fastThreadLocal
                return fastGet((FastThreadLocalThread) thread);
            } else {
            	// 初始化slowThreadLocal
                return slowGet();
            }
        }
    
        private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
            InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
            // 等于null时初始化InternalThreadLocalMap
            if (threadLocalMap == null) {
                thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
            }
            return threadLocalMap;
        }
    
        private static InternalThreadLocalMap slowGet() {
            InternalThreadLocalMap ret = slowThreadLocalMap.get();
            // 等于null时,初始化InternalThreadLocalMap,存入线程普通ThreadLocal,变慢
            if (ret == null) {
                ret = new InternalThreadLocalMap();
                slowThreadLocalMap.set(ret);
            }
            return ret;
        }
        
        public static InternalThreadLocalMap getIfSet() {
            Thread thread = Thread.currentThread();
            // 当前线程是FastThreadLocalThread,返回FastThreadLocalThread的InternalThreadLocalMap属性,有可能是空
            if (thread instanceof FastThreadLocalThread) {
                return ((FastThreadLocalThread) thread).threadLocalMap();
            }
            // 否则返回普通ThreadLocal中存储的InternalThreadLocalMap 
            return slowThreadLocalMap.get();
        }
    

    再看看获取nextVariableIndex,get,set,remove对象时的逻辑

    	// 获取index
        public static int nextVariableIndex() {
        	// 维护支持CAS原子操作AtomicInteger类型的对象nextIndex,来获取连续的index
            int index = nextIndex.getAndIncrement();
            if (index < 0) {
            	// index越界,抛出异常
                nextIndex.decrementAndGet();
                throw new IllegalStateException("too many thread-local indexed variables");
            }
            return index;
        }
    
    	// 根据index,读取Object对象
        public Object indexedVariable(int index) {
            Object[] lookup = indexedVariables;
            // 从数组中返回对象
            return index < lookup.length? lookup[index] : UNSET;
        }
    
    	// 存放对象value到index
        public boolean setIndexedVariable(int index, Object value) {
            Object[] lookup = indexedVariables;
            // 当前index小于lookup的边界
            if (index < lookup.length) {
                Object oldValue = lookup[index];
                lookup[index] = value;
                return oldValue == UNSET;
            } else {
            	// 扩容并存放到lookup数组中
                expandIndexedVariableTableAndSet(index, value);
                return true;
            }
        }
    
    	// 根据index删除,并返回v
        public Object removeIndexedVariable(int index) {
            Object[] lookup = indexedVariables;
            // 小于时,直接返回
            if (index < lookup.length) {
                Object v = lookup[index];
                lookup[index] = UNSET;
                return v;
            } else {
                return UNSET;
            }
        }
    

    可以看到核心原理是将ThreadLocal中原来存储对象的HashMap变为数组来存取,以此来提高性能。
    需要注意的是,FastThreadLocal需要配合FastThreadLocalThread使用,否则会适得其反,变慢。

    (完^_^)

    展开全文
  • netty原理

    千次阅读 2017-09-04 16:20:02
    Netty Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知...

    Netty

    Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

    作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建。

    Netty架构分析

    Netty 采用了比较典型的三层网络架构进行设计,逻辑架构图如下所示:


    第一层:Reactor 通信调度层,它由一系列辅助类完成,包括 Reactor 线程 NioEventLoop 以及其父类、NioSocketChannel/NioServerSocketChannel 以及其父类、ByteBuffer 以及由其衍生出来的各种 Buffer、Unsafe 以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓冲区中,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等等,将这些事件触发到 PipeLine 中,由 PipeLine 充当的职责链来进行后续的处理。

    第二层:职责链 PipeLine,它负责事件在职责链中的有序传播,同时负责动态的编排职责链,职责链可以选择监听和处理自己关心的事件,它可以拦截处理和向后/向前传播事件,不同的应用的 Handler 节点的功能也不同,通常情况下,往往会开发编解码 Hanlder 用于消息的编解码,它可以将外部的协议消息转换成内部的 POJO 对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。

    第三层:业务逻辑处理层,可以分为两类:

    1.纯粹的业务逻辑处理,例如订单处理。

    2.应用层协议管理,例如HTTP协议、FTP协议等。

     

    接下来,我从影响通信性能的三个方面(I/O模型、线程调度模型、序列化方式)来谈谈Netty的架构。

    IO模型

    Netty的I/O模型基于非阻塞I/O实现,底层依赖的是JDK NIO框架的Selector。

    Selector提供选择已经就绪的任务的能力。简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

    线程调度模型

    常用的Reactor线程模型有三种,分别如下:

    1.Reactor单线程模型:Reactor单线程模型,指的是所有的I/O操作都在同一个NIO线程上面完成。对于一些小容量应用场景,可以使用单线程模型。


    2.Reactor多线程模型:Rector多线程模型与单线程模型最大的区别就是有一组NIO线程处理I/O操作。主要用于高并发、大业务量场景。


    3.主从Reactor多线程模型:主从Reactor线程模型的特点是服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。利用主从NIO线程模型,可以解决1个服务端监听线程无法有效处理所有客户端连接的性能不足问题。


    序列化方式

    影响序列化性能的关键因素总结如下:

    1.序列化后的码流大小(网络带宽占用)

    2.序列化&反序列化的性能(CPU资源占用)

    3.并发调用的性能表现:稳定性、线性增长、偶现的时延毛刺等

    链路有效性检测

    心跳检测机制分为三个层面:

    1.TCP层面的心跳检测,即TCP的Keep-Alive机制,它的作用域是整个TCP协议栈;

    2.协议层的心跳检测,主要存在于长连接协议中。例如SMPP协议;

    3.应用层的心跳检测,它主要由各业务产品通过约定方式定时给对方发送心跳消息实现。

    心跳检测的目的就是确认当前链路可用,对方活着并且能够正常接收和发送消息。作为高可靠的NIO框架,Netty也提供了基于链路空闲的心跳检测机制:

    1.读空闲,链路持续时间t没有读取到任何消息;

    2.写空闲,链路持续时间t没有发送任何消息;

    3.读写空闲,链路持续时间t没有接收或者发送任何消息。

    零拷贝

    • “零拷贝”是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源。而它通常是指计算机在网络上发送文件时,不需要文件内容拷贝到用户空间(User Space直接在内核空间(Kernel Space)中传输到网络的方式
    • Netty的“零拷贝”主要体现在三个方面
      • Netty的接收和发送ByteBuffer采用DIRECT BUFFERS,使用堆外直接内存进行Socket读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行Socket读写,JVM会将堆内存Buffer拷贝一份到直接内存中,然后才写入Socket中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝
        • 读取直接从堆外直接内存,不像传统的堆内存和直接内存拷贝
        • ByteBufAllocator 通过ioBuffer分配堆外内存
      • Netty提供了组合Buffer对象,可以聚合多个ByteBuffer对象,用户可以像操作一个Buffer那样方便的对组合Buffer进行操作,避免了传统通过内存拷贝的方式将几个小Buffer合并成一个大的Buffer
        • Netty允许我们将多段数据合并为一整段虚拟数据供用户使用,而过程中不需要对数据进行拷贝操作
        • 组合Buffer对象,避免了内存拷贝
        • ChannelBuffer接口:Netty为需要传输的数据制定了统一的ChannelBuffer接口

    ·       使用getByte(int index)方法来实现随机访问

    ·       使用双指针的方式实现顺序访问

    ·       Netty主要实现了HeapChannelBufferByteBufferBackedChannelBufferZero Copy直接相关的CompositeChannelBuffer

        • CompositeChannelBuffer
          • CompositeChannelBuffer类的作用是将多个ChannelBuffer组成一个虚拟的ChannelBuffer来进行操作
          • 为什么说是虚拟的呢,因为CompositeChannelBuffer并没有将多个ChannelBuffer真正的组合起来,而只是保存了他们的引用,这样就避免了数据的拷贝,实现了Zero Copy,内部实现
            • 其中readerIndex既读指针和writerIndex既写指针是从AbstractChannelBuffer继承而来的
            • components是一个ChannelBuffer的数组,他保存了组成这个虚拟Buffer的所有子Buffer
            • indices是一个int类型的数组,它保存的是各个Buffer的索引值
            • lastAccessedComponentId是一个int值,它记录了最后一次访问时的子Buffer ID
          • CompositeChannelBuffer实际上就是将一系列的Buffer通过数组保存起来,然后实现了ChannelBuffer 的接口,使得在上层看来,操作这些Buffer就像是操作一个单独的Buffer一样
      • Netty的文件传输采用了transferTo方法,它可以直接将文件缓冲区的数据发送到目标Channel,避免了传统通过循环write方式导致的内存拷贝问题
        • Linux中的sendfile()以及Java NIO中的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也通过在FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝

    Netty 的 Zero-copy 体现在如下几个个方面:

    l  Netty 提供了 CompositeByteBuf 类, 它可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf, 避免了各个 ByteBuf 之间的拷贝。

    l  通过 wrap 操作, 我们可以将byte[] 数组、ByteBuf、ByteBuffer等包装成一个 Netty ByteBuf 对象, 进而避免了拷贝操作。

    l  ByteBuf 支持 slice 操作,因此可以将 ByteBuf 分解为多个共享同一个存储区域的ByteBuf, 避免了内存的拷贝。

    l  通过 FileRegion 包装的FileChannel.tranferTo 实现文件传输, 可以直接将文件缓冲区的数据发送到目标 Channel, 避免了传统通过循环 write 方式导致的内存拷贝问题。

    展开全文
  • Netty 原理

    2020-05-20 10:47:26
    Netty3(3.x)版本是比较旧的版本。 Netty4(4.x)版本是当前官方推荐的,目前一直在维护中。跟3.x版本相比变化比较大,特别是API。 Netty5(5.x)是被舍弃的版本,官方不推荐使用! Netty5舍弃的官方解释: 1. netty5...

    版本说明

    Netty3(3.x)版本是比较旧的版本。

    Netty4(4.x)版本是当前官方推荐的,目前一直在维护中。跟3.x版本相比变化比较大,特别是API。

    Netty5(5.x)是被舍弃的版本,官方不推荐使用!

    Netty5舍弃的官方解释:

    1. netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显

    2. 多个分支的代码同步工作量很大

    3. 作者觉得当下还不到发布一个新版本的时候

    4. 在发布版本之前,还有更多问题需要调查一下,比如是否应该废弃 exceptionCaught,是否暴露EventExecutorChooser等等。

    参考:https://github.com/netty/netty/issues/4466

     

    1. Netty基本概念

    Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

    作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,一些业界著名的开源组件也基于Netty的NIO框架构建。如:Dubbo、 RocketMQ、Hadoop的Avro、Spark等。

    Netty需要学习的内容: 编解码器、TCP粘包/拆包及Netty如何解决、ByteBuf、Channel和Unsafe、ChannelPipeline和ChannelHandler、EventLoop和EventLoopGroup、Future等。

    编解码器:

    Java序列化的目的主要有两个:

    网络传输

    对象持久化

    Java序列化仅仅是Java编解码技术的一种,由于他的种种缺陷,衍生出多种编解码技术和框架。

    Java序列化的缺点:

    无法跨语言

    序列化后的码流太大

    序列化性能太低

    业界主流的编解码框架:

    Google Protobuf:支持Java、C++、Python三种语言,高效的编码性能,结构化数据存储格式(XML,JSON等)

    Facebook Thrift:适用于静态的数据交换,需要先确定好它的数据结构。结构变化后需要重新编译IDL文件,这也是Thrift的弱项。

    JBoss Marshalling:是一个Java对象的序列化API,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容。

    Hessian:一个轻量级的remoting onhttp工具,使用简单的方法提供了RMI的功能,采用的是二进制RPC协议。

    TCP粘包/拆包:

    TCP是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓存区的实际情况进行包的划分,所以在业务上的一个完整的包,可能被TCP拆分成多个包进行发送,也可能把多个小包封装成一个大的数据包发送,这就是TCP粘包和拆包问题。

    有如下几种情况:

    正常情况

    粘包

    粘包和拆包同时发生


    2. Netty线程模型

    在JAVA NIO方面Selector给Reactor模式提供了基础,Netty结合Selector和Reactor模式设计了高效的线程模型。

    关于Java NIO 构造Reator模式,Doug Lea在《Scalable IO in Java》中给了很好的阐述,这里截取PPT对Reator模式的实现进行说明。

    (1)Reactor单线程模型

    单线程模型

    这是最简单的Reactor单线程模型,由于Reactor模式使用的是异步非阻塞IO,所有的IO操作都不会被阻塞,理论上一个线程可以独立处理所有的IO操作。这时Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分发请求到处理链中。

    对于一些小容量应用场景,可以使用到单线程模型。但对于高负载,大并发的应用却不合适,主要原因如下:

    1. 当一个NIO线程同时处理成百上千的链路,性能上无法支撑,即使NIO线程的CPU负荷达到100%,也无法完全处理消息。

    2. 当NIO线程负载过重后,处理速度会变慢,会导致大量客户端连接超时,超时之后往往会重发,更加重了NIO线程的负载。

    3. 可靠性低,一个线程意外死循环,会导致整个通信系统不可用。

    为了解决这些问题,出现了Reactor多线程模型。

    (2)Reactor多线程模型

    多线程模型

    相比上一种模式,该模型在处理链部分采用了多线程(线程池)。

    在绝大多数场景下,该模型都能满足性能需求。但是,在一些特殊的应用场景下,如服务器会对客户端的握手消息进行安全认证。这类场景下,单独的一个Acceptor线程可能会存在性能不足的问题。为了解决这些问题,产生了第三种Reactor线程模型。

    (3)Reactor主从模型 

    主从模型

    该模型相比第二种模型,是将Reactor分成两部分,mainReactor负责监听server socket,accept新连接;并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。通常,subReactor个数上可与CPU个数等同。

    利用主从NIO线程模型,可以解决一个服务端监听线程无法有效处理所有客户端连接的性能不足问题,因此,在Netty的官方Demo中,推荐使用该线程模型。

    (4)Netty模型

    Netty的线程模型并不是一成不变,它实际取决于用户的启动参数配置。通过设置不同的启动参数,Netty可以同时支持Reactor单线程模型、多线程模型和主从模型。

    前面介绍完 Netty 相关一些理论,下面从功能特性、模块组件来介绍 Netty 的架构设计。

     

    3. Netty功能特性

    Netty的功能特性

    4. Netty模块组件

    Netty主要有下面一些组件:

    Selector

    NioEventLoop

    NioEventLoopGroup

    ChannelHandler

    ChannelHandlerContext

    ChannelPipeline

     

    Selector

    Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。

    NioEventLoop

    其中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务。

    NioEventLoopGroup

    主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。

    ChannelHandler

    是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。

    ChannelHandlerContext

    保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。

    ChannelPipeline 是保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。

    ChannelPipeline对事件流的拦截和处理流程:

    ChannelPipeline入站出站图

    Netty中的事件分为Inbond事件和Outbound事件。

    Inbound事件通常由I/O线程触发,如TCP链路建立事件、链路关闭事件、读事件、异常通知事件等。

    Outbound事件通常是用户主动发起的网络I/O操作,如用户发起的连接操作、绑定操作、消息发送等。

     

    在 Netty中,Channel 、ChannelHandler、ChannelHandlerContext、 ChannelPipeline的关系如下图:

    各组件关系图

    一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。

    入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。

     

    展开全文
  • netty 原理分析

    2018-03-16 15:19:11
    之前在github上发现了一篇非常棒的 netty 原理说明,分享一下 netty 原理分析

    之前在github上发现了一篇非常棒的 netty 原理说明,分享一下
    netty 原理分析

    展开全文
  • Netty原理与基础

    2019-11-23 16:09:44
    文章目录Netty原理与基础Netty简介第一个Netty的案例实践DiscardServer案例功能Netty项目依赖第一个Netty服务器端程序NettyDiscardHandler解密Netty中的Reactor反应器模式Reactor反应器的IO事件的处理流程Netty中的...
  • Netty原理架构解析

    2019-09-24 07:33:03
    Netty原理架构解析 Netty原理架构解析 转载自:http://www.sohu.com/a/272879207_463994本文转载关于Netty的原理架构解析,方便之后巩固复习 Netty是一个异步事件驱动的...
  • Netty原理和使用

    2019-03-04 13:46:04
    Netty原理和使用  Netty是一个高性能 事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建立高性能的Http服务器。支持HTTP、 WebSocket 、Protobuf、 Binary TCP |和UDP,Netty已经被...
  • Netty原理 & 手写简化版Netty背景&目的手写简化版NettyNetty 原理Reactor 线程模型简化为两个线程组Selector 事件注册事件响应与处理Channel 通道NettyChannel 通道Pipeline 职责链ByteBuf 复用机制总结 ...
  • Netty原理剖析

    2018-12-07 09:45:00
    Netty原理剖析 https://blog.csdn.net/excellentyuxiao/article/details/53390408 1. Netty简介Netty是一个高性能、异步事件驱动的NIO框架,基于JAVA NIO提供的API实现。它提供了对TCP、UDP和文件传输的支持,...
  • 前言:netty之初学总结,主要是对netty原理的梳理及netty相关实例的整合。总结如下:一张图,一个项目。手动biu一下自己,好好学习,天天向上 原理图 这张原理图旨在提供一个netty的全局学习思路,需自行深入学习每...
  • netty原理之蔚蓝天空

    2020-03-03 12:43:03
    netty原理分析 Netty简介 Netty是一个高性能、异步事件驱动的NIO框架,基于JAVA NIO提供的API实现。它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,通过Future-...
  • Netty原理与应用

    2018-07-04 12:28:47
    本文档35页,从netty底层实现的各种协议原理开始介绍,到netty的各种具体实现方式,都进行了详细描述。
  • Netty原理及代码实现

    2020-03-10 19:45:39
    一.Netty原理分析 RpcEndpoint:RPC端点 ,Spark针对于每个节点(Client/Master/Worker)都称之一个Rpc端点 ,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的消息和不同的业务处理,如果需要发送...
  • netty原理的讲解

    2019-02-28 17:22:21
    从IO流到BIO,再到NIO,AIO,以及Reactor模式和Proactor模式来分析netty
  • 通过上一篇博客《Netty原理解析系列(五)—Netty线程模型》中了解了Netty的线程模型,对Netty的整体架构有了一定的认识。这篇文章将介绍Netty相关的核心组件,并通过一些简单的demo来了解如何使用Netty。 2.Netty核心...
  • 这可能是目前最透彻的Netty原理架构解析 本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件、整体架构,知其然且知其所以然,希望给大家在实际开发实践、学习开源项目方面提供参考。 Netty 是一个...
  • Netty原理解析

    2018-06-10 20:34:39
    Netty是JBoss出品的高效的Java NIO开发框架,关于其使用,可参考我的另一篇文章 netty使用初步。本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细致的研 究。如果下面的内容有错误或...
  • Netty原理简介

    2019-12-16 16:03:18
    本文主要介绍Netty实现原理和设计。 NIO模式 在介绍NIO模式前,需要说明一下BIO模式它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 43,679
精华内容 17,471
关键字:

netty原理