精华内容
下载资源
问答
  • 2021-10-08 20:18:28

    submit 方法

    public abstract class AbstractExecutorService implements ExecutorService {
    
        // RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
        // 下面两个 newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        // 提交任务
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            // 1. 将任务包装成 FutureTask
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            // 2. 交给执行器执行,execute 方法由具体的子类来实现
            // 前面也说了,FutureTask 间接实现了Runnable 接口。
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            // 1. 将任务包装成 FutureTask
            RunnableFuture<T> ftask = newTaskFor(task, result);
            // 2. 交给执行器执行
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            // 1. 将任务包装成 FutureTask
            RunnableFuture<T> ftask = newTaskFor(task);
            // 2. 交给执行器执行
            execute(ftask);
            return ftask;
        }
    }
    
    

    尽管submit方法能提供线程执行的返回值,但只有实现了Callable才会有返回值,而实现Runnable的线程则是没有返回值的,也就是说在上面的3个方法中,submit(Callable task)能获取到它的返回值,submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值或者准确来说交给线程处理一下,而最后一个方法submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。

    源码分析

    这是FutureTask包装后的run方法,这个方法里 result = c.call(); 就是保存方法执行的结果。

    public void run() {
        //保证callable任务只被运行一次
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable < V > c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务,上面的例子我们可以看出,call()里面可能是一个耗时的操作,不过这里是同步的
                    result = c.call();
                    //上面的call()是同步的,只有上面的result有了结果才会继续执行
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //执行完了,设置result
                    set(result);
            }
        }
        finally {
            runner = null;
            int s = state;
            //判断该任务是否正在响应中断,如果中断没有完成,则等待中断操作完成
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    

    get方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    //这个方法是带有超时时间功能的
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    
    
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)
                Thread.yield();
            else if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) {
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
            else if (timed) {
            	...省略  
            }
            else
                LockSupport.park(this);
       }
    }
    
        /**
         * Returns result or throws exception for completed task.
         * 返回结果
         * @param s completed state value
         */
        @SuppressWarnings("unchecked")
        private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    

    总结

    其实就是把任务方法包装了一个新对象FutureTask,新对象会继承改写run方法,然后把新对象塞到线程池中,线程池复用线程来执行新对象的run方法,然后run方法中执行任务方法,并且把任务方法的返回值放到FutureTask的一个属性字段里,通过该对象的get方法可以获取属性字段值。

    参考博客

    深度解析submit方法

    更多相关内容
  • 下面小编就为大家带来一篇简单谈谈ThreadPoolExecutor线程池submit方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • 线程池submit和execute

    2020-09-07 09:14:03
    线程池的作用 : 避免大量的线程强占资源 避免大量的线程创建和销毁带来的开销 线程池的原理 : 创建线程池的时候,可以使用executors的静态...线程的执行有两种方式,一种是submit(runnable v)的形式,一种是execute(ru

    线程池的作用 :

    1. 避免大量的线程强占资源
    2. 避免大量的线程创建和销毁带来的开销

    线程池的原理 :
    创建线程池的时候,可以使用executors的静态方法,也可以使用new ThreadPoolExecutor的方式手动创建线程池,通过在线程池中指定参数达到创建不同类型的线程池的效果
    其中,executors底层其实也是调用的new ThreadPoolExecutor()的方式创建的,是对不同线程池的封装,
    线程的执行有两种方式,一种是submit(runnable v)的形式,一种是execute(runnable b) 的形式,不同的是submit可以返回一个future的实现类,相同的一点是submit底层其实也是调用的execute
    调用execut方法,首先判断传入的参数是否为空,如果为空,抛出异常,如果不为空,使用获取ctl值,计算出当前线程状态码,通过状态码计算出当前线程池工作线程是否小于核心线程数量
    如果小于,判断添加工作线程操作是否正常,如果正常,直接返回,如果不正常,继续执行获取ctl值,在添加工作线程的过程中,首先通过循环的方式保证ctl在加1的情况下状态同步,如果不同步,一直循环到同步为止,添加完成后,创建线程工作对象,把工作线程添加到set集合中,并执行.start,如果执行不成功,从set中删除添加的worker对象,并且ctl回滚到之前没有自增的值.
    如果上述中添加工作线程失败,或者当前线程池中工作线程数量操作和信息数量,执行下列逻辑
    判断当前线程池状态是否是running状态:
    如果不是running状态,或者是running状态,并且添加到线程队列失败,重新添加个工作线程,此时入参中第二个参数用于添加工作线程的逻辑中当前工作线程数量与最大线程数量做对比,如果添加失败,执行reject处理类处理

    如果是running状态,并且添加队列成功,重新获取ctl值,判断当前线程池状态如果是不是running状态,并且从对象中删除成功,则当前线程交给拒绝线程处理器处理,如果不满足上面条件,判断当前线程池的工作线程数如果为0,重新添加一个不带任务的线程.

    //AbstractExecutorService.java文件
        // executorService 中的 submit 方法
        public Future<?> submit(Runnable task) {
            // 首先判断传入的runnable 对象是否为空
            if (task == null) throw new NullPointerException();
            // 创建一个 futuretask 对象
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        // 根据runnable 创建一个futuretask对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
    
        // ThreadPoolExecutor.java文件
        // 执行创建线程池的方法
        public void execute(Runnable command) {
            // 首先判断传入的线程是否为空
            if (command == null)
                // 为空,抛出异常
                throw new NullPointerException();
            // 获取线程池的状态码, 这个状态码是自增的,原子类型的自增, 在执行addworker后ctl会加1
            int c = ctl.get();
            // 通过状态码,获取线程池中的线程的数量,如果小于核心数量
            if (workerCountOf(c) < corePoolSize) {
                // 添加线程到线程池,并且为true时使用核心线程数作为边界,如果false ,使用最大数量线程数作为边界
                if (addWorker(command, true))
                    // 添加完成后,返回
                    return;
                // 如果添加失败,重新获取状态值
                c = ctl.get();
            }
            // 执行下面逻辑有两种情况
            //      1. 工作线程数大于核心线程
            //      2. 添加线程时出错
            // 如果线程池中线程的数量大于核心的数量, 判断如果是运行状态, 并且也把线程加进了阻塞队列 workQueue 中
            if (isRunning(c) && workQueue.offer(command)) {
                // 重新获取 线程池 状态值
                int recheck = ctl.get();
                // 判断当前线程池如果不是运行状态,并且成功从队列中移除(从workQueue中移除线程, 并尝试终止线程池)
                if (! isRunning(recheck) && remove(command))
                    // 执行拒绝执行线程的处理
                    reject(command);
                    // 如果工作线程数为0
                else if (workerCountOf(recheck) == 0)
                    // 添加一个null的工作包装对象
                    addWorker(null, false);
    
            } else if (!addWorker(command, false))
                // 如果添加到线程池中出错,执行拒接的线程
                reject(command);
        }
    
        // 创建一个原子类对象用于计算线程的中状态
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // integer.size 为 32
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        // 即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务
        private static final int STOP       =  1 << COUNT_BITS;
        // 即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 即高3位为100,低29位为0,terminated()方法调用完成后变成此状态  
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // 用户计算线程的状态 32位中 高3位为1 低29位为0 
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 用于计算线程池中线程的数量 32位中 高3位为0  低29位为1
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // rs 为 runState, wc 为 workerCount 通过工作状态和线程数量来计算出 ctl
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        // 添加工作线程的方法
        private boolean addWorker(Runnable firstTask, boolean core) {
            // 设置循环跳出点,如果执行到某个位置,使用break,直接跳出的是这个标签范围内的所有循环
            retry:
            for (;;) {
                // 获取线程状态
                int c = ctl.get();
                int rs = runStateOf(c);
                // 判断线程池状态是否在shutdown上以及 状态不是关闭并且添加的线程不为空,并且线程队列中的线程不是空的
                if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    // 如果满足上面条件,说明线程池已经不适合添加新的线程了, 直接返回false
                    return false;
                // 如果不满足上面条件,说明线程池可以添加线程, 下面这个循环主要是对ctl进行操作,保证在增1后线程状态保持同步
                for (;;) {
                    // 获取工作线程数量
                    int wc = workerCountOf(c);
                    // 判断当前线程池中工作线程数量是否大于线程容量,大于核心线程数或最大线程数
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        // 满足条件,说明当前线程不是适合添加新的线程的
                        return false;
                    // 如果工作数量少于最大量或者核心线程数或最大线程数, 工作线程数加1,即操作ctl,通过cas的方式
                    if (compareAndIncrementWorkerCount(c))
                        // 如果添加成功,跳出内循环,
                        break retry;
                    // 如果添加失败,重新获取ctl
                    c = ctl.get();  // Re-read ctl
                    // 判断此时线程池状态是否已经改变
                    if (runStateOf(c) != rs)
                        //如果状态不一致,跳过,重新循环
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            // 创建一个线程包装对象,用于包装线程
            Worker w = null;
            try {
                w = new Worker(firstTask);
                // 创建一个worker 工作线程
                final Thread t = w.thread;
                // 判断创建的线程是否为空
                if (t != null) {
                     // 如果不为空,获取锁对象
                    final ReentrantLock mainLock = this.mainLock;
                    // 开始加锁
                    mainLock.lock();
                    try {
                        // 获取线程池状态
                        int rs = runStateOf(ctl.get());
                        // 如果线程池状态是running或者线程池状态关闭并且传入的线程是空的
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 判断创建的工作线程是否是活动状态(已经开始还没有死掉)
                            if (t.isAlive()) // precheck that t is startable
                                // 如果是活动状态,抛出 非法线程状态异常 
                                throw new IllegalThreadStateException();
                            // 如果不是活动状态, 添加到set集合中,这个set集合只有持有mainlock才可以访问
                            workers.add(w);
                            // 获取集合长度
                            int s = workers.size();
                            // 如果存放刚才创建的workers工作线程的集合中的线程数超过最大的池的大小
                            if (s > largestPoolSize)
                                // 把set集合中的数量代替原线程池最大值
                                largestPoolSize = s;
    
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    // 根据前面的判断是否需要开启线程,如果线程已经是活动的,不需要开启,如果不是活动线程,开启线程
                    if (workerAdded) {
                        t.start();
                        // 开启成功,设置workerStarted 为 true
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果工作线程开启失败,调用添加到失败的线程中
                if (! workerStarted)
                    // 从set中移除失败的线程,并且ctl减1, 并且尝试终止线程池
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
        // 线程开启失败后的方法
        private void addWorkerFailed(Worker w) {
            // 获取锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    // 如果线程不为空,从set集合中移除没有开启成功的线程
                    workers.remove(w);
                // 减去之前ctl增加的1
                decrementWorkerCount();
                // 尝试中断线程
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
        // 通过cas方式ctl加1
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        // 移除线程
        public boolean remove(Runnable task) {
            // 从等待队列中一尺线程
            boolean removed = workQueue.remove(task);
            // 尝试终止线程池
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        // 使用拒绝处理对象执行拒接指定线程
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
    

    execute() 方法的执行机制
    工作线程数小于核心线程数时,直接新建核心线程执行任务;
    大于核心线程数时,将任务添加进等待队列;
    队列满时,创建非核心线程执行任务;
    工作线程数大于最大线程数时,拒绝任务

    展开全文
  • 因此需要做到线程协同合作,一开始我马上使用了countDownLacth来进行协同实现,后来采用了线程池submit,也同样达到了协同(等待所有线程处理完)的效果。 这是以前的一个疑惑:...

    以前的一段时间,做首页数据报表功能时,采用了多线程分治的方式来处理(虽然后面因为考虑到数据量太大不适合将数据拉取到内存中处理而放弃),那时候留下了一个疑问,我需要保证所有线程都执行完之后然后处理数据吗,因此需要做到线程协同合作,一开始我马上使用了countDownLacth来进行协同实现,后来采用了线程池的submit,也同样达到了协同(等待所有线程处理完)的效果。

    这是以前的一个疑惑:https://blink.csdn.net/details/823641


    不过没有大佬回答我....在自己沉淀一段时间后实力有了提升,我现在卷土重来,自己解决这个疑惑!

    在这个方法中,测试了线程池的execute方法,是异步的,没啥问题!

    里面有一个细节,因为execute只接受Runnable接口,但是我们的任务又是callable接口,通过观察FuterTask类,可以发现他是继承于Runnable接口的,因此需要用到FuterTask来进行转换适配。

    还有一点我觉得,这个FuterTask名字起的很好,首先很明确,runnable这个接口只是一个任务,一些初学者经常会把runnable与现场混为一谈(我一开始也是!) 实际上,它们只是一个简单的任务,就是指示线程要去做什么的,因此这个命名很明确!

     

    第二次测试:submit

    在这个方法中我们测试了线程池的submit方法,从表现上来看,推断这是一种排队的方式执行的任务,也可以理解为同步的线程池.....本来使用线程池就是希望做到异步处理,咳咳....是异步了,但是没完全异步。

    接下来我们探究一下源码,看看submit他是做了什么操作!:

     啊这....翻车了,submit实际上和我们做的execute测试类似。。。帮我们进行了callable的转换,然后还是执行了execute方法.....

    那到底是为啥....接下来通过几个测试例子研究一下:

    ....省略过程,直接说结论吧

    经过排查发现,是因为调用了 futur.get方法,导致主线程进入阻塞,从而发生了让我们看起来像是任务排队的现象。接下来看一下get()的源码

    get的会获取当前任务的执行状态,判断是否完成,如果未完成,会进入到awaitDone方法,自选询问状态,直到成功为止。然后获取报告....接下来我们去看一下awaitDone方法!

    有以下的情况:

    检测执行任务的线程 是否发生异常,是的话则在阻塞队列中移除它!

    如果状态非正常,则返回状态值!

    如果当前的等待节点引用为null,则获取最新一个等待节点!

    如果以上情况都没发生,那么通过Unsafe类,由系统底层调用CAS,自旋将下一个等待节点替换到当前等待节点。如果成功则说明当前任务已经执行完毕,可以获取返回结果!

    展开全文
  • 线程池submit和execute方法原理

    千次阅读 2019-05-19 15:48:22
    线程池的作用 : 1. 避免大量的线程强占资源 2. 避免大量的线程创建和销毁带来的开销 线程池的原理 : 创建线程池的时候,可以使用executors的静态方法,也可以使用new ThreadPoolExecutor的方式手动创建线程池,通过...

    线程池的作用 :

    1. 避免大量的线程强占资源
    2. 避免大量的线程创建和销毁带来的开销  
    

    线程池的原理 :

    • 创建线程池的时候,可以使用executors的静态方法,也可以使用new ThreadPoolExecutor的方式手动创建线程池,通过在线程池中指定参数达到创建不同类型的线程池的效果
    • 其中,executors底层其实也是调用的new ThreadPoolExecutor()的方式创建的,是对不同线程池的封装,
    • 线程的执行有两种方式,一种是submit(runnable v)的形式,一种是execute(runnable b) 的形式,不同的是submit可以返回一个future的实现类,相同的一点是submit底层其实也是调用的execute
    • 调用execut方法,首先判断传入的参数是否为空,如果为空,抛出异常,如果不为空,使用获取ctl值,计算出当前线程状态码,通过状态码计算出当前线程池工作线程是否小于核心线程数量
    • 如果小于,判断添加工作线程操作是否正常,如果正常,直接返回,如果不正常,继续执行获取ctl值,在添加工作线程的过程中,首先通过循环的方式保证ctl在加1的情况下状态同步,如果不同步,一直循环到同步为止,添加完成后,创建线程工作对象,把工作线程添加到set集合中,并执行.start,如果执行不成功,从set中删除添加的worker对象,并且ctl回滚到之前没有自增的值.
    • 如果上述中添加工作线程失败,或者当前线程池中工作线程数量操作和信息数量,执行下列逻辑
    • 判断当前线程池状态是否是running状态:
      • 如果不是running状态,或者是running状态,并且添加到线程队列失败,重新添加个工作线程,此时入参中第二个参数用于添加工作线程的逻辑中当前工作线程数量与最大线程数量做对比,如果添加失败,执行reject处理类处理

      • 如果是running状态,并且添加队列成功,重新获取ctl值,判断当前线程池状态如果是不是running状态,并且从对象中删除成功,则当前线程交给拒绝线程处理器处理,如果不满足上面条件,判断当前线程池的工作线程数如果为0,重新添加一个不带任务的线程.

    //AbstractExecutorService.java文件
        // executorService 中的 submit 方法
        public Future<?> submit(Runnable task) {
            // 首先判断传入的runnable 对象是否为空
            if (task == null) throw new NullPointerException();
            // 创建一个 futuretask 对象
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        // 根据runnable 创建一个futuretask对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
    
        // ThreadPoolExecutor.java文件
        // 执行创建线程池的方法
        public void execute(Runnable command) {
            // 首先判断传入的线程是否为空
            if (command == null)
                // 为空,抛出异常
                throw new NullPointerException();
            // 获取线程池的状态码, 这个状态码是自增的,原子类型的自增, 在执行addworker后ctl会加1
            int c = ctl.get();
            // 通过状态码,获取线程池中的线程的数量,如果小于核心数量
            if (workerCountOf(c) < corePoolSize) {
                // 添加线程到线程池,并且为true时使用核心线程数作为边界,如果false ,使用最大数量线程数作为边界
                if (addWorker(command, true))
                    // 添加完成后,返回
                    return;
                // 如果添加失败,重新获取状态值
                c = ctl.get();
            }
            // 执行下面逻辑有两种情况
            //      1. 工作线程数大于核心线程
            //      2. 添加线程时出错
            // 如果线程池中线程的数量大于核心的数量, 判断如果是运行状态, 并且也把线程加进了阻塞队列 workQueue 中
            if (isRunning(c) && workQueue.offer(command)) {
                // 重新获取 线程池 状态值
                int recheck = ctl.get();
                // 判断当前线程池如果不是运行状态,并且成功从队列中移除(从workQueue中移除线程, 并尝试终止线程池)
                if (! isRunning(recheck) && remove(command))
                    // 执行拒绝执行线程的处理
                    reject(command);
                    // 如果工作线程数为0
                else if (workerCountOf(recheck) == 0)
                    // 添加一个null的工作包装对象
                    addWorker(null, false);
    
            } else if (!addWorker(command, false))
                // 如果添加到线程池中出错,执行拒接的线程
                reject(command);
        }
    
        // 创建一个原子类对象用于计算线程的中状态
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // integer.size 为 32
        private static final int COUNT_BITS = Integer.SIZE - 3;
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 即高3位为1,低29位为0,该状态的线程池会接收新任务,也会处理在阻塞队列中等待处理的任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        // 即高3位为0,低29位为0,该状态的线程池不会再接收新任务,但还会处理已经提交到阻塞队列中等待处理的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 即高3位为001,低29位为0,该状态的线程池不会再接收新任务,不会处理在阻塞队列中等待的任务,而且还会中断正在运行的任务
        private static final int STOP       =  1 << COUNT_BITS;
        // 即高3位为010,低29位为0,所有任务都被终止了,workerCount为0,为此状态时还将调用terminated()方法
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 即高3位为100,低29位为0,terminated()方法调用完成后变成此状态  
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // 用户计算线程的状态 32位中 高3位为1 低29位为0 
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 用于计算线程池中线程的数量 32位中 高3位为0  低29位为1
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // rs 为 runState, wc 为 workerCount 通过工作状态和线程数量来计算出 ctl
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        // 添加工作线程的方法
        private boolean addWorker(Runnable firstTask, boolean core) {
            // 设置循环跳出点,如果执行到某个位置,使用break,直接跳出的是这个标签范围内的所有循环
            retry:
            for (;;) {
                // 获取线程状态
                int c = ctl.get();
                int rs = runStateOf(c);
                // 判断线程池状态是否在shutdown上以及 状态不是关闭并且添加的线程不为空,并且线程队列中的线程不是空的
                if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    // 如果满足上面条件,说明线程池已经不适合添加新的线程了, 直接返回false
                    return false;
                // 如果不满足上面条件,说明线程池可以添加线程, 下面这个循环主要是对ctl进行操作,保证在增1后线程状态保持同步
                for (;;) {
                    // 获取工作线程数量
                    int wc = workerCountOf(c);
                    // 判断当前线程池中工作线程数量是否大于线程容量,大于核心线程数或最大线程数
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        // 满足条件,说明当前线程不是适合添加新的线程的
                        return false;
                    // 如果工作数量少于最大量或者核心线程数或最大线程数, 工作线程数加1,即操作ctl,通过cas的方式
                    if (compareAndIncrementWorkerCount(c))
                        // 如果添加成功,跳出内循环,
                        break retry;
                    // 如果添加失败,重新获取ctl
                    c = ctl.get();  // Re-read ctl
                    // 判断此时线程池状态是否已经改变
                    if (runStateOf(c) != rs)
                        //如果状态不一致,跳过,重新循环
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            // 创建一个线程包装对象,用于包装线程
            Worker w = null;
            try {
                w = new Worker(firstTask);
                // 创建一个worker 工作线程
                final Thread t = w.thread;
                // 判断创建的线程是否为空
                if (t != null) {
                     // 如果不为空,获取锁对象
                    final ReentrantLock mainLock = this.mainLock;
                    // 开始加锁
                    mainLock.lock();
                    try {
                        // 获取线程池状态
                        int rs = runStateOf(ctl.get());
                        // 如果线程池状态是running或者线程池状态关闭并且传入的线程是空的
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 判断创建的工作线程是否是活动状态(已经开始还没有死掉)
                            if (t.isAlive()) // precheck that t is startable
                                // 如果是活动状态,抛出 非法线程状态异常 
                                throw new IllegalThreadStateException();
                            // 如果不是活动状态, 添加到set集合中,这个set集合只有持有mainlock才可以访问
                            workers.add(w);
                            // 获取集合长度
                            int s = workers.size();
                            // 如果存放刚才创建的workers工作线程的集合中的线程数超过最大的池的大小
                            if (s > largestPoolSize)
                                // 把set集合中的数量代替原线程池最大值
                                largestPoolSize = s;
    
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    // 根据前面的判断是否需要开启线程,如果线程已经是活动的,不需要开启,如果不是活动线程,开启线程
                    if (workerAdded) {
                        t.start();
                        // 开启成功,设置workerStarted 为 true
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果工作线程开启失败,调用添加到失败的线程中
                if (! workerStarted)
                    // 从set中移除失败的线程,并且ctl减1, 并且尝试终止线程池
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
        // 线程开启失败后的方法
        private void addWorkerFailed(Worker w) {
            // 获取锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    // 如果线程不为空,从set集合中移除没有开启成功的线程
                    workers.remove(w);
                // 减去之前ctl增加的1
                decrementWorkerCount();
                // 尝试中断线程
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
        // 通过cas方式ctl加1
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
        // 移除线程
        public boolean remove(Runnable task) {
            // 从等待队列中一尺线程
            boolean removed = workQueue.remove(task);
            // 尝试终止线程池
            tryTerminate(); // In case SHUTDOWN and now empty
            return removed;
        }
    
        // 使用拒绝处理对象执行拒接指定线程
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    
    
    
    展开全文
  • 线程池中提交任务的submit方法不是阻塞方法,而Future.get方法是一个阻塞方法,当submit提交多个任务时,只有所有任务都完成后,才能使用get按照任务的提交顺序得到返回结果。 Future虽然可以实现获取异步执行结果...
  • Java创建线程有三种方法,1是继承Thread类,2是实现Runnable接口...执行submit()方法会返回一个Future。这个future.get()可以阻塞获取值 demo- 21-40 行是创建一个线程池,重点关注42,49,57代码快 package study4; i
  • 小伙伴们知道线程池submit和execute的区别都有哪些嘛?它们都是java线程池中较常用的,下面就让我们一起来看看它们有哪些区别吧。一、返回值类别submit()方法可以提供Future < T > 类型的返回值。executor()...
  • 有关线程池ExecutorService,只谈submit的使用 可创建的类型如下: private static ExecutorService pool = Executors.newFixedThreadPool(20);//创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中...
  • 线程池submit的坑

    千次阅读 2019-07-31 21:22:08
    线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,...
  • 血的教训之背景:使用线程池对存量数据进行迁移,但是总有一批数据迁移失败,无异常日志打印 凶案起因 ​ 听说parallelStream并行流是个好东西,由于日常开发stream串行流的场景比较多,这次需要写迁移程序刚好...
  • 线程池submit和map的应用

    千次阅读 2019-02-23 16:15:44
    线程池submit的应用 import time def timeit(f): def wrapper(*args, **kwargs): start_time = time.time() res = f(*args, **kwargs) end_time = time.time() print("%s函数运行时间:%.2f"...
  • 一心多用多线程-细谈java线程池submit与execute的区别

    万次阅读 多人点赞 2016-11-24 01:56:28
    深夜学习,发现ThreadPoolExecutor里面一个小知识点,故开热点连wifi怒写submit与execute方法的区别。1.问题的来源在看书的时候,涉及到java线程池问题的时候常常面临这样一个问题。当定义了一个Runnable对象想提交...
  • 通过案例分析线程池类Executors相关的ExecutorService的submit方法和Callable的call方法调用流程。有这样一个需求:获取多个任务的返回结果,根据这些任务的返回结果来决定当前线程该做什么。
  • 线程池submit和execute

    2022-03-06 23:55:28
    1、execute:执行线程池中的线程,但不能得到线程执行的返回值 2、submit:执行线程池中的线程,返回Future对象,可以得到线程执行的返回值、线程执行成功或失败结果、执行发生的异常。
  • 在研究ThreadPoolExecutor线程池的时候,发现可以有两种启动线程的方法:submit(Runnable runnable),excute(Runnable runnable) 先说个结论吧: submit()方法,可以提供Future &amp;lt; T &amp;gt; 类型...
  • python中的线程池submit应用

    千次阅读 2019-01-21 21:36:41
    代码描述: _thread, threading, multiprocessing ...线程池里面的线程越多越好? import time def timeit(f): def wrapper(*args, **kwargs): start_time = time.time() res = f(*args, **kwarg...
  • java线程池_submit()

    千次阅读 2021-06-10 22:30:17
    submit()方法 有三个重载方法: submit(Callable<T> task)//参数为...线程池测试类 package com.thread.pool; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor
  • 在使用线程池时,我们都知道线程池有两种提交任务的方式,那么他们有什么区别呢?1.execute提交的是Runnable类型的任务,而submit提交的是Callable或者Runnable类型的任务2.execute的提交没有返回值,而submit的提交...
  • 线程池submit方法

    2019-10-03 20:25:35
    ThreadPoolExetor#submit(Callable<T> task) 有返回值 ThreadPoolExetor#submit(Runnable task, T result) 有返回值,返回值是通过result间接获取的 ThreadPoolExetor#submit(Runnable runnable) 没有返回值...
  • 那么线程池submit提交方式,两种都可以作为参数传递。要知道submit是当需要返回值的情况下才使用,runnable是没有返回值,那么submit为什么可以接收两种呢,我们继续往下看。 submit方法解析 先看下submit的方法...
  • submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它...
  • Java线程池系列之execute和submit区别

    千次阅读 2022-01-08 11:40:43
    1. execute只能提交Runnable类型的任务,没有返回值,而submit既能提交Runnable类型任务也能提交Callable类型任务,返回Future类型。 2. execute方法提交的任务异常是直接抛出的,而submit方法是是捕获了异常的,当...
  • 线程池submit和execute区别

    万次阅读 2020-10-28 14:52:33
    线程池中的execute方法大家都不陌生,即开启线程执行池中的任务。还有一个方法submit也可以做到,它的功能是提交指定的任务去执行并且返回Future对象,即执行的结果。下面简要介绍一下两者的三个区别: 1、接收的...
  • 线程池中的execute方法大家都不陌生,即开启线程执行池中的任务。还有一个方法submit也可以做到,它的功能是提交指定的任务去执行并且返回Future对象,即执行的结果。下面简要介绍一下两者的三个区别: 1、接收的...
  • 使用工具类创建线程池 上一节我们已经自己实现了一个线程池,本节我们看看JDK提供的线程池是如何实现的? public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit...
  • MarkDown文档,意在说明线程池方法:submit()和execute()方法的区别
  • 线程池中的execute方法大家都不陌生,即开启线程执行池中的任务。还有一个方法submit也可以做到,它的功能是提交指定的任务去执行并且返回Future对象,即执行的结果。下面简要介绍一下两者的三个区别:1、接收的参数...
  • 线程池中有两个提交任务的方法 向线程池提交任务的两种方式大致如下: 方式一:调用execute()方法 // todo 源码 方式二:调用submit()方法 // todo 源码 一、区别 以上的submit()和execute()两类方法的区别在哪里呢...
  • 文章目录前言线程池通过submit提交任务,出现任务,会怎样? 前言   如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差,实在白嫖的话,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 64,289
精华内容 25,715
关键字:

线程池submit