精华内容
下载资源
问答
  • LinkedBlockingQueue 是线程池默认使用的任务队列,为了满足多线程环境下元素出入队列的安全性,其主要有以下特点: LinkedBlockingQueue 是基于链表结构的阻塞队列,默认容量为 Integer.MAX_VALUE,可以

    1. LinkedBlockingQueue 简介

    LinkedBlockingQueue 是线程池默认使用的任务队列,为了满足多线程环境下元素出入队列的安全性,其主要有以下特点:

    1. LinkedBlockingQueue 是基于链表结构的阻塞队列,默认容量为 Integer.MAX_VALUE,可以认为是无界队列
    2. LinkedBlockingQueue 队列按 FIFO(先进先出)排序元素
    3. LinkedBlockingQueue 内部使用两个独占锁来保证线程安全,其中写入锁用于控制添加元素的操作,取出锁用于控制取出元素的操作。新元素从队列尾部入队,取出元素则从队列头部移除,因此可以同时在队列头部和队列尾部并发地进行元素取出、添加操作

    2. LinkedBlockingQueue 的关键属性

    LinkedBlockingQueue 中添加到队列的数据都将被封装成 Node 节点,可以看到 Node 的结构比较简单,只有一个后指针指向下一个节点,形成单向链表结构

    static class Node<E> {
            E item;
            
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    

    LinkedBlockingQueue 中关键属性如下,其中 headlast 分别指向队列的头节点和尾节点。LinkedBlockingQueue 内部分别使用 takeLockputLock 两个独占锁对并发进行控制,在高并发的情况下生产者和消费者可以并行地操作队列中的数据,也就大大提高了队列的并发性能

    // 最大容量上限,默认是 Integer.MAX_VALUE
    private final int capacity;
    
    // 元素计数器,这是个原子类。因为读写分别使用不同的锁,但都会访问这个属性,所以它需要保证线程安全
    private final AtomicInteger count = new AtomicInteger();
    
    // 队列头结点
    private transient Node<E> head;
    
    // 队列尾结点
    private transient Node<E> last;
    
    // 队头出队锁
    private final ReentrantLock takeLock = new ReentrantLock();
    
    // 等待获取出队锁的线程的等待队列
    private final Condition notEmpty = takeLock.newCondition();
    
    // 队尾入队锁
    private final ReentrantLock putLock = new ReentrantLock();
    
    // 等待获取入队锁的线程的等待队列
    private final Condition notFull = putLock.newCondition();
    
    

    3. LinkedBlockingQueue 的元素存取流程

    3.1 添加元素

    在这里插入图片描述

    LinkedBlockingQueue 添加元素的主要方法是 LinkedBlockingQueue#put()/LinkedBlockingQueue#offer(),本文以 LinkedBlockingQueue#put() 为例

    1. LinkedBlockingQueue#put() 方法的处理步骤如下:

      1. 当前线程首先获取 putLock 写入锁
      2. 获取锁成功,则判断当前队列是否已经满了,队列容量已满的情况下不允许添加新的元素,此时当前线程需要挂起等待,notFull.await() 类似于 Object.wait()方法
      3. 线程唤醒后依然在 while 循环中判断队列容量是否已满,因为消费线程会不断取出队列中的元素,故此处循环正常情况下是一定能够跳出的。跳出后调用 LinkedBlockingQueue#enqueue() 方法将元素入队
      4. 当前线程完成添加元素的操作,检查队列容量是否已满,如果队列没有达到容量上限显然可以继续添加元素,则将在等待 putLock 写入锁的一个线程唤醒,让它能完成添加元素的操作,notFull.signal() 类似于 Object.notify()方法,是 AQS 的一种实现机制,读者如感兴趣可参考Java 队列同步器 AQS(AbstractQueuedSynchronizer)源码解析
      5. 当前线程释放 putLock 写入锁
      6. 如果队列在本次添加元素之前是空的,那么很有可能存在消费线程挂起等待可消费的元素,故此时调用 LinkedBlockingQueue#signalNotEmpty() 方法唤醒一个在等待的消费线程

      LinkedBlockingQueue#put() 方法会一直阻塞直到能够把元素添加到队列中,也就是除非发生异常,否则元素是必然能添加到队列中的
      LinkedBlockingQueue#offer() 方法则允许添加元素失败,无法将元素添加到队列时直接返回 false 即可结束调用

      public void put(E e) throws InterruptedException {
           if (e == null) throw new NullPointerException();
           // Note: convention in all put/take/etc is to preset local var
           // holding count negative to indicate failure unless set.
           int c = -1;
           Node<E> node = new Node<E>(e);
           final ReentrantLock putLock = this.putLock;
           final AtomicInteger count = this.count;
           putLock.lockInterruptibly();
           try {
               /*
                * Note that count is used in wait guard even though it is
                * not protected by lock. This works because count can
                * only decrease at this point (all other puts are shut
                * out by lock), and we (or some other waiting put) are
                * signalled if it ever changes from capacity. Similarly
                * for all other uses of count in other wait guards.
                */
               while (count.get() == capacity) {
                   notFull.await();
               }
               enqueue(node);
               c = count.getAndIncrement();
               if (c + 1 < capacity)
                   notFull.signal();
           } finally {
               putLock.unlock();
           }
           if (c == 0)
               signalNotEmpty();
       }
      
    2. LinkedBlockingQueue#enqueue() 方法的处理如下,非常简单易懂,就是元素从队列尾部入队

      private void enqueue(Node<E> node) {
           // assert putLock.isHeldByCurrentThread();
           // assert last.next == null;
           last = last.next = node;
       }
      

    3.2 取出元素

    LinkedBlockingQueue 取出元素的主要方法为 LinkedBlockingQueue#take()/LinkedBlockingQueue#poll(),本文以 LinkedBlockingQueue#take() 为例

    1. LinkedBlockingQueue#take() 方法的处理步骤如下:

      1. 当前线程首先获取 takeLock 取出锁
      2. 获取锁成功,则判断当前队列是否是空的,队列元素数量为 0 的情况下无法取出元素,此时当前线程需要挂起等待,调用 notEmpty.await() 方法
      3. 线程唤醒后依然在 while 循环中判断队列是否为空,因为生产线程会往队列中添加元素,故此处循环是一定能够跳出的。跳出后调用 LinkedBlockingQueue#dequeue() 方法将队列头部元素出队
      4. 元素出队后,检查队列是否是空的,如果队列不为空显然可以继续取出元素,则将在等待 takeLock 取出锁的一个线程唤醒,让它能完成取出元素的操作,调用notEmpty.signal()方法
      5. 当前线程释放 takeLock 取出锁
      6. 如果队列在本次取出元素之前是满的,那么很有可能存在生产线程挂起等待时机添加元素,故此时调用 LinkedBlockingQueue#signalNotFull() 方法唤醒一个在等待的生产线程

      LinkedBlockingQueue#take() 方法会一直阻塞直到能够从队列中取到元素,也就是除非发生异常,否则该方法是必然能获取到元素的
      LinkedBlockingQueue#poll() 方法则允许获取元素失败,无法从队列中获取元素时直接返回 null 即可结束调用

      public E take() throws InterruptedException {
           E x;
           int c = -1;
           final AtomicInteger count = this.count;
           final ReentrantLock takeLock = this.takeLock;
           takeLock.lockInterruptibly();
           try {
               while (count.get() == 0) {
                   notEmpty.await();
               }
               x = dequeue();
               c = count.getAndDecrement();
               if (c > 1)
                   notEmpty.signal();
           } finally {
               takeLock.unlock();
           }
           if (c == capacity)
               signalNotFull();
           return x;
       }
      
    2. LinkedBlockingQueue#dequeue() 方法其实就是将元素从队列头部移除,使其出队

      private E dequeue() {
           // assert takeLock.isHeldByCurrentThread();
           // assert head.item == null;
           Node<E> h = head;
           Node<E> first = h.next;
           h.next = h; // help GC
           head = first;
           E x = first.item;
           first.item = null;
           return x;
       }
      
    展开全文
  • 线程池中的阻塞队列选择

    万次阅读 多人点赞 2019-02-11 16:04:41
    内存被耗尽可能有一个原因是,因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越...

    这是一个十分严重的问题

    自从最近的某年某月某天起,线上服务开始变得不那么稳定。在高峰期,时常有几台机器的内存持续飙升,并且无法回收,导致服务不可用。

    例如GC时间采样曲线:

    和内存使用曲线:

    图中所示,18:50-19:00的阶段,已经处于服务不可用的状态了。上游服务的超时异常会增加,该台机器会触发熔断。熔断触发后,改台机器的流量会打到其他机器,其他机器发生类似的情况的可能性会提高,极端情况会引起所有服务宕机,曲线掉底。

    因为线上内存过大,如果采用 jmap dump的方式,这个任务可能需要很久才可以执行完,同时把这么大的文件存放起来导入工具也是一件很难的事情。再看JVM启动参数,也很久没有变更过 Xms, Xmx, -XX:NewRatio, -XX:SurvivorRatio, 虽然没有仔细分析程序使用内存情况,但看起来也无大碍。

    于是开始找代码,某年某天某月~ 嗯,注意到一段这样的代码提交:

    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void push2Kafka(Object msg) {
        executor.execute(new WriteTask(msg,  false));    
    }
    

    相关代码的完整功能是,每次线上调用,都会把计算结果的日志打到 Kafka,Kafka消费方再继续后续的逻辑。内存被耗尽可能有一个原因是,因为使用了 newFixedThreadPool 线程池,而它的工作机制是,固定了N个线程,而提交给线程池的任务队列是不限制大小的,如果Kafka发消息被阻塞或者变慢,那么显然队列里面的内容会越来越多,也就会导致这样的问题。

    为了验证这个想法,做了个小实验,把 newFixedThreadPool 线程池的线程个数调小一点,例如 1。果然压测了一下,很快就复现了内存耗尽,服务不可用的悲剧。

    最后的修复策略是使用了自定义的线程池参数,而非 Executors 默认实现解决了问题。下面就把线程池相关的原理和参数总结一下,避免未来踩坑。

    1. Java线程池

    虽然Java线程池理论,以及构造线程池的各种参数,以及 Executors 提供的默认实现之前研读过,不过线上还没有发生过线程池误用引发的事故,所以有必要把这些参数再仔细琢磨一遍。

    优先补充一些线程池的工作理论,有助于展开下面的内容。线程池顾名思义,就是由很多线程构成的池子,来一个任务,就从池子中取一个线程,处理这个任务。这个理解是我在第一次接触到这个概念时候的理解,虽然整体基本切入到核心,但是实际上会比这个复杂。例如线程池肯定不会无限扩大的,否则资源会耗尽;当线程数到达一个阶段,提交的任务会被暂时存储在一个队列中,如果队列内容可以不断扩大,极端下也会耗尽资源,那选择什么类型的队列,当队列满如何处理任务,都有涉及很多内容。线程池总体的工作过程如下图:

    线程池内的线程数的大小相关的概念有两个,一个是核心池大小,还有最大池大小。如果当前的线程个数比核心池个数小,当任务到来,会优先创建一个新的线程并执行任务。当已经到达核心池大小,则把任务放入队列,为了资源不被耗尽,队列的最大容量可能也是有上限的,如果达到队列上限则考虑继续创建新线程执行任务,如果此刻线程的个数已经到达最大池上限,则考虑把任务丢弃。

    在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
    } 
    

    既然有了刚刚对线程池工作原理对概述,这些参数就很容易理解了:

    corePoolSize- 核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。

    maximumPoolSize-池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。

    keepAliveTime - 当线程数大于核心时,多于的空闲线程最多存活时间

    unit - keepAliveTime 参数的时间单位。

    workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:无界队列,有界队列和同步移交。将在下文中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接口的任务。 别看这个参数位置很靠后,但是真的很重要,因为楼主的坑就因这个参数而起,这些细节有必要仔细了解清楚。

    threadFactory - 执行程序创建新线程时使用的工厂。

    handler - 阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。

    2. 可选择的阻塞队列BlockingQueue详解

    在重复一下新任务进入时线程池的执行策略: 
    如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行) 
    如果运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。 
    如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。 
    主要有3种类型的BlockingQueue:

    无界队列

    队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个无界LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。

    有界队列

    常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。 
    使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

    在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。

    同步移交队列

    如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列

    3. 可选择的饱和策略RejectedExecutionHandler详解

    JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。

    3.1 AbortPolicy中止策略

    该策略是默认饱和策略。

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
     } 
    

    使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

    3.2 DiscardPolicy抛弃策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
    

    如代码所示,不做任何处理直接抛弃任务

    3.3 DiscardOldestPolicy抛弃旧任务策略

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
    } 
    

    如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

    3.4 CallerRunsPolicy调用者运行

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
    } 
    

    既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

    4. Java提供的四种常用线程池解析

    既然楼主踩坑就是使用了 JDK 的默认实现,那么再来看看这些默认实现到底干了什么,封装了哪些参数。简而言之 Executors 工厂方法Executors.newCachedThreadPool() 提供了无界线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用无界队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。

    详细介绍一下上述四种线程池。

    4.1 newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    } 
    

    在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 
    初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程? 
    这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。

    4.2 newFixedThreadPool

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
     }
    

    看代码一目了然了,线程数量固定,使用无限大的队列。再次强调,楼主就是踩的这个无限大队列的坑。

    4.3 newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    

    在来看看ScheduledThreadPoolExecutor()的构造函数

     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        } 
    

    ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列,在上面没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。简单的说,DelayedWorkQueue是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。

    4.4 newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
     } 
    

    首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:

    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
                super(executor);
                e = executor;
    } 
    

    在看看它的父类

    DelegatedExecutorService(ExecutorService executor) { 
               e = executor; 
    } 
    

    其实就是使用装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有一个线程顺序执行任务,也保证线程意外终止后会重新创建一个线程继续执行任务。

    以上除了优化线程池,更应该优化kafka队列,设置相应的超时机制,避免超长时间阻塞等待,导致OOM。

     

    Alibaba命名规范的解释:

    【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 说明: Executors 返回的线程池对象的弊端如下:

    1) FixedThreadPool 和 SingleThreadPool : 允许的请求队列长度为 Integer.MAX_VALUE ,可能会堆积大量的请求,从而导致 OOM 。

    2) CachedThreadPool 和 ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE ,可能会创建大量的线程,从而导致 OOM 。

     

    转自: https://zhuanlan.zhihu.com/p/32867181  略有改变。

    展开全文
  • 线程池阻塞队列

    2021-07-26 22:21:48
    阻塞队列 add/remove 如果队列满了,add(T)会抛出异常 如果队列为空,remove()会抛出异常 offer/poll 如果队列满了,offer(T)会返回false 如果队列为空,poll()会返回null put/take 如果队列满了,put(T)会...

    阻塞队列

    add/remove

            如果队列满了,add(T)会抛出异常        

            如果队列为空,remove()会抛出异常

    offer/poll

            如果队列满了,offer(T)会返回false

            如果队列为空,poll()会返回null

    put/take

            如果队列满了,put(T)会阻塞

            如果队列为空,take()会阻塞

    常用的阻塞队列

           LinkedBlockingQueue:链表实现的阻塞队列,可以不指定大小,默认为Integer.MAX_VALUE

           ArrayBlockingQueue:数组实现的阻塞队列,必须指定大小

           SynchronousQueue:不能存放元素的阻塞队列

    线程池

    线程池参数

            corePoolSize:核心线程数

            maximumPoolSize:最大线程数、规定阻塞队列满了之后,允许创建的最大任务个数

            keepAliveTime:线程空闲时间

            unit:线程空闲时间单位

            workQueue:阻塞队列

            threadFactory:线程工厂,主要是给线程指定name

            handler:拒绝策略,默认是抛出异常

                     AbortPolicy:当任务书大于maximumPoolSize+阻塞队列大小之后,会抛出异常

                     DiscardPolicy:抛弃最近提交的任务

                     DiscardOldestPolicy:抛弃最早提交未执行的任务

                     CallerRunsPolicy:由调用线程自己执行该任务,如果是主线程那么就是main

    线程池工作原理

    ·         提交的任务数小于等于核心线程数,会创建核心线程执行任务

             ·提交的任务数大于核心线程数,超出部分会被加入阻塞队列中

             ·阻塞队列如果满了,判断超出的任务数是否大于最大线程数,如果大于那么走拒绝策略,否则执行。

    合理配置线程池

    CPU密集型

            有大量计算型的任务,最大线程数不要超过逻辑核心数

    IO密集型

            有大量磁盘io,网络通讯,最大线程数一般配置为2*逻辑核心数

    线程不安全的原因

             java内存模型中,为了提高效率在线程中引入了工作内存和主存,工作内存是线程私有的,两个线程同时要修改一个变量,刚开始都是从主存中拷贝一份副本,然后在工作内存中修改值,在未来某个时间刷新到主存中,由于两个线程都是在自己内部的工作内存修改变量,所以最终刷新到主存中的数据可能是不正确的,这就是线程不安全的原因。

    展开全文
  • 线程池常用的阻塞队列有哪些?

    千次阅读 2020-06-25 05:51:27
    线程池常用的阻塞队列有哪些? 文章目录线程池常用的阻塞队列有哪些?1.线程池的内部结构2.阻塞队列3.LinkedBlockingQueue4.SynchronousQueue5.DelayedWorkQueue6.参考 1.线程池的内部结构 线程池内部由四部分组成 ...

    线程池常用的阻塞队列有哪些?

    1.线程池的内部结构

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ajtBM5yG-1593035343720)(G:\workspace\csdn\learn-document\java\concurrent\image-20200624165150713.png)]
    线程池内部由四部分组成

    • 第一部分是线程池管理器,主要负责管理线程池的创建、销毁、添加任务等管理操作。

    • 第二部分是工作线程,它又分为核心工作线程和非核心工作线程。

    • 第三部分是任务队列,作为一种缓冲机制,会将超过核心线程数的任务放到任务队列中,等待执行;由于多线程同时从任务队列中获取任务是并发场景,此时就需要任务队列满足线程安全的要求,所以线程池中任务队列采用 BlockingQueue 来保障线程安全。

    • 第四部分是任务,任务要求实现统一的接口,以便工作线程可以处理和执行。

    2.阻塞队列

    线程池对应的阻塞队列

    线程池阻塞队列
    FixedThreadPoolLinkedBlockingQueue
    SingleThreadExecutorLinkedBlockingQueue
    CachedThreadPoolSynchronousQueue
    ScheduledThreadPoolDelayedWorkQueue
    SingleThreadScheduledExecutorDelayedWorkQueue

    3.LinkedBlockingQueue

    第一种阻塞队列是 LinkedBlockingQueue,它的容量是 Integer.MAX_VALUE,为 231 -1 ,是一个非常大的值,可以认为是无界队列。

    FixedThreadPool 和 SingleThreadExecutor 线程池的线程数是固定的,所以没有办法增加特别多的线程来处理任务,这时就需要 LinkedBlockingQueue 这样一个没有容量限制的阻塞队列来存放任务。

    4.SynchronousQueue

    第二种阻塞队列是 SynchronousQueue,对应的线程池是 CachedThreadPool。线程池 CachedThreadPool 的最大线程数是 Integer.MAX_VALUE,可以理解为线程数是可以无限扩展的。

    CachedThreadPool 和上一种线程池 FixedThreadPool 的情况恰恰相反,FixedThreadPool 的情况是阻塞队列的容量是无限的,而这里 CachedThreadPool 是线程数可以无限扩展,所以 CachedThreadPool 线程池并不需要一个任务队列来存储任务,因为一旦有任务被提交就直接转发给线程或者创建新线程来执行,而不需要另外保存它们。

    我们自己创建使用 SynchronousQueue 的线程池时,如果不希望任务被拒绝,那么就需要注意设置最大线程数要尽可能大一些,以免发生任务数大于最大线程数时,没办法把任务放到队列中也没有足够线程来执行任务的情况。

    5.DelayedWorkQueue

    第三种阻塞队列是DelayedWorkQueue,它对应的线程池分别是 ScheduledThreadPool 和 SingleThreadScheduledExecutor,这两种线程池的最大特点就是可以延迟执行任务,比如说一定时间后执行任务或是每隔一定的时间执行一次任务。

    DelayedWorkQueue 的特点是内部元素并不是按照放入的时间排序,而是会按照延迟的时间长短对任务进行排序,内部采用的是“堆”的数据结构(堆的应用之一就是 优先级队列)。之所以线程池 ScheduledThreadPool 和 SingleThreadScheduledExecutor 选择 DelayedWorkQueue,是因为它们本身正是基于时间执行任务的,而延迟队列正好可以把任务按时间进行排序,方便任务的执行。

    展开全文
  • 一、线程池定义和使用 jdk 1.5 之后就引入了线程池。 1.1 定义 从上面的空间切换看得出来,线程是稀缺资源,它的创建与销毁是一个相对偏重且耗资源的操作,而Java线程依赖于内核线程,创建线程需要进行操作系统状态...
  • 线程池 线程池介绍 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价...
  • 线程池阻塞队列

    2020-07-27 11:23:29
    由于频繁的创建和销毁线程会消耗很多资源,因此线程池应运而生来去除频繁的创建与删除线程这一过程。 2、常见线程池 ①、newSingleThreadExecutor 单一线程池,使用唯一的工作线程执行任务,保证所有任务按照指定...
  • 使用线程池比手动创建线程好在哪里? 手工创建线程,每一个任务都创建线程问题: 第一点,反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的...
  • java线程池常用的阻塞队列

    千次阅读 2020-07-22 16:46:03
    表格左侧是线程池,右侧为它们对应的阻塞队列,可以看到 5 种线程池对应了 3 种阻塞队列,我们接下来对它们进行逐一的介绍。 1. LinkedBlockingQueue 对于 FixedThreadPool 和 SingleThreadExector 而言,它们使用的...
  • Java线程池阻塞队列

    2020-08-17 01:16:24
    一.Java线程池的优点 1.降低资源消耗:通过重复利用线程池中已创建好的线程来降低线程创建和销毁造成的消耗。 2.提高响应速度:当任务到达时,任务可以直接拿到线程池中已创建好的线程立即执行。 3.提高线程的可...
  • Java线程池带实例讲解,阻塞队列说明 首先,线程池是啥,有啥好处这里就不提了.google一下马上知道. 嘻嘻嘻 首先第一步!我们先来认识下 在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。 该类是核心,...
  • 一,线程池 首先了解线程池整个框架 1,这里记住最核心的类是ThreadPoolExecutor 2,在ExecuorService中提供了newSingleThreadExecutor,newFixedThreadPool,newCacheThreadPool,newScheduledThreadPool四个...
  • ArrayBlockingQueue(有界队列):使用该对象时,若有新任务需要执行,如果线程池的实际线程数少于corePoolSize,则会优先创建新的线程,若大于,则会将新任务加入到等待队列。当队列满时,则在总线程数不大于...
  • 在设置线程池队列长度时,如果长度设置的不合理就无法发挥出多线程的威力。设置线程池队列长度取决于使用场景;比如全程异步的系统,队列可以设置为0,corePoolSize设置为cpu核数。研究tomcat、Dubbo等业界成熟的...
  • 线程池里面的队列跟之前学习的队列不一样,之前学习的队列Deque是非阻塞队列,但是在JUC包里面的队列阻塞队列; 什么是阻塞队列?如果队列中没有元素的情况下获取元素(出队),程序不会结束一直阻塞线程池中...
  • 组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数 组外,ArrayBlockingQueue 内部还保存着两个整形变量,分别标识着队列的 头部和尾部在数组中的位置。 ArrayBlockingQueue 在生产者放入数据...
  • 阻塞队列要满足下面两点: 插入:队列不满时可以插入,满了阻塞插入元素的线程 移除:队列不为空时可以移除,空了阻塞取走元素的线程 即生产者-消费者模式 参数意义 public ThreadPoolExecutor( // 核心...
  • 线程池策略 corePoolSize:核心线程数;...第二步: 判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步; 第三步: 判断线程池中的线程数量是否达到了maxumunPo
  • 多线程高并发一定少不了线程池技术。 作用 提升性能 线程的创建和销毁都会... 线程池最大线程数,队列满时,线程最大并发数 keepAliveTime 空闲线程的最大存活时间,系统默认只回收非核心线程,核心线程可以通
  • 技术交流QQ群【JAVA,C++,Python,.NET,BigData,AI】:170933152 上面是线程的执行周期 这个线程的生命周期,可以看到时间都浪费在了创建和销毁的这里了. 实际上执行业务的时间只有...那么也就是说,线程池已经准备.
  • 接口源码: public interface BlockingQueue<... * 向队列中添加元素 成功返回true,队列满了则抛出异常 */ boolean add(E e); /** * 向队列中添加元素,成功返回true,队列满了返回false */ ...
  • 而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列阻塞产生,系统内存就有可能已被消耗殆尽了...
  • 阻塞队列线程池

    2021-01-30 17:59:12
    1. 阻塞队列 1)支持阻塞的插入方法:当队列满时,队列阻塞插入元素的线程,直到队列不满。 2)支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变为非空。     在并发编程中使用...
  • Java 线程池等待队列问题

    千次阅读 2019-09-20 15:38:48
    线程池的等待队列最大长度默认为int的最大值,随口默写出来就是2147483647(2^31 -1,高中物理老师说过一句话,记住一些固定的数字可以预判一些问题)。线程池在提交任务时,如果线程池未达到最大线程数,则起线程...
  • Java 线程池原理和队列详解

    万次阅读 2016-06-17 18:41:27
    参考: http://blog.csdn.net/mazhimazh/article/details/19243889 http://shift-alt-ctrl.iteye.com/blog/1840385 ...http://blog.csdn.net/sd0902/article/details/8395677线程池
  • 点击关注公众号,利用碎片时间学习线程池的工作原理:ThreadPoolExecutor(intcorePoolSize,//核心线程数 intm...
  • 为什么要用线程池? 在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际处理实际...
  • 线程池的等待队列最大长度默认为int的最大值,随口默写出来就是2147483647(2^31 -1,高中物理老师说过一句话,记住一些固定的数字可以预判一些问题)。线程池在提交任务时,如果线程池未达到最大线程数,则起线程...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 58,403
精华内容 23,361
关键字:

线程池默认的阻塞队列