精华内容
下载资源
问答
  • Netty 线程池

    万次阅读 2021-08-09 09:49:58
    Netty线程池有什么样的特性 Java 原生线程池 Java 原生的线程池主要有三种:ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool。 ThreadPoolExecutor 是最古老的类,我们通常说的线程池,也是指这...

    Netty的线程池有什么样的特性

    Java 原生线程池

    在这里插入图片描述
    Java 原生的线程池主要有三种:ThreadPoolExecutorScheduledThreadPoolExecutorForkJoinPool

    ThreadPoolExecutor 是最古老的类,我们通常说的线程池,也是指这个类。

    ScheduledThreadPoolExecutor 是用来执行定时任务的线程池。

    ForkJoinPool 是 Java7 新增的类,它使用的是工作窃取的算法实现的一种高效的线程池,非常适合解决大任务不断地拆成小任务,小任务再最终合并成结果的场景,比如,归并排序,等等.

    Java 还提供了一个工具类 Executors 专门用于生成各种不同的线程池,不过阿里巴马开发者手册中,强制禁止使用此工具类创建线程池。

    你知道 Executors 可以创建哪些线程池吗?
    在这里插入图片描述
    Executors 主要提供了这么六种方法用来创建线程池,当然,针对每个方法可能还有重载方法,**但是,为什么阿里巴巴又禁止使用呢?**看完下面的分析,你可以想想这个问题。

    为什么阿里巴巴禁止使用 Executors 来创建线程池呢???

    简而言之:
    1.任务队列 或者说 工作队列 过大,造成OOM
    2.线程数量创建过多,造成OOM

    原因在于:(摘自阿里编码规约) 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors各个方法的弊端: 1)newFixedThreadPool和 newSingleThreadExecutor: 主要问题是堆积的请求处理队列可能会耗费非常大的内存,甚至OOM。 2)newCachedThreadPool和 newScheduledThreadPool: 主要问题是线程数最大数是Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至OOM。

    ThreadPoolExecutor
    ThreadPoolExecutor 的主要属性:

    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // 控制变量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        
        // 任务队列
        private final BlockingQueue<Runnable> workQueue;
        
    }
    

    ThreadPoolExecutor 的主要属性就是这么两个:

    ctl,控制变量,高 3 位存储的是线程池的状态,这些状态有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED,代表了线程池的生命周期,低 29 位存储的是工作线程的数量。
    在这里插入图片描述
    workQueue,工作队列(任务队列),它是一个阻塞队列,即在多线程环境下是安全的,有人认为叫作 taskQueue 可能更合适,或许工作队列更加抽象吧,任务更加具体化。

    先分享一下,工作中,使用线程池的例子:
    下面的例子就是:异步创建子线程去同步数据到ES

    @Aspect
    @Component
    public class SynEsAspect {
    
    	private static final Logger LOGGER = LoggerFactory.getLogger(SynEsAspect.class);
    
    	@Resource ElasticSearchCreateService elasticSearchCreateService;
    
    	private ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 30, TimeUnit.SECONDS,
    			new LinkedBlockingDeque<Runnable>(3000),new ProtoBufThreadFactory(CommonConstant.ThreadName.THREAD_NAME_ASYNC_ASPECT_ES));
    
    	@Pointcut("execution(* com.***.retail.order..*(..)) && @annotation(com.***.retail.order.annotate.SynEsOpt)")
    	public void synPointCut() {
    
    	}
    
    	@AfterReturning(returning="object",pointcut = "synPointCut()")
    	public void doAfterReturning(JoinPoint pjp,Object object) throws Throwable {
    			executor.submit(new Runnable() {
    				@Override
    				public void run() {
    					try {
    						sysEs(pjp);
    						String threadName = Thread.currentThread().getName();
    						LOGGER.info("线程名称是:" + threadName + "异步创建或更新ES数据");
    					} catch( Exception e ) {
    						LOGGER.error("synEsAspect doAfterReturning exception ={}",e.getMessage());
    					}
    				}
    			});
    	}
    
    	private void sysEs(JoinPoint pjp) throws Exception {
    		for (Method method : Class.forName(pjp.getTarget().getClass().getName()).getMethods()) {
    			if (method.getName().equals(pjp.getSignature().getName())) {
    				Class[] clazzs = method.getParameterTypes();
    				if (clazzs.length == pjp.getArgs().length) {
    					create(method,pjp);
    					break;
    				}
    			}
    		}
    	}
    
    	private void create(Method method,JoinPoint pjp){
    		SynEsOpt synEsOpt = method.getAnnotation(SynEsOpt.class);
    		Object arg = null;
    		Object[] args = pjp.getArgs();
    		if(args!=null && args[0]!=null){
    			arg = args[0];
    		}
    		if(arg!=null){
    			switch( synEsOpt.operationType()){
    			case UPDATE:
    				elasticSearchCreateService.updateSynEs(arg,synEsOpt.indexName(),synEsOpt.module(),null);
    				break;
    			case INSERT:
    				elasticSearchCreateService.createSynEs(arg,synEsOpt.indexName(),synEsOpt.module(),null);
    				break;
    			default:
    			}
    		}
    
    	}
    
    }
    

    好文章分享 Java线程池实现原理及其在美团业务中的实践

    我们再来看一看 ThreadPoolExecutor 的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        ...
    }
    

    面试中也经常会问到:请你说一说线程池的参数?

    针对 ThreadPoolExecutor,它的构造方法一共有七个参数:
    corePoolSize,核心线程数,默认情况下,这部分线程不会被销毁
    maximumPoolSize,最大线程数,最大可以创建多少个线程
    keepAliveTime,线程保持时间,线程等待多长时间还没有任务就销毁
    unit,线程保持时间的单位
    workQueue,任务队列,存放任务的队列
    threadFactory,创建线程的工厂
    handler,拒绝策略,当线程池无法再承载更多的任务时如何拒绝

    任务流转
    上面介绍了七个参数,那么,当我们提交一个任务到线程池的时候,这个任务又经历了怎样的历程呢?也就是任务的流转,这部分逻辑主要是在 execute () 方法中:

    public void execute(Runnable command) {
        // 判空
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        // 工作线程的数量小于核心线程数时
        if (workerCountOf(c) < corePoolSize) {
            // 添加一个工作线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 工作线程的数量达到核心线程数时,任务入队
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 任务队列满了,添加一个工作线程
        else if (!addWorker(command, false))
            // 添加工作线程失败(达到了最大线程数),执行拒绝策略
            reject(command);
    }
    

    任务的流转主要分为五个阶段:

    1 当工作线程数小于核心线程数时,直接创建一个工作线程来执行任务;
    2 当工作线程数达到核心线程数时,尝试入队,入队成功则进入任务队列中,等待被执行;
    3 如果入队失败,表示队列满了,则尝试创建一个工作线程来执行任务;
    4 如果创建工作线程失败,则执行拒绝策略;
    5 对于在任务队列中等待的任务,待有空闲线程时,它们会从队列中被提取出来执行。

    用一张图来描述这整个过程:
    在这里插入图片描述通过代码,可以看到,在 execute () 的方法中,只涉及到了 3 个参数,另外四个是在哪里使用的呢?
    1 maximuPoolSize,最大线程数,这个是在 addWorker () 方法中使用到的,如果达到了最大线程数,则会创建失败;

    2 threadFactory,线程工厂,这个也是在 addWorker () 方法中,具体点是在 new Worker () 的构造方法中,用来创建一个线程与 Worker 对象进行绑定;

    3 keepAliveTime 和 unit,线程保持时间,这一对参数是在 getTask () 方法中,表示从任务队列中取任务时,阻塞多长时间没有取到任务则结束阻塞,此时返回的任务为 null,工作线程会自然消亡

    从任务的流转过程来看,似乎很完美,但是,这个线程池有什么缺陷呢?

    缺点 性能不够完美

    其实,对于我们日常的工作来说,Java 原生的线程池对于我们来说已经足够完美,

    但是,对于追求高性能的 Netty 看来,性能这块还是有点欠缺啊。

    性能瓶颈在于任务队列的设计上去

    这里说的性能主要体现在任务队列的设计上,我们想像一下,当线程数量特别多的时候,多个线程都去竞争这一个队列,势必会导致性能的下降。

    那么,有没有更好的设计呢?
    有,ForkJoinPool,它使用工作窃取的算法,
    将队列分成了全局队列和线程私有队列,
    总体性能有了很大的提高。

    但是,Netty 的场景似乎又不太一样,它特殊在哪里呢?

    Netty 的场景 —— 事件循环机制
    让我们一起进入 Netty 的世界探寻它的线程池吧。
    Netty 线程池 场景场景,场景 更适用于 Netty 的场景 —— 事件循环机制
    Netty 线程池,是对 Java 原生线程池的一种增强,它的实现方式与 Java 原生线程池完全不一样,它的实现方式更适用于 Netty 的场景 —— 事件循环机制,所以,它的线程池又叫作事件循环线程池,即 EventLoopGroup。

    同样地,对于 Netty 线程池的分析,我们也遵循从宏观到微观的分析方法。
    在这里插入图片描述
    我们可以把这个图分为上下两部分,

    上半部分是 Java 原生的接口,包括线程池的接口以及迭代器的接口,
    下半部分是 Netty 扩展的接口。

    Netty 主要扩展了两层接口:
    第一层是 EventExecutorGroup,它扩展了 Java 原生的 ScheduledExecutorService 接口和 Iterable 接口,说明它同时具有定时执行任务的能力,以及迭代的能力,是不是很奇怪,它迭代的是什么?对于 EventExecutorGroup,目前,它只有一个纯的实现类 DefaultEventExecutorGroup,而且还没有被使用到。

    第二层是 EventLoopGroup,它扩展了 EventExecutorGroup,同时,它添加了一些跟 Channel 绑定的方法,说明它是一个跟网络请求息息相关的接口,

    目前,Netty 中使用的都是基于 EventLoopGroup 的线程池。

    既然,目前,Netty 都没有使用到只跟 EventExecutorGroup 相关的实现类,那么,把 EventExecutorGroup 和 EventLoopGroup 合并行不行呢?其实,也是可以的,不过,这样就破坏了接口隔离的原则而且,对于以后的扩展也是不友好的,比如,在后面的版本中,就是需要某种线程池,它不是处理网络请求的,这时候我们只要实现 EventExecutorGroup 就可以了,而不再需要实现 EventLoopGroup。

    同样地,针对这两层线程池的接口,Netty 也扩展出了两个工作线程的接口:
    在这里插入图片描述
    从图中可以看到,这两个工作线程的接口就是 EventExecutor 和 EventLoop,它们类似于 Java 原生线程池中的 Worker,它们本身不是线程,但是,它们维护了一个线程用来执行任务。

    不过,你可能也发现了,EventExecutor 竟然继承自 EventExecutorGroup,且 EventLoop 继承自 EventLoopGroup,为什么设计得如此复杂呢?

    那是因为,在 Netty 看来,EventExecutor 它就是一种特殊的 EventExecutorGroup,可以理解成,它是只包含一个线程的线程池,所以,在 Netty 中,你可以把任何 EventExecutor 的实现当作单线程的线程池使用,类似于 Executors 工具类提供 newSingleThreadExecutor () 方法,同样地,EventLoop 也是一样的道理。

    调试用例
    从继承体系中,我们想找到蛛丝马迹来编写调试用例是不太容易的,此时,我们打开 EventLoop 和 EventLoopGroup 的实现类,会发现,除了 DefaultEventLoop 和 DefaultEventLoopGroup,其它的实现类都是跟 Channel 强相关的,里面都是为了处理 Channel 的 IO 事件,其实,它们也正是为了不同平台而编写的多路利用的事件处理器。

    如果一上来就看这些类,我们势必要迷失在 Netty 的线程池中而不能自拔,所以,我们选择足够简单的 DefaultEventLoop 和 DefaultEventLoopGroup 来作为我们的研究对象,编写调试用例。
    既然,它就是线程池,那我们就按线程池的写法来写调试用例就好了,
    下面是我写的调试用例:

    public class DefaultEventLoopGroupTest {
        public static void main(String[] args) {
            DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(5);
    
            for (int i = 0; i < 10; i++) {
                eventLoopGroup.execute(() -> {
                    System.out.println("thread: " + Thread.currentThread().getName());
                });
            }
        }
    }
    

    源码剖析
    创建 DefaultEventLoopGroup
    让我们来看这段代码的运行逻辑,在创建 DefaultEventLoopGroup 的位置打一个断点,跟踪进去:

    public DefaultEventLoopGroup(int nThreads) {
        this(nThreads, (ThreadFactory) null);
    }
    // 省略中间的构造方法
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (executor == null) {
            // key1,使用了一个叫作ThreadPerTaskExecutor
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // 初始化工作线程数组
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // key2,创建工作线程
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    // 创建失败的处理,相当于回滚
                    // 省略这部分代码
                }
            }
        }
    
        // key3,创建选择器
        chooser = chooserFactory.newChooser(children);
    
        // 省略其它代码
    }
    

    在构造方法中,主要是初始化一些属性,这里有三个比较重要的点:

    1 在 key1 处创建一个 ThreadPerTaskExecutor,并在创建 key2 处创建工作线程的时候当作参数传进去了,这个 ThreadPerTaskExecutor 是什么,它的作用是什么?

    2.key2 处的 EventExecutor 是如何创建的?

    3.key3 处的选择器的作用是什么?

    我们先说第三点吧,这个选择器是什么呢?其实,它是 DefaultEventLoopGroup 用来选择哪一个 DefaultEventLoop 来执行任务时使用的,在 Netty 内部,有两种选择器,分别为 PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,它们本质上来说没有什么区别,主要的区别在于如果数量为 2 的 N 次方,会使用 PowerOfTwoEventExecutorChooser 按 & 操作来计算下一个 EventExecutor,而 GenericEventExecutorChooser 则按 % 运算来计算下一个 EventExecutor,本质上都是取模运算,显然直接使用 & 操作效率更高一些,这是 Netty 优化到极致的一个表现:

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;
        @Override
        public EventExecutor next() {
            // &操作,减法操作优先级高于&操作
            // 轮询地获取EventExecutor(DefaultEventLoop)
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }
    
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;
        // &操作
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
    

    好了,我们再回头看第一点,ThreadPerTaskExecutor,从名字看表示每个任务一个线程的执行器,请看它的真面目:

    public final class ThreadPerTaskExecutor implements Executor {
    
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            // 使用线程工厂创建一个线程并启动这个线程
            threadFactory.newThread(command).start();
        }
    }
    

    这个类非常简单,只有一个 execute () 方法,在被调用的时候使用线程工厂创建一个线程并启动这个线程,所以**,它有一个问题**,就是 execute () 方法每被调用一次就创建一个线程,这也是它的名字的由来,来一个任务创建一个线程。

    好了,我们接着看第二点,EventExecutor 是如何被创建的,这里是调用了 newChild () 的方法,这个方法实际上是位于 DefaultEventLoopGroup 中:

    
    // io.netty.channel.DefaultEventLoopGroup#newChild
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new DefaultEventLoop(this, executor);
    }
    
    // 省略中间的构造方法
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // key1, 包装了一下传进来的ThreadPerTaskExecutor
        // 注意第二个参数
        this.executor = ThreadExecutorMap.apply(executor, this);
        // key2, 任务队列,默认使用的是LinkedBlockingQueue
        taskQueue = newTaskQueue(this.maxPendingTasks);
        // 拒绝策略
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    
    // 创建任务队列(默认的),子类可重写此方法
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
    }
    
    

    这段代码,最终,创建了一个 DefaultEventLoop,且这个 DefaultEventLoop 绑定了一个

    1 executor 和一个任务队列,请注意这里的包含的信息: executor,它是被包装之后的 ThreadPerTaskExecutor,如果被多次执行,那就会创建多个线程,所以,这个 executor 是不是只能执行一次 execute () 方法呢?

    2 taskQueue,这个 DefaultEventLoop 包含一个任务队列,如果上面的 1 成立,也就是说一个 DefaultEventLoop 只有一个线程,那这个任务队列就是这个线程独享的,
    所以,它的出队操作不存在竞争,还记得我们前面介绍的多生产者单消费者队列 ——MpscArrayQueue 吗?
    我们先卖个关子,直接看任务的执行流程。

    任务的执行流程
    待上面的 DefaultEventLoopGroup 创建完毕后,程序又回到的 main () 方法中,我们在任务执行的地方跟踪进去,请注意每个方法的类名:

    
    
    // io.netty.util.concurrent.AbstractEventExecutorGroup#execute
    @Override
    public void execute(Runnable command) {
        // 调用选择器选择一个DefaultEventLoop
        // 根据上面选择器的源码,可知,使用的是轮询方式
        next().execute(command);
    }
    
    
    // io.netty.util.concurrent.SingleThreadEventExecutor#execute
    @Override
    public void execute(Runnable task) {
        ObjectUtil.checkNotNull(task, "task");
        // 调用下面的私有方法
        execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
    }
    
    
    // io.netty.util.concurrent.SingleThreadEventExecutor#execute
    private void execute(Runnable task, boolean immediate) {
        // 当前线程是main,所以不在eventLoop中
        boolean inEventLoop = inEventLoop();
        // 添加任务到当前这个DefaultEventLoop的任务队列中
        // 如果添加失败会执行拒绝策略
        addTask(task);
        // 非不在,所以进入条件
        if (!inEventLoop) {
            // 启动线程
            startThread();
            // 省略其它代码
        }
    
    }
    

    到这里还很好理解,先把任务扔到一个队列中,再启动一个线程来运行它,关键是这个线程是如何启动的,继续跟踪到 startThread () 方法中:

    
    // io.netty.util.concurrent.SingleThreadEventExecutor#startThread
    private void startThread() {
    
        // 如果当前DefaultEventLoop的状态是未启动,才执行下面的内容
        // 也就是说对于一个DefaultEventLoop来说,这个判断下方的内容只会执行一次
        // 也就是说一个DefaultEventLoop只会创建一个线程!
        if (state == ST_NOT_STARTED) {
        
            // 原子更新状态变量,又使用到了前面介绍过的AtomicIntegerFieldUpdater这种方式
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                
                    // 又一层调用
                    doStartThread();
                    success = true;
                    
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }
    

    好了,到这里大致的逻辑已经很清晰了,通过上面的注释,一个 DefaultEventLoop 不管执行多少次任务,只会启动一个线程,我们再接着看 doStartThread () 的内部逻辑,这个方法有七八十行,我把干扰代码都删除了:

    // io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
    private void doStartThread() {
        assert thread == null;
        
        // 这个executor是什么?
        // 它就是上面我们没介绍的被包装之后的ThreadPerTaskExecutor
        
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    // SingleThreadEventExecutor.this表示的是DefaultEventLoop对象
                    // 所以,会调用到DefaultEventLoop的run()方法
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                } finally {
                }
            }
        });
        
    }
    

    好烦啊,这里又把任务包装了一层,然后,调用了被包装之后的 ThreadPerTaskExecutor 的 execute () 方法,好了,下面就是揭开这个包装类真面目的时候了,上面 execute () 方法指向的是下面我加了标记的那行:

    public final class ThreadExecutorMap {
    
        // 一个FastThreadLocal,存储着一个EventExecutor
        private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
        
        // 第一个参数是ThreadPerTaskExecutor
        // 第二个参数是DefaultEventLoop
        public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(executor, "executor");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            // 返回一个Executor的匿名对象
            return new Executor() {
                @Override
                public void execute(final Runnable command) {
                    // ***************这行****************
                    // 调用下面的apply
                    // 这个executor就是真正的ThreadPerTaskExecutor了
                    executor.execute(apply(command, eventExecutor));
                }
            };
        }
        
        // 第一个参数是任务
        // 第二个参数是DefaultEventLoop
        public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(command, "command");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            return new Runnable() {
                @Override
                public void run() {
                    // 设置DefaultEventLoop到FastThreadLocal中
                    // 这样任务执行的过程中,都可以随时获取到这个DefaultEventLoop
                    setCurrentEventExecutor(eventExecutor);
                    try {
                        command.run();
                    } finally {
                        // 执行完了移除
                        setCurrentEventExecutor(null);
                    }
                }
            };
        }
        
        private static void setCurrentEventExecutor(EventExecutor executor) {
            mappings.set(executor);
        }
    }
    

    这里不管是对 ThreadPerTaskExecutor 的包装还是对任务的包装,都是为了找个地方把 DefaultEventLoop 存储到线程本地变量中去,以便任务在执行的过程中随时可以使用 DefaultEventLoop。
    好了,程序接着走就到 ThreadPerTaskExecutor 的 execute () 方法中了:

    // io.netty.util.concurrent.ThreadPerTaskExecutor#execute
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
    

    这里就调用线程工厂创建一个线程了,当然,这个线程自然是 FastThreadLocalThread,然后,启动这个线程。

    此时,这个任务已经被包装了 N 层了,所以,在跳过这行之前,先在 main () 方法中任务内部打一个断点,即下面的 System.out.println () 处:

    public class DefaultEventLoopGroupTest {
    
        public static void main(String[] args) {
        
            DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(5);
            for (int i = 0; i < 10; i++) {
                eventLoopGroup.execute(() -> {
                    System.out.println("thread: " + Thread.currentThread().getName());
                });
            }
        }
    }
    
    // io.netty.channel.DefaultEventLoop#run
    @Override
    protected void run() {
        for (;;) {
            // 从队列中取任务
            Runnable task = takeTask();
            if (task != null) {
                // 执行任务
                task.run();
                updateLastExecutionTime();
            }
    
            if (confirmShutdown()) {
                break;
            }
        }
    }
    

    这个 run () 方法位于 DefaultEventLoop 中,可以看到,这是一个死循环,不断地从任务队列中取任务,然后执行,一直重复着这个动作。

    结合我们前面的分析,一个 DefaultEventLoop 只会启动一个线程,而这个 DefaultEventLoop 又有自己专属的队列,所以,我们很容易就可以得出下面的线程模型:
    在这里插入图片描述
    在这个图中,我加入了 Provider 的概念,它就是任务的生产者,生产者可以有多个,所以,这就衍生出了一种多生产单消费者的任务队列,根据我们前面的学习,把这里的任务队列直接换成 MPSC 家族的队列是不是就能极大地提高效率呢?

    没错,你可以看看 NioEventLoop 中重写的 newTaskQueue () 方法:

    // io.netty.channel.nio.NioEventLoop#newTaskQueue(int)
    @Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return newTaskQueue0(maxPendingTasks);
    }
    
    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
            : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    }
    

    但是,这种线程模型有个致命的缺陷 —— 千万不要在任务中执行耗时的操作,否则这个线程对应的任务队列中的任务将全部都会处于排队状态,即使整个线程池中有其它空闲的线程,它们也不会从不是自己的任务队列中挪任务过来执行。关于这一点,你可以使用下面的例子证明:

    public class DefaultEventLoopGroupTest {
        public static void main(String[] args) {
            DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(2);
    
            for (int i = 0; i < 10; i++) {
                if (i%2 == 0) {
                    eventLoopGroup.execute(() -> {
                        System.out.println("thread: " + Thread.currentThread().getName());
                    });
                } else {
                    eventLoopGroup.execute(() -> {
                        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
                        System.out.println("thread: " + Thread.currentThread().getName());
                    });
                }
            }
        }
    }
    

    运行此程序,你会发现,一号线程早早地就执行完毕了所有 5 个任务,而二号线程则是 1 秒执行一个任务,一号线程是不会借二号线程的任务执行的,这也是 Netty 线程池与 ForkJoinPool 线程池的最大区别。

    不过,这都不是个问题,Netty 线程池这样设计的目的也不是给我们的耗时业务使用的,如果有耗时的业务逻辑处理,请使用自定义的线程池进行处理,千万不要使用 Netty 的线程池

    好了,到这里,关于 Netty 的线程基本上就分析完毕了,有了这节的基础,相信你去看 NioEventLoop 的代码一定也会非常轻松的 ^^

    本节,我们从宏观和微观两个层面深入剖析了 Netty 的线程池,我们一定要记住一点:

    Netty 的线程池中坚决不允许执行耗时操作。

    随着本节内容的结束,所有源码的分析就到这里了,但是,这只是一个起点,
    有了关于
    ByteBuf、
    内存池、
    对象池、
    FastThreadLocal、
    MpscArrayQueue、
    Future、
    线程池的这些源码分析,我希望你可以回过头再把服务启动过程、
    数据接收写出过程的源码再仔细分析一遍,这样才能真正地达到从源码级别理解 Netty

    展开全文
  • 关于资源, Java线程的线程栈所占用的内存是在Java堆外的,所以是不受java程序控制的,只受系统资源限制,默认一个线程的线程栈大小是1M(当然这个可以通过设置-Xss属性设置,但是要注意栈溢出问题),但是,如果每个...

    1、前言 线程开销

    a、创建线程
    • 关于时间,创建线程使用是直接向系统申请资源的,这里调用系统函数进行分配资源的话耗时不好说。
    • 关于资源, Java线程的线程栈所占用的内存是在Java堆外的,所以是不受java程序控制的,只受系统资源限制,默认一个线程的线程栈大小是1M(当然这个可以通过设置-Xss属性设置,但是要注意栈溢出问题),但是,如果每个用户请求都新建线程的话,1024个用户光线程就占用了1个G的内存,如果系统比较大的话,一下子系统资源就不够用了,最后程序就崩溃了。
    • 对操作系统来说,创建一个线程的代价是十分昂贵的, 需要给它分配内存、列入调度,同时在线程切换的时候还要执行内存换页,CPU 的缓存被 清空,切换回来的时候还要重新从内存中读取信息,破坏了数据的局部性。
    b、上下文切换
    • 什么是上下文切换?
      举个栗子:医院有固定的科室,以及一定数量的医护人员,如果平时病人不多的话,接诊是很轻松的事,倘若因为某天很多人都生病了,医院肯定忙不过来,这天,赵四去瞧病,刚拍完片子,医生突然接到通知要去处理一个急诊病人,由于人手不够,他只能先记录一下当前赵四都看了什么,然后告诉赵四:刘能腿折了,我先去看他,一会再回来看你
    • 上下文切换的实际开销会随着平台的不同而变化,就比如每个医院的看病效率会不同,按照经验来看:在多数通用的处理器中,这个开销相当于5000~10000个始终周期,差不多有几微秒。
    展开全文
  • Netty业务线程池问题

    2021-06-21 22:51:43
    问题是这样的Netty4启动一个http的server,handler线程池是使用Netty提供的 EventExecutorGroup group=new DefaultEventExecutorGroup(3); 假设启动三个线程 ch.pipeline().addLast(group,new ...
  • 接下来的时间灯塔君持续更新Netty系列一共九篇 Netty 源码解析(一): 开始 Netty 源码解析(二): Netty 的 Channel Netty 源码解析(三): Netty的 Future 和 Promise Netty 源码解析(四): Netty 的 ...

    在这里插入图片描述

    今天是猿灯塔“365篇原创计划”第五篇。 接下来的时间灯塔君持续更新Netty系列一共九篇

    Netty 源码解析(一): 开始

    Netty 源码解析(二): Netty 的 Channel

    Netty 源码解析(三): Netty的 Future 和 Promise

    Netty 源码解析(四): Netty 的 ChannelPipeline

    当前:Netty 源码解析(五): Netty 的线程池分析

    Netty 源码解析(六): Channel 的 register 操作

    Netty 源码解析(七): NioEventLoop 工作流程

    Netty 源码解析(八): 回到 Channel 的 register 操作

    Netty 源码解析(九): connect 过程和 bind 过程分析

    今天呢!灯塔君跟大家讲:

    Netty 的线程池分析

    Netty 中的线程池 EventLoopGroup
    接下来,我们来分析 Netty 中的线程池。Netty 中的线程池比较不好理解,因为它的类比较多,而且它们之间的关系错综复杂。看下图,感受下 NioEventLoop 类和 NioEventLoopGroup 类的继承结构:

    在这里插入图片描述

    这张图我按照继承关系整理而来,大家仔细看一下就会发现,涉及到的类确实挺多的。本节来给大家理理清楚这部分内容。

    首先,我们说的 Netty 的线程池,指的就是 NioEventLoopGroup 的实例;线程池中的单个线程,指的是右边 NioEventLoop 的实例。

    回顾下我们第一节介绍的 Echo 例子,客户端和服务端的启动代码中,最开始我们总是先实例化 NioEventLoopGroup:

    // EchoClient 代码最开始:
    EventLoopGroup group = new NioEventLoopGroup();
    
    // EchoServer 代码最开始:
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    下面,我们就从 NioEventLoopGroup 的源码开始进行分析。

    我们打开 NioEventLoopGroup 的源码,可以看到,NioEventLoopGroup 有多个构造方法用于参数设置,最简单地,我们采用无参构造函数,或仅仅设置线程数量就可以了,其他的参数采用默认值。

    比如上面的代码中,我们只在实例化 bossGroup 的时候指定了参数,代表该线程池需要一个线程。

    public NioEventLoopGroup() {
        this(0);
    }
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }
    
    ...
      
    // 参数最全的构造方法
    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        // 调用父类的构造方法
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }
    

    我们来稍微看一下构造方法中的各个参数:

    • nThreads:这个最简单,就是线程池中的线程数,也就是 NioEventLoop 的实例数量。
    • executor:我们知道,我们本身就是要构造一个线程池(Executor),为什么这里传一个 executor 实例呢?它其实不是给线程池用的,而是给 NioEventLoop 用的,以后再说。
    • chooserFactory:当我们提交一个任务到线程池的时候,线程池需要选择(choose)其中的一个线程来执行这个任务,这个就是用来实现选择策略的。
    • selectorProvider:这个简单,我们需要通过它来实例化 JDK 的 Selector,可以看到每个线程池都持有一个 selectorProvider 实例。
    • selectStrategyFactory:这个涉及到的是线程池中线程的工作流程,在介绍 NioEventLoop 的时候会说。
    • rejectedExecutionHandler:这个也是线程池的好朋友了,用于处理线程池中没有可用的线程来执行任务的情况。在 Netty 中稍微有一点点不一样,这个是给 NioEventLoop 实例用的,以后我们再详细介绍。

    这里介绍这些参数是希望大家有个印象而已,大家发现没有,在构造 NioEventLoopGroup 实例时的好几个参数,都是用来构造 NioEventLoop 用的。

    下面,我们从 NioEventLoopGroup 的无参构造方法开始,跟着源码走:

    public NioEventLoopGroup() {
        this(0);
    }
    

    然后一步步走下去,到这个构造方法:

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    

    大家自己要去跟一下源码,这样才知道中间设置了哪些默认值,下面这几个参数都被设置了默认值:

    • selectorProvider = SelectorProvider.provider()

      这个没什么好说的,调用了 JDK 提供的方法

    • selectStrategyFactory = DefaultSelectStrategyFactory.INSTANCE

      这个涉及到的是线程在做 select 操作和执行任务过程中的策略选择问题,在介绍 NioEventLoop 的时候会用到。

    • rejectedExecutionHandler = RejectedExecutionHandlers.reject()

      大家进去看一下 reject() 方法,也就是说,Netty 选择的默认拒绝策略是:抛出异常

    跟着源码走,我们会来到父类 MultithreadEventLoopGroup 的构造方法中:

    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }
    

    这里我们发现,如果采用无参构造函数,那么到这里的时候,默认地 nThreads 会被设置为 *CPU 核心数 2。大家可以看下 DEFAULT_EVENT_LOOP_THREADS 的默认值,以及 static 代码块的设值逻辑。

    我们继续往下走:

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }
    

    到这一步的时候,new ThreadPerTaskExecutor(threadFactory) 会构造一个 executor。

    我们现在还不知道这个 executor 怎么用。这里我们先看下它的源码:

    public final class ThreadPerTaskExecutor implements Executor {
     private final ThreadFactory threadFactory;
    
     public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
         if (threadFactory == null) {
             throw new NullPointerException("threadFactory");
         }
         this.threadFactory = threadFactory;
     }
    
     @Override
     public void execute(Runnable command) {
         // 为每个任务新建一个线程
         threadFactory.newThread(command).start();
     }
    }
    

    Executor 作为线程池的最顶层接口, 我们知道,它只有一个 execute(runnable) 方法,从上面我们可以看到,实现类 ThreadPerTaskExecutor 的逻辑就是每来一个任务,新建一个线程

    我们先记住这个,前面也说了,它是给 NioEventLoop 用的,不是给 NioEventLoopGroup 用的。

    上一步设置完了 executor,我们继续往下看:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    

    这一步设置了 chooserFactory,用来实现从线程池中选择一个线程的选择策略。

    ChooserFactory 的逻辑比较简单,我们看下 DefaultEventExecutorChooserFactory 的实现:

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
     if (isPowerOfTwo(executors.length)) {
         return new PowerOfTwoEventExecutorChooser(executors);
     } else {
         return new GenericEventExecutorChooser(executors);
     }
    }
    

    这里设置的策略也很简单:

    1、如果线程池的线程数量是 2^n,采用下面的方式会高效一些:

    @Override
    public EventExecutor next() {
     return executors[idx.getAndIncrement() & executors.length - 1];
    }
    

    2、如果不是,用取模的方式:

    @Override
    public EventExecutor next() {
     return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
    

    走了这么久,我们终于到了一个干实事的构造方法中了:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        // executor 如果是 null,做一次和前面一样的默认设置。
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        // 这里的 children 数组非常重要,它就是线程池中的线程数组,这么说不太严谨,但是就大概这个意思
        children = new EventExecutor[nThreads];
    
        // 下面这个 for 循环将实例化 children 数组中的每一个元素
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 实例化!!!!!!
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 如果有一个 child 实例化失败,那么 success 就会为 false,然后进入下面的失败处理逻辑
                if (!success) {
                    // 把已经成功实例化的“线程” shutdown,shutdown 是异步操作
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    // 等待这些线程成功 shutdown
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // 把中断状态设置回去,交给关心的线程来处理.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
        // ================================================
        // === 到这里,就是代表上面的实例化所有线程已经成功结束 ===
        // ================================================
    
        // 通过之前设置的 chooserFactory 来实例化 Chooser,把线程池数组传进去,
        //     这就不必再说了吧,实现线程选择策略
        chooser = chooserFactory.newChooser(children);
        
        // 设置一个 Listener 用来监听该线程池的 termination 事件
        // 下面的代码逻辑是:给池中每一个线程都设置这个 listener,当监听到所有线程都 terminate 以后,这个线程池就算真正的 terminate 了。
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        // 设置 readonlyChildren,它是只读集合,以后用到再说
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    

    上面的代码非常简单吧,没有什么需要特别说的,接下来,我们来看看 newChild() 这个方法,这个方法非常重要,它将创建线程池中的线程。

    我上面已经用过很多次"线程"这个词了,它可不是 Thread 的意思,而是指池中的个体,后面我们会看到每个"线程"在什么时候会真正创建 Thread 实例。反正每个 NioEventLoop 实例内部都会有一个自己的 Thread 实例,所以把这两个概念混在一起也无所谓吧。

    newChild(…) 方法在 NioEventLoopGroup 中覆写了,上面说的"线程"其实就是 NioEventLoop:

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    

    它调用了 NioEventLoop 的构造方法:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // 调用父类构造器
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // 开启 NIO 中最重要的组件:Selector
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
    

    我们先粗略观察一下,然后再往下看:

    • 在 Netty 中,NioEventLoopGroup 代表线程池,NioEventLoop 就是其中的线程。
    • 线程池 NioEventLoopGroup 是池中的线程 NioEventLoop 的 parent,从上面的代码中的取名可以看出。
    • 每个 NioEventLoop 都有自己的 Selector,上面的代码也反应了这一点,这和 Tomcat 中的 NIO 模型有点区别。
    • executor、selectStrategy 和 rejectedExecutionHandler 从 NioEventLoopGroup 中一路传到了 NioEventLoop 中。

    这个时候,我们来看一下 NioEventLoop 类的属性都有哪些,我们先忽略它继承自父类的属性,单单看它自己的:

    private Selector selector;
    private Selector unwrappedSelector;
    private SelectedSelectionKeySet selectedKeys;
    
    private final SelectorProvider provider;
    
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    
    private final SelectStrategy selectStrategy;
    
    private volatile int ioRatio = 50;
    private int cancelledKeys;
    private boolean needsToSelectAgain;
    
    

    结合它的构造方法我们来总结一下:

    • provider:它由 NioEventLoopGroup 传进来,前面我们说了一个线程池有一个 selectorProvider,用于创建 Selector 实例

    • selector:虽然我们还没看创建 selector 的代码,但我们已经知道,在 Netty 中 Selector 是跟着线程池中的线程走的。也就是说,并非一个线程池一个 Selector 实例,而是线程池中每一个线程都有一个 Selector 实例。

      在无参构造过程中,我们发现,Netty 设置线程个数是 CPU 核心数的两倍,假设我们的机器 CPU 是 4 核,那么对应的就会有 8 个 Selector 实例。

    • selectStrategy:select 操作的策略,这个不急。

    • ioRatio:这是 IO 任务的执行时间比例,因为每个线程既有 IO 任务执行,也有非 IO 任务需要执行,所以该参数为了保证有足够时间是给 IO 的。这里也不需要急着去理解什么 IO 任务、什么非 IO 任务。

    然后我们继续走它的构造方法,我们看到上面的构造方法调用了父类的构造器,它的父类是 SingleThreadEventLoop。

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    
        // 我们可以直接忽略这个东西,以后我们也不会再介绍它
        tailTasks = newTaskQueue(maxPendingTasks);
    }
    

    SingleThreadEventLoop 这个名字很诡异有没有?然后它的构造方法又调用了父类 SingleThreadEventExecutor 的构造方法:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        // taskQueue,这个东西很重要,提交给 NioEventLoop 的任务都会进入到这个 taskQueue 中等待被执行
        // 这个 queue 的默认容量是 16
        taskQueue = newTaskQueue(this.maxPendingTasks);
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    

    到这里就更加诡异了,NioEventLoop 的父类是 SingleThreadEventLoop,而 SingleThreadEventLoop 的父类是 SingleThreadEventExecutor,它的名字告诉我们,它是一个 Executor,是一个线程池,而且是 Single Thread 单线程的。

    也就是说,线程池 NioEventLoopGroup 中的每一个线程 NioEventLoop 也可以当做一个线程池来用,只不过它只有一个线程。这种设计虽然看上去很巧妙,不过有点反人类的样子。

    上面这个构造函数比较简单:

    • 设置了 parent,也就是之前创建的线程池 NioEventLoopGroup 实例

    • executor:它是我们之前实例化的 ThreadPerTaskExecutor,我们说过,这个东西在线程池中没有用,它是给 NioEventLoop 用的,马上我们就要看到它了。提前透露一下,它用来开启 NioEventLoop 中的线程(Thread 实例)。

    • taskQueue:这算是该构造方法中新的东西,它是任务队列。我们前面说过,NioEventLoop 需要负责 IO 事件和非 IO 事件,通常它都在执行 selector 的 select 方法或者正在处理 selectedKeys,如果我们要 submit 一个任务给它,任务就会被放到 taskQueue 中,等它来轮询。该队列是线程安全的 LinkedBlockingQueue,默认容量为 16。

    • rejectedExecutionHandler:taskQueue 的默认容量是 16,所以,如果 submit 的任务堆积了到了 16,再往里面提交任务会触发 rejectedExecutionHandler 的执行策略。

      还记得默认策略吗:抛出RejectedExecutionException 异常。

      在 NioEventLoopGroup 的默认构造中,它的实现是这样的:

      private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
          @Override
          public void rejected(Runnable task, SingleThreadEventExecutor executor) {
              throw new RejectedExecutionException();
          }
      };
      

    然后,我们再回到 NioEventLoop 的构造方法:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // 我们刚刚说完了这个
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // 创建 selector 实例
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        
        selectStrategy = strategy;
    }
    

    可以看到,最重要的方法其实就是 openSelector() 方法,它将创建 NIO 中最重要的一个组件 Selector。在这个方法中,Netty 也做了一些优化,这部分我们就不去分析它了。

    到这里,我们的线程池 NioEventLoopGroup 创建完成了,并且实例化了池中的所有 NioEventLoop 实例。

    同时,大家应该已经看到,上面并没有真正创建 NioEventLoop 中的线程(没有创建 Thread 实例)。

    提前透露一下,创建线程的时机在第一个任务提交过来的时候,那么第一个任务是什么呢?是我们马上要说的 channel 的 register 操作。

    365天干货不断微信搜索「猿灯塔」第一时间阅读,回复【资料】【面试】【简历】有我准备的一线大厂面试资料和简历模板

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 28,178
精华内容 11,271
关键字:

netty线程池设置