精华内容
下载资源
问答
  • 但是我无法判断第步开出的15个线程是否结束,我只能手工屏蔽第二和第三部分代码,然后依次执行。 高手有什么办法没,如何判断15个线程是否结束。 就是如何<span style="color: red">...
  • 用来限制访问频率以及多线程情况下对某一资源访问资格的问题的解决方案:想象一个桶,每次你访问网络时候都要下桶内是否有剩余的令牌,如果有你可以拿走令牌去访问,没有你就等着令牌投进来再访问。一、最简单的:...

    提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

    文章目录

    什么是令牌桶?

    用来限制访问频率以及多线程情况下对某一资源访问资格的问题的解决方案:想象一个桶,每次你访问网络时候都要看下桶内是否有剩余的令牌,如果有你可以拿走令牌去访问,没有你就等着令牌投进来再访问。

    一、最简单的:单令牌

    class OneToken():

    def __init__(self):

    self.current_token = 1

    async def greenlight(self):

    while self.consume() == False:

    # print(".")

    await asyncio.sleep(0.01)

    def consume(self):

    if self.current_token:

    self.current_token=0

    return True

    else:

    return False

    def giveback(self):

    self.current_token=1

    在要用令牌的异步task/coroutine 的语句前面添加 await self.mytoken.greenlight()

    这样就可以限制后面部分的代码不会被两个协程同时调用。

    这个思路帮我解决了aiomysql 里,当执行一个插入语句和commit这两个动作都需要await时候,会出现其他的协程调用数据库,在两个动作期间插入其他动作造成bug的情况。

    请注意: 这里单令牌的交回也是由自己的程序做出的 建议用 try except finally 这种语法来释放token

    与线程锁lock差不多

    二、总量限制桶

    理解了简单的单个令牌后,我们设想一下,在一个小时内只能访问50次,但是不限制你这50的间隔。

    class LongWaitToken():

    def __init__(self,split,mytime):

    self.split = split

    self.currenttoken = split

    self.waittime = mytime

    self.lastconsumetime = time.time()

    def consume(self):

    if self.currenttoken>0:

    self.currenttoken-=1

    self.lastconsumetime = time.time()

    return True

    else:

    if time.time() -self.lastconsumetime>self.waittime:

    self.currenttoken = self.split

    self.lastconsumetime = time.time()

    return True

    else:

    return False

    async def greenlight(self, number=1):

    """

    Block and yeild the contral from the subprocess.

    :param number:

    :return:

    """

    while self.consume() == False:

    # print(".")

    await asyncio.sleep(1)

    逻辑与上面的单令牌类似,只是在查询还剩多少个令牌这一步(consume)时候,当所有令牌都没了以后,等待一个固定的时间,然后把桶装满。

    三、水流桶(完整桶)

    代码如下(示例):

    class TokenBucket(object):

    def __init__(self, name, rate, capacity,longwait = {}):

    """

    :param name: 单例模式下的 key

    :param rate: 这个实际上是多少秒增加一个token

    :param capacity: 桶内最大token数量(并发数量)

    """

    if name in created_bucket.keys():

    self = created_bucket[name]

    # 更新实例内参数 方便调用者 继承或者覆盖使用

    else:

    self._name = name

    self._rate = rate

    self._capacity = capacity

    self._current_token = capacity

    self._last_consum_time = time.time()

    if longwait!={}:

    self.longwait_init(longwait)

    else:

    self.use_longwait = False

    created_bucket[name] = self

    def longwait_init(self,longwait):

    self._longwait_split = longwait["split"]

    self._longwait_time = longwait["time"]

    self._longwait_current_left = longwait["split"]

    self.use_longwait = True

    def update(self,rate,capacity,longwait = {}):

    self._rate = rate

    self._capacity = capacity

    self._last_consum_time = time.time()

    if longwait != {}:

    self.longwait_init(longwait)

    created_bucket[self._name] = self

    def giveback(self, giveback: int = 1):

    """

    通知桶,有人将令牌拿回来了

    :return:

    """

    self._current_token = min(self._capacity, self._current_token + giveback)

    if self.use_longwait:

    self._longwait_current_left = min(self._longwait_split,self._longwait_current_left+1)

    def consume(self, token_amount):

    """

    根据时间计算桶内现有token量,待桶内量大于拿出量,程序返回True并在b桶内减少相应的token更取token时间 否则返回False 继续计时

    :param token_amount: 单次消耗几个token

    :return:

    """

    if token_amount > self._capacity:

    print("错误: 需求token大于桶内容量!")

    raise TokenExceedMax("希望获取的token数量大于桶内最大容量,逻辑死锁!!!")

    timefly = (time.time() - self._last_consum_time)

    increasement = int(timefly/ self._rate)

    # print(increasement)

    # 水流不能超过水桶容量

    self._current_token = min(self._capacity, self._current_token + increasement)

    # print("token token")

    if token_amount > self._current_token:

    return False

    else:

    if self.use_longwait:

    if self._longwait_current_left > 0:

    self._longwait_current_left -= 1

    self._current_token -= token_amount

    self._last_consum_time = time.time()

    return True

    else:

    if time.time() - self._last_consum_time>self._longwait_time :

    self._longwait_current_left = self._longwait_split

    self._current_token -= token_amount

    self._last_consum_time = time.time()

    return True

    else:

    return False

    else:

    self._current_token -= token_amount

    self._last_consum_time = time.time()

    return True

    async def greenlight(self, number=1):

    """

    Block and yeild the contral from the subprocess.

    :param number:

    :return:

    """

    while self.consume(number) == False:

    # print(".")

    await asyncio.sleep(0.01)

    与之前两个桶不同的在于 这个桶的填装方式是基于时间的,而不是基于动作的,每隔一段时间就会往桶里放相应多的令牌,令牌的数量可以跟你锁并行的访问数量相同,这样就相当于并发了,这种桶往往更容易调整到服务器访问限制相同的情况

    注意: 不要使用 await asyncio.sleep(0) 会让CPU跑满

    总结

    本文由简到繁介绍了三种程度的令牌桶,以及其正确的异步调用方法。

    在拿到令牌之前,实际的代码是在不停的await asyncio.sleep(0.01) 因此进程可以去执行其他协程,这里的0.01只是一个代表 我实际使用时候用的是0.2(根据你自己的情况调整,间隔小CPU占用稍高,间隔大,桶不精确程度增加)

    展开全文
  • 引言上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用... 我们先最后一个问题一般一个线程执行完任务之后就结...

    引言

    上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 还留了几个小问题。。建议看这篇文章之前可先看下前面那篇文章。这篇文章我们就来分析下上篇文章的几个小问题

    • 线程池是否区分核心线程和非核心线程?
    • 如何保证核心线程不被销毁?
    • 线程池的线程是如何做到复用的? 我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。如果你对一个已经启动的线程对象再调用一次start方法的话,会产生:IllegalThreadStateException异常,但是Thread的run方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:Thread类中run()和start()方法的有什么区别? 下面我们就从jdk的源码来一起看看如何实现线程复用的: 线程池执行任务的ThreadPoolExecutor#execute方法为入口
     public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();        int c = ctl.get();     // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务     if (workerCountOf(c) 

    excute方法主要业务逻辑

    • 如果当前的线程池运行线程小于coreSize,则创建新线程来执行任务。
    • 如果当前运行的线程等于coreSize或多余coreSize(动态修改了coreSize才会出现这种情况),把任务放到阻塞队列中。
    • 如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。
    • 如果新创建线程已经达到了最大线程数,任务将会被拒绝。

    addWorker 方法

    上述方法的核心主要就是addWorker方法,

    private boolean addWorker(Runnable firstTask, boolean core) {       // 前面还有一部分就省略了。。。。        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    if (rs  largestPoolSize)                            largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (! workerStarted)                addWorkerFailed(w);        }        return workerStarted;    }

    这个方法我们先看看这个work类吧

    private final class Worker        extends AbstractQueuedSynchronizer        implements Runnable    {             Worker(Runnable firstTask) {            setState(-1); // inhibit interrupts until runWorker            this.firstTask = firstTask;            this.thread = getThreadFactory().newThread(this);        }                public void run() {            runWorker(this);        }

    work类实现了Runnable接口,然后run方法里面调用了runWorker方法

    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        // 新增创建        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {             // 判断 task 是否为空,如果不为空直接执行         // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行            while (task != null || (task = getTask()) != null) {                w.lock();                if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                      runStateAtLeast(ctl.get(), STOP))) &&                    !wt.isInterrupted())                    wt.interrupt();                try {                    beforeExecute(wt, task);                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x; throw x;                    } catch (Error x) {                        thrown = x; throw x;                    } catch (Throwable x) {                        thrown = x; throw new Error(x);                    } finally {                        afterExecute(task, thrown);                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }

    这个runwork方法中会优先取worker绑定的任务,如果创建这个worker的时候没有给worker绑定任务,worker就会从队列里面获取任务来执行,执行完之后worker并不会销毁,而是通过while循环不停的执行getTask方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。 while (task != null || (task = getTask()) != null) 这个循环条件只要getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。 那么任务执行完怎么保证核心线程不销毁?非核心线程销毁? 答案就在这个getTask()方法里面

    private Runnable getTask() {  // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true  // 这个标记非常之重要,下面会说到  boolean timedOut = false;  for (;;) {    // 获取ctl变量值    int c = ctl.get();    int rs = runStateOf(c);    // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP    // 则操作AQS减少工作线程数量,并且返回null,线程被回收    // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {      // 操作AQS将线程池中的线程数量减一      decrementWorkerCount();      return null;    }    // 获取线程池中的有效线程数量    int wc = workerCountOf(c);    // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的    // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收    // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        // 这里说明了两点销毁线程的条件:    // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,    // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了    // 以上两点满足其一,都可以触发线程超时回收    if ((wc > maximumPoolSize || (timed && timedOut))        && (wc > 1 || workQueue.isEmpty())) {      // 尝试用AQS将线程池线程数量减一      if (compareAndDecrementWorkerCount(c))        // 减一成功后返回null,线程被回收        return null;      // 否则循环重试      continue;    }    try {      // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务      Runnable r = timed ?        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :      workQueue.take();      if (r != null)        return r;      // 如果poll超时获取任务超时了, 将timeOut设置为true      // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了      timedOut = true;    } catch (InterruptedException retry) {      timedOut = false;    }  }}

    所以保证线程不被销毁的关键代码就是这一句代码

       Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

    只要timed为false这个workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed的值又是通过allowCoreThreadTimeOut和正在运行的线程数量是否大于coreSize控制的。

    • 只要getTask方法返回null 我们的线程就会被回收(runWorker方法会调用processWorkerExit)
    • 这个方法的源码也就解释了为什么我们在创建线程池的时候设置了allowCoreThreadTimeOut =true的话,核心线程也会进行销毁。
    • 通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。

    结束

    • 由于自己才疏学浅,难免会有纰漏,假如你发现了错误的地方,还望留言给我指出来,我会对其加以修正。
    • 如果你觉得文章还不错,你的转发、分享、赞赏、点赞、留言就是对我最大的鼓励。
    • 感谢您的阅读,十分欢迎并感谢您的关注。 巨人的肩膀摘苹果 http://objcoding.com/2019/04/25/threadpool-running/
    bf49851767e6f73625ca1320439c95cc.png
    展开全文
  • 引言上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 ...我们先最后一个问题一般一个线程执行完任务...

    引言

    上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 还留了几个小问题。。建议看这篇文章之前可先看下前面那篇文章。这篇文章我们就来分析下上篇文章的几个小问题

    • 线程池是否区分核心线程和非核心线程?
    • 如何保证核心线程不被销毁?
    • 线程池的线程是如何做到复用的?

    我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。如果你对一个已经启动的线程对象再调用一次start方法的话,会产生:IllegalThreadStateException异常,但是Threadrun方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:「Thread类中run()和start()方法的有什么区别?」下面我们就从jdk的源码来一起看看如何实现线程复用的:线程池执行任务的ThreadPoolExecutor#execute方法为入口
     public void execute(Runnable command) {
         if (command == null)
             throw new NullPointerException();
       
         int c = ctl.get();
         // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务
         if (workerCountOf(c)          if (addWorker(command, true))
                 return;
             c = ctl.get();
         }
         // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中
         if (isRunning(c) && workQueue.offer(command)) {
          // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态
             int recheck = ctl.get();
             // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队
             if (! isRunning(recheck) && remove(command))
                 reject(command);
             // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务
             else if (workerCountOf(recheck) == 0)
                 addWorker(nullfalse);
         }
         // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务
         // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略
         else if (!addWorker(command, false))
             reject(command);
     }

    「excute」方法主要业务逻辑

    • 如果当前的线程池运行线程小于「coreSize」,则创建新线程来执行任务。
    • 如果当前运行的线程等于「coreSize」或多余「coreSize」(动态修改了coreSize才会出现这种情况),把任务放到阻塞队列中。
    • 如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。
    • 如果新创建线程已经达到了最大线程数,任务将会被拒绝。

    addWorker 方法

    上述方法的核心主要就是addWorker方法,

    private boolean addWorker(Runnable firstTask, boolean core) {
           // 前面还有一部分就省略了。。。。

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs                         (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    这个方法我们先看看这个「work」类吧

    private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{
         
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
            
            public void run() {
                runWorker(this);
            }

    「work」类实现了「Runnable」接口,然后run方法里面调用了「runWorker」方法

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            // 新增创建
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                 // 判断 task 是否为空,如果不为空直接执行
             // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

    这个runwork方法中会优先取worker绑定的任务,如果创建这个worker的时候没有给worker绑定任务,worker就会从队列里面获取任务来执行,执行完之后worker并不会销毁,而是通过while循环不停的执行getTask方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。

    while (task != null || (task = getTask()) != null) 这个循环条件只要getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。「那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?」答案就在这个getTask()方法里面

    private Runnable getTask() {
      // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true
      // 这个标记非常之重要,下面会说到
      boolean timedOut = false;
      for (;;) {
        // 获取ctl变量值
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP
        // 则操作AQS减少工作线程数量,并且返回null,线程被回收
        // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
          // 操作AQS将线程池中的线程数量减一
          decrementWorkerCount();
          return null;
        }

        // 获取线程池中的有效线程数量
        int wc = workerCountOf(c);

        // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的
        // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收
        // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 这里说明了两点销毁线程的条件:
        // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
        // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
        // 以上两点满足其一,都可以触发线程超时回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          // 尝试用AQS将线程池线程数量减一
          if (compareAndDecrementWorkerCount(c))
            // 减一成功后返回null,线程被回收
            return null;
          // 否则循环重试
          continue;
        }

        try {
          // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务
          Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
          if (r != null)
            return r;
          // 如果poll超时获取任务超时了, 将timeOut设置为true
          // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了
          timedOut = true;
        } catch (InterruptedException retry) {
          timedOut = false;
        }
      }
    }

    所以保证线程不被销毁的关键代码就是这一句代码

       Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

    只要timedfalse这个workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed的值又是通过allowCoreThreadTimeOut和正在运行的线程数量是否大于coreSize控制的。

    • 只要getTask方法返回null 我们的线程就会被回收(runWorker方法会调用processWorkerExit)
    • 这个方法的源码也就解释了为什么我们在创建线程池的时候设置了allowCoreThreadTimeOut =true的话,核心线程也会进行销毁。
    • 通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。

    END

    推荐好文

    强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

    为什么MySQL不推荐使用uuid或者雪花id作为主键?

    为什么建议大家使用 Linux 开发?爽(外加七个感叹号)

    IntelliJ IDEA 15款 神级超级牛逼插件推荐(自用,真的超级牛逼)

    炫酷,SpringBoot+Echarts实现用户访问地图可视化(附源码)

    记一次由Redis分布式锁造成的重大事故,避免以后踩坑!

    十分钟学会使用 Elasticsearch 优雅搭建自己的搜索系统(附源码)

    b2e123b8275e8217519d0294c2434bc7.png

    展开全文
  • 引言上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 ...我们先最后一个问题一般一个线程执行完任务...

    引言

    上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用线程池 还留了几个小问题。。建议看这篇文章之前可先看下前面那篇文章。这篇文章我们就来分析下上篇文章的几个小问题

    • 线程池是否区分核心线程和非核心线程?
    • 如何保证核心线程不被销毁?
    • 线程池的线程是如何做到复用的?

    我们先看最后一个问题一般一个线程执行完任务之后就结束了,Thread.start()只能调用一次,一旦这个调用结束,则该线程就到了stop状态,不能再次调用start。如果你对一个已经启动的线程对象再调用一次start方法的话,会产生:IllegalThreadStateException异常,但是Threadrun方法是可以重复调用的。所以这里也会有一个面试经常问到的问题:「Thread类中run()和start()方法的有什么区别?」下面我们就从jdk的源码来一起看看如何实现线程复用的:线程池执行任务的ThreadPoolExecutor#execute方法为入口
     public void execute(Runnable command) {
         if (command == null)
             throw new NullPointerException();
       
         int c = ctl.get();
         // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务
         if (workerCountOf(c)          if (addWorker(command, true))
                 return;
             c = ctl.get();
         }
         // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中
         if (isRunning(c) && workQueue.offer(command)) {
          // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态
             int recheck = ctl.get();
             // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队
             if (! isRunning(recheck) && remove(command))
                 reject(command);
             // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务
             else if (workerCountOf(recheck) == 0)
                 addWorker(nullfalse);
         }
         // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务
         // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略
         else if (!addWorker(command, false))
             reject(command);
     }

    「excute」方法主要业务逻辑

    • 如果当前的线程池运行线程小于「coreSize」,则创建新线程来执行任务。
    • 如果当前运行的线程等于「coreSize」或多余「coreSize」(动态修改了coreSize才会出现这种情况),把任务放到阻塞队列中。
    • 如果队列已满无法将新加入的任务放进去的话,则需要创建新的线程来执行任务。
    • 如果新创建线程已经达到了最大线程数,任务将会被拒绝。

    addWorker 方法

    上述方法的核心主要就是addWorker方法,

    private boolean addWorker(Runnable firstTask, boolean core) {
           // 前面还有一部分就省略了。。。。

            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());

                        if (rs                         (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }

    这个方法我们先看看这个「work」类吧

    private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{
         
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
            
            public void run() {
                runWorker(this);
            }

    「work」类实现了「Runnable」接口,然后run方法里面调用了「runWorker」方法

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            // 新增创建
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                 // 判断 task 是否为空,如果不为空直接执行
             // 如果 task 为空,调用 getTask() 方法,从 workQueue 中取出新的 task 执行
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }

    这个runwork方法中会优先取worker绑定的任务,如果创建这个worker的时候没有给worker绑定任务,worker就会从队列里面获取任务来执行,执行完之后worker并不会销毁,而是通过while循环不停的执行getTask方法从阻塞队列中获取任务调用task.run()来执行任务,这样的话就达到了线程复用的目的。

    while (task != null || (task = getTask()) != null) 这个循环条件只要getTask 返回获取的值不为空这个循环就不会终止, 这样线程也就会一直在运行。「那么任务执行完怎么保证核心线程不销毁?非核心线程销毁?」答案就在这个getTask()方法里面

    private Runnable getTask() {
      // 超时标记,默认为false,如果调用workQueue.poll()方法超时了,会标记为true
      // 这个标记非常之重要,下面会说到
      boolean timedOut = false;
      for (;;) {
        // 获取ctl变量值
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果当前状态大于等于SHUTDOWN,并且workQueue中的任务为空或者状态大于等于STOP
        // 则操作AQS减少工作线程数量,并且返回null,线程被回收
        // 也说明假设状态为SHUTDOWN的情况下,如果workQueue不为空,那么线程池还是可以继续执行剩下的任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
          // 操作AQS将线程池中的线程数量减一
          decrementWorkerCount();
          return null;
        }

        // 获取线程池中的有效线程数量
        int wc = workerCountOf(c);

        // 如果主动开启allowCoreThreadTimeOut,或者获取当前工作线程大于corePoolSize,那么该线程是可以被超时回收的
        // allowCoreThreadTimeOut默认为false,即默认不允许核心线程超时回收
        // 这里也说明了在核心线程以外的线程都为“临时”线程,随时会被线程池回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        // 这里说明了两点销毁线程的条件:
        // 1.原则上线程池数量不可能大于maximumPoolSize,但可能会出现并发时操作了setMaximumPoolSize方法,如果此时将最大线程数量调少了,很可能会出现当前工作线程大于最大线程的情况,这时就需要线程超时回收,以维持线程池最大线程小于maximumPoolSize,
        // 2.timed && timedOut 如果为true,表示当前操作需要进行超时控制,这里的timedOut为true,说明该线程已经从workQueue.poll()方法超时了
        // 以上两点满足其一,都可以触发线程超时回收
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
          // 尝试用AQS将线程池线程数量减一
          if (compareAndDecrementWorkerCount(c))
            // 减一成功后返回null,线程被回收
            return null;
          // 否则循环重试
          continue;
        }

        try {
          // 如果timed为true,阻塞超时获取任务,否则阻塞获取任务
          Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
          workQueue.take();
          if (r != null)
            return r;
          // 如果poll超时获取任务超时了, 将timeOut设置为true
          // 继续循环执行,如果碰巧开发者开启了allowCoreThreadTimeOut,那么该线程就满足超时回收了
          timedOut = true;
        } catch (InterruptedException retry) {
          timedOut = false;
        }
      }
    }

    所以保证线程不被销毁的关键代码就是这一句代码

       Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

    只要timedfalse这个workQueue.take()就会一直阻塞,也就保证了线程不会被销毁。timed的值又是通过allowCoreThreadTimeOut和正在运行的线程数量是否大于coreSize控制的。

    • 只要getTask方法返回null 我们的线程就会被回收(runWorker方法会调用processWorkerExit)
    • 这个方法的源码也就解释了为什么我们在创建线程池的时候设置了allowCoreThreadTimeOut =true的话,核心线程也会进行销毁。
    • 通过这个方法我也们可以回答上面那个问题线程池是不区分核心线程和非核心线程的。

    END

    推荐好文

    强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

    为什么MySQL不推荐使用uuid或者雪花id作为主键?

    为什么建议大家使用 Linux 开发?爽(外加七个感叹号)

    IntelliJ IDEA 15款 神级超级牛逼插件推荐(自用,真的超级牛逼)

    炫酷,SpringBoot+Echarts实现用户访问地图可视化(附源码)

    记一次由Redis分布式锁造成的重大事故,避免以后踩坑!

    十分钟学会使用 Elasticsearch 优雅搭建自己的搜索系统(附源码)

    1705ac23e683b0f9c82c8cb781559025.png

    展开全文
  • 1. 使用条件变量  判断是否任务B已经做,然后再执行任务A。...CountDownLatch其实可以把它看作一个计数器,只不过这个计数器的操作是原子操作,同时只能有一个线程去操作这个计数器,也就是同时只能有...
  • 导语上一讲介绍了虚拟机是如何利用可达性算法,判断一个对象是否需要回收,而HotSpot在实现这个算法时,必须对算法的执行效率有严格的要求,才能保证虚拟机的高效运行,那么,HotSpot是如何实现的呢?本文是...
  • 金融java_java金融

    2021-02-12 10:27:13
    引言上一篇文章我们有介绍过线程池的一个基本执行流程《【Java并发编程】面试必备之线程池》以及它的7个核心参数,以及每个参数的作用、以及如何去使用...我们先最后一个问题一般一个线程执行完任务之后就结束了...
  • 也可以用工具IceSword(冰刃)中"文件/设置/禁止进线程创建",来停掉其中一个进程,再停掉另一个进程,杀掉病毒。  3、对于像被"熊猫烧香"感染的EXE文件,上述两种手工处理无效,因为无法手工清除受病毒感染的文件...
  • 竟态条件

    2021-01-01 12:28:47
    如果线程1执行完一个add_event,线程2把set释放了,线程1再执行第二个add_event,这样它将引用了一个释放掉的内存空间,这样就会导致程序崩溃。 请看看是否会出现这个的问题呢?...
  • JAVA源码,媒体网络,山寨QQ,Java聊天程序 Java编写的山寨QQ,多人聊天+用户在线,程序分服务端和客户端,典型C/S结构, 当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,...
  • JAVA源码,媒体网络,山寨QQ,Java聊天程序 Java编写的山寨QQ,多人聊天+用户在线,程序分服务端和客户端,典型C/S结构, 当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,...
  • java线程池概念.txt

    2019-08-16 10:14:03
    corePoolSize:核心池的大小,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; ...
  • 52、当一个线程进入一个对象的一个synchronized方法后,其它线程是否可进入此对象的其它方法? 33 53、线程的基本概念、线程的基本状态以及状态之间的关系 34 54、简述synchronized和java.util.concurrent.locks.Lock...
  • JavaScript中如何判断一个字符串是否为合法日期 JavaServer Page(JSP)简介1 JavaServer Pages (JSP) 1.0简单介绍 ---III JavaServer PagesTM 白皮书 Java布局管理器深入讨论 Java代码编写的一般性指导 JAVA的...
  • vc++ 应用源码包_1

    热门讨论 2012-09-15 14:22:12
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • vc++ 应用源码包_2

    热门讨论 2012-09-15 14:27:40
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • vc++ 应用源码包_6

    热门讨论 2012-09-15 14:59:46
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • vc++ 应用源码包_5

    热门讨论 2012-09-15 14:45:16
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • vc++ 开发实例源码包

    2014-12-16 11:25:17
    另外有只打开一个应用程序、CRichEdit的使用、最小到托盘、自动检测在线用户(多播组)等。 FreeBird2011Dlg.h 主对话框类头文件 MultiGroup.h 多播组类头文件 UserLink.h 用户链表类头文件 ListenSocket.h 侦听接口...
  • vc++ 应用源码包_4

    热门讨论 2012-09-15 14:38:35
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • vc++ 应用源码包_3

    热门讨论 2012-09-15 14:33:15
    压缩包内有两个源码包,一个是注册机源程序,另一个是解密机的源程序,一套完整的参考实例。 VC+MapX源码含GPS跟踪演示 VC3D 利用VC编程在界面上实现3D文字 在MFC应用程序中浏览PDF、Word文档文件 vcdialog 自...
  • java源码包---java 源码 大量 实例

    千次下载 热门讨论 2013-04-18 23:15:26
     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...
  • JAVA源码,媒体网络,山寨QQ,Java聊天程序 Java编写的山寨QQ,多人聊天+用户在线,程序分服务端和客户端,典型C/S结构, 当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,...
  • JAVA上百实例源码以及开源项目

    千次下载 热门讨论 2016-01-03 17:37:40
     当用户发送第一次请求的时候,验证用户登录,创建一个该qq号和服务器端保持通讯连接得线程,启动该通讯线程,通讯完毕,关闭Scoket。  QQ客户端登录界面,中部有三个JPanel,有一个叫选项卡窗口管理。还可以更新...
  • love爱意表达部分,程序员表白神器,简易含蓄的单身程序员可以看看该模块,可以给女朋友一个惊喜! 玩Android部分,接口是鸿洋大神开放的api,学习kotlin时所写 markDown格式笔记本,支持md格式,数据是保存到本地。...
  • 所以第步,我们将学习如何得到oSIP的静态和动态链接库,以便我们自己的程序能够使用它们来成功编译和执行我们的程序。 第阶段: ----------------------------------...
  • 二十三种设计模式【PDF版】

    热门讨论 2011-05-30 14:13:49
    之道 》,其中很多观点我了很受启发,以前我也将"设计模式" 看成一个简单的解决方案,没有从一种高度来看待"设计模式"在软 件中地位,下面是我自己的一些想法: 建筑和软件某些地方是可以来比喻的 特别是中国传统建筑...

空空如也

空空如也

1 2 3 4
收藏数 68
精华内容 27
关键字:

如何看一个线程是否执行完