精华内容
下载资源
问答
  • java线程池队列

    2017-10-20 16:31:31
    java线程池队列

    线程池

    ExecutorService接口实现类,四种常用线程池:

    • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
      补充:为了合理利用资源,我们通常把定长池的长度设置为当前PC机获取cpu核心数:Runtime.getRuntime().availableProcessors():获取当前CPU核心数;

    • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程

    • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行

    • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    package com.swagger.demo.thread;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    
    public class MyThreadPool {
    
        public static void main(String [] args){
            int num = Runtime.getRuntime().availableProcessors();
            Executor executor = Executors.newFixedThreadPool(num);
            for (int i = 0 ; i<num ; i++){
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("我是一个子线程!!");
                    }
                });
            }
        }
    }

    队列

    java.ulil.concurrent包提供了阻塞队列的4个变种

    • LinkedBlockingQueue在不指定容量时最大容量为Integer.MAX_VALUE它是基于链表的队列,此队列按 FIFO(先进先出)排序元素
    • PriorityBlockingQueue是一个带优先级的 队列,而不是先进先出队列。元素按优先级顺序被移除,该队列也没有上限(看了一下源码,PriorityBlockingQueue是对 PriorityQueue的再次包装,是基于堆数据结构的,而PriorityQueue是没有容量限制的,与ArrayList一样,所以在优先阻塞 队列上put时是不会受阻的。虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),但是如果队列为空,那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,往入该队列中的元素要具有比较能力。

    • DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。 下面是延迟接口:
      代码如下:
      public interface Delayed extends Comparable {
      long getDelay(TimeUnit unit);
      }
      放入DelayQueue的元素还将要实现compareTo方法,DelayQueue使用这个来为元素排序。

    应用:

    使用的SpringBoot应用环境搭建:
    基于AOP的原理实现一个简单的日志处理:

    package com.swagger.demo.aop;
    
    import com.swagger.demo.Entity.LogContentEntity;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Before;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    import javax.servlet.http.HttpServletRequest;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    @Configuration
    @Aspect
    @Component
    public class AopLogConfig implements Runnable {
    
        @Autowired
        private HttpServletRequest request;
    
        private LinkedBlockingQueue<LogContentEntity> logQueue;
    
        public AopLogConfig() {
            //Spring启动后,该对象创建时。初始化队列以及线程池。
            logQueue = new LinkedBlockingQueue<LogContentEntity>(3000);
            int num = Runtime.getRuntime().availableProcessors();
            ExecutorService  executor = Executors.newFixedThreadPool(num);
            for (int i = 0 ;i<num ;i++){
                executor.execute(this);
            }
        }
    
        @Before("execution(public * com.swagger.demo.controller..*.*(..))")
        public void doBefore(JoinPoint joinPoint) throws Exception{
    
            //日志记录的信息可自行修改
            LogContentEntity Log = new LogContentEntity();
            String method = request.getMethod();
            Log.setHttpMethod(method);
            String url = request.getRequestURL().toString();
            Log.setUrl(url);
            String ip = request.getRemoteAddr();
            Log.setIp(ip);
            Log.setContent("test Log Content");
            //将需要记录的日志对象放到队列中等待线程异步执行。
            logQueue.put(Log);
        }
    
        @Override
        public void run() {
            try{
                while(true){
                    //如果队列里没有,则会阻塞;
                    LogContentEntity take = logQueue.take();
                    //日志处理逻辑可自行修改;
                    System.out.println(take.toString());
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    
    展开全文
  • 以前一直听说java线程池,但没怎么真正使用过,最近在尝试手写网络访问框架,使用到了线程池跟队列,做个简单记录importandroid.util.Log;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent...

    以前一直听说java线程池,但没怎么真正使用过,最近在尝试手写网络访问框架,使用到了线程池跟队列,做个简单记录

    importandroid.util.Log;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by Administrator on 2017/1/13 0013.
     * 一个阻塞线程,不断从任务队列获取任务执行,失败后自动添加回任务
     */
    
    public class ThreadPoolManager {
    private static final String TAG ="05" ;
        private static  ThreadPoolManager instance=new ThreadPoolManager();//简单单例
    
        private LinkedBlockingQueue<Future<?>> taskQuene=new LinkedBlockingQueue<>();//任务队列
    
        private ThreadPoolExecutor threadPoolExecutor;
        public static ThreadPoolManager getInstance() {
    
    return instance;
    }
    private ThreadPoolManager()
        {
    threadPoolExecutor=new ThreadPoolExecutor(4,10,10, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4), handler);
    threadPoolExecutor.execute(runable);
    }
    
    private Runnable runable =new Runnable() {
    @Override
    public void run() {
    while (true)
                {
                    FutureTask futrueTask=null;
    
                    try {
    /**
     * 阻塞式函数
     */
    //   Log.i(TAG,"等待队列     "+taskQuene.size());
    futrueTask= (FutureTask) taskQuene.take();
    } catch (InterruptedException e) {
                        e.printStackTrace();
    }
    if(futrueTask!=null)
                    {
    threadPoolExecutor.execute(futrueTask);
    }
    Log.i(TAG,"线程池大小      "+threadPoolExecutor.getPoolSize());
    }
            }
        };
    //添加任务
    public <T> void execte(FutureTask<T> futureTask) throws InterruptedException {
    taskQuene.put(futureTask);
    }
    
        //线程池任务执行失败回调
    private RejectedExecutionHandler handler=new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
    taskQuene.put(new FutureTask<Object>(r,null) {
                    });
    } catch (InterruptedException e) {
                    e.printStackTrace();
    }
            }
        };
    }
    展开全文
  • Java 线程池等待队列问题

    千次阅读 2019-09-20 15:38:48
    线程池的等待队列最大长度默认为int的最大值,随口默写出来就是2147483647(2^31 -1,高中物理老师说过一句话,记住一些固定的数字可以预判一些问题)。线程池在提交任务时,如果线程池未达到最大线程数,则起线程...
    • 1、首先看下Executor获取线程池,这样方式,可以设置线程池的大小,但是了解线程池的内部原理的情况下,这样的线程池可能会引起OOM,原因在于
      该线程池的等待队列最大长度默认为int的最大值,随口默写出来就是2147483647(2^31 -1,高中物理老师说过一句话,记住一些固定的数字可以预判一些问题)。线程池在提交任务时,如果线程池未达到最大线程数,则起线程执行任务,在达到最大值后,会放入等待队列,按默认的int最大值,很容易造成内存溢出。所以通常会选择自行构造线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
    • 2、通过自行构建线程池,指定等待队列的长度。那么问题来了,虽然用的是BlockingQueue,但是往BlockingQueue放任务时,用的是offer(),方法,而不是阻塞的方法put();这样在队列满了之后,继续往队列放任务就会抛异常,线程池提供了rejection机制去处理这种情况,应用可以自定义如何处理队列满的情况,默认是直接丢弃。对于有些业务场景,我们宁愿阻塞等待,也不要无止境的放队列,然后让他失败。这时候需要再用一点奇技淫巧,用来保证在等待队列放置一定数量后,阻塞生成任务的线程,等到线程池里已经有任务处理完了再继续放入任务。
    ExecutorService threadPool = new ThreadPoolExecutor(5, 5,
    			0L, TimeUnit.MILLISECONDS,
    			new ArrayBlockingQueue<Runnable>(5*2));
    
    • 3、还是使用2中自行构建的线程池,除此之外再定义一个blockQueue,最大长度为5,如下列代码。在while循环时,threadpool中的等待队列会逐渐增加最后稳定在5,并且此时与blockingQueue的长度一致,此时如果继续循环,便在blockingQueue.put(new Data())处阻塞,直到线程池中已有任务处理完。
    BlockingQueue<Data> blockingQueue = new ArrayBlockingQueue<Data>(5);
    
        @Test
    	public void testBlock() throws InterruptedException {
    		int i=0;
    		while(i<100){
    
    			i++;
    			blockingQueue.put(new Data());
    //			System.out.println("blockingQueue.size():"+blockingQueue.size()+"  "+"threadpool:"+threadPool);
    //			封装成task时并没有从blockingQueue中take,只有submit提交执行时才take,因此,等待队列中的task数目,就等于blockingQueue的长度了
    			threadPool.submit( ()->{return process( blockingQueue.take());});
    			System.out.println("blockingQueue.size():"+blockingQueue.size()+"  "+"threadpool:"+threadPool);
    		}
    		Thread.sleep(10000L);
    		System.out.println("blockingQueue.size():"+blockingQueue.size()+"  "+"threadpool:"+threadPool);
    	}
    
    
    	public Result process(Data data) throws InterruptedException {
    		Thread.sleep(300L);
    		return new Result();
    	}
    
    展开全文
  • 一、线程池阻塞队列、 二、拒绝策略、 三、使用 ThreadPoolExecutor 自定义线程池参数、





    一、线程池阻塞队列



    线程池阻塞队列是线程池创建的第 5 5 5 个参数 : BlockingQueue<Runnable> workQueue ;

        public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                                  int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                                  long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                                  TimeUnit unit, 					// 空闲时间单位
                                  BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 ★
                                  ThreadFactory threadFactory, 		// 创建线程的工厂类
                                  RejectedExecutionHandler handler) // 拒绝策略
    

    线程池阻塞队列 : 线程池中的阻塞队列 , 同一时刻 , 只能有 1 1 1 个线程访问队列 , 执行任务 入队 / 出队 操作 ; 队列都是 FIFO 先进先出 ;

    • 阻塞队列相关概念 :

      • 大小边界 :
        • 有界 : 阻塞队列 大小有限制 , 不是无限大的 ;
        • 无界 : 阻塞队列 理论上无限大 , 比如设置成 Integer.MAX_VALUE ;
      • 队列已满 : 只能出队 , 不能入队 ; 入队操作需阻塞等待 ;
      • 队列为空 : 只能入队 , 不能出队 ; 出队操作需要等待 ;
    • ArrayBlockingQueue : 有界阻塞队列 , 需要 指定阻塞队列大小 ;

    • LinkedBlockingQueue : 无界阻塞队列 , 基于链表的阻塞队列 ;

      • Executors.newCachedThreadPool()Executors.newFixedThreadPool(10) 方法创建的线程池 , 使用的是该阻塞队列 ;
    • SynchronousQueue : 队列 不存储元素 , 后一个 Runnable 任务入队 , 必须等到前一个任务执行完毕才可以 , 否则会一直阻塞等待 ;

      • Executors.newCachedThreadPool() 方法创建的线程池 , 使用的是该阻塞队列 ;
    • PriorityBlockingQueue : 有优先级的阻塞队列 ;


    阻塞队列吞吐量 : SynchronousQueue > LinkedBlockingQueue > ArrayBlockingQueue ;





    二、拒绝策略



    线程池拒绝策略是线程池创建的第 7 7 7 个参数 : RejectedExecutionHandler handler ;

        public ThreadPoolExecutor(int corePoolSize,					// 核心线程数 , 这些线程基本不会被销毁
                                  int maximumPoolSize, 				// 最大线程数 , 线程池能创建的最大线程数量
                                  long keepAliveTime, 				// 空闲情况下 , 非核心线程存活时间
                                  TimeUnit unit, 					// 空闲时间单位
                                  BlockingQueue<Runnable> workQueue,// 任务的阻塞队列 
                                  ThreadFactory threadFactory, 		// 创建线程的工厂类
                                  RejectedExecutionHandler handler) // 拒绝策略 ★
    

    线程池拒绝策略 : 如果核心线程 , 非核心线程都在执行任务 , 阻塞队列是有界的 , 也满了 , 此时线程池如果再添加任务 , 就会触发如下拒绝策略 ;

    • DiscardPolicy : 丢弃任务 ;
    • DiscardOldestPolicy : 丢弃队头的最旧的任务 ;
    • AbortPolicy : 抛出异常 , 这也是默认方式 ;
    • CallerRunsPolicy : 调用者自行处理 ;

    线程池默认的拒绝策略是 抛出异常 方式 ;

        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    




    三、使用 ThreadPoolExecutor 自定义线程池参数



    创建 1 1 1 个线程池 , 核心线程数是 2 2 2 , 最大线程数是 3 3 3 , 则非核心线程 0 ~ 1 个 , 非核心线程最大空闲存活时间 60 秒 , 阻塞队列最大存放 10 个元素 , 拒绝策略设置为抛出异常方式 , 如果阻塞队列装满 , 再次尝试执行新任务时 , 会抛出异常 ;


    代码示例 :

    import java.util.concurrent.*;
    
    public class Main {
    
        public static void main(String[] args) {
            ExecutorService executorService = new ThreadPoolExecutor(
                    2,                          // 核心线程数 2
                    3,                      // 最大线程数 3, 非核心线程 0 ~ 1 个
                    60,                        // 非核心线程最大空闲存活时间 60 秒
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(10),   // 阻塞队列, 最大存放 10 个元素
                    Executors.defaultThreadFactory(),       // 线程工厂
                    new ThreadPoolExecutor.AbortPolicy()    // 决绝策略, 如果执行任务失败, 抛出异常
            );
            for (int i = 0; i < 20; i ++) {
                executorService.execute(new Task(i));
            }
        }
    
        static class Task implements Runnable {
            /**
             * 记录线程的索引 0 ~ 99
             */
            private int i = 0;
    
            public Task(int i) {
                this.i = i;
            }
    
            @Override
            public void run() {
                System.out.println("线程 ID : " + Thread.currentThread().getName() + " , 线程索引 : " + i);
    
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    执行结果 : 这里线程最大执行到了 12 12 12 , 也就是从 0 0 0 开始计数 , 执行了 13 13 13 个任务 , 其中 3 3 3 个线程池各自执行一个任务 , 阻塞队列存放 10 10 10 个任务 , 再次尝试将第 14 14 14 个任务放入阻塞队列时 , 报出 java.util.concurrent.RejectedExecutionException 异常 , 但是队列中的 10 10 10 个任务也正常执行完毕 ;

    线程 ID : pool-1-thread-2 , 线程索引 : 1
    线程 ID : pool-1-thread-3 , 线程索引 : 12
    线程 ID : pool-1-thread-1 , 线程索引 : 0
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: 
    Task Main$Task@5cad8086 rejected from java.util.concurrent.ThreadPoolExecutor@6e0be858
    [Running, pool size = 3, active threads = 3, queued tasks = 10, completed tasks = 0]
    	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    	at Main.main(Main.java:16)
    线程 ID : pool-1-thread-3 , 线程索引 : 2
    线程 ID : pool-1-thread-1 , 线程索引 : 4
    线程 ID : pool-1-thread-2 , 线程索引 : 3
    线程 ID : pool-1-thread-1 , 线程索引 : 5
    线程 ID : pool-1-thread-2 , 线程索引 : 7
    线程 ID : pool-1-thread-3 , 线程索引 : 6
    线程 ID : pool-1-thread-1 , 线程索引 : 9
    线程 ID : pool-1-thread-2 , 线程索引 : 8
    线程 ID : pool-1-thread-3 , 线程索引 : 10
    线程 ID : pool-1-thread-2 , 线程索引 : 11
    

    在这里插入图片描述

    展开全文
  • (五)java 线程池工作队列

    千次阅读 2018-07-01 15:20:01
    线程池工作队列 &nbsp;&nbsp;上一章我们介绍了线程的基本情况,这一章进一步了解线程池中的工作队列,BlockingQueue 队列。 在类 Executors 中,我们可以看到不同线程池维护的工作队列是不同的,如...
  • java线程池队列详细讲解

    千次阅读 2015-04-18 12:20:18
    Java线程池使用说明 一简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包...
  • java 线程池队列选型

    千次阅读 2015-11-09 21:39:20
    线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程...
  • Java线程池阻塞队列

    2020-08-17 01:16:24
    一.Java线程池的优点 1.降低资源消耗:通过重复利用线程池中已创建好的线程来降低线程创建和销毁造成的消耗。 2.提高响应速度:当任务到达时,任务可以直接拿到线程池中已创建好的线程立即执行。 3.提高线程的可...
  • java线程池队列分析

    万次阅读 2014-12-26 14:28:48
    Java项目 当想让程序异步操作的时候,首先考虑使用Java多线程,但有的时候我们总会在想是简单的extends Thread 、implements Runnable接口还是使用线程池呢?而大多开发者可能更会选择使用线程池,.减少了创建和销毁...
  • 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程...
  • Java线程池带实例讲解,阻塞队列说明 首先,线程池是啥,有啥好处这里就不提了.google一下马上知道. 嘻嘻嘻 首先第一步!我们先来认识下 在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。 该类是核心,...
  • java线程池常用的阻塞队列

    千次阅读 2020-07-22 16:46:03
    表格左侧是线程池,右侧为它们对应的阻塞队列,可以看到 5 种线程池对应了 3 种阻塞队列,我们接下来对它们进行逐一的介绍。 1. LinkedBlockingQueue 对于 FixedThreadPool 和 SingleThreadExector 而言,它们使用的...
  • java线程池--阻塞队列

    2019-02-14 10:20:40
    线程池的线程全部开启后,之后的线程任务就会先放入阻塞队列,由阻塞队列把线程任务发放到线程池的空闲线程,当阻塞队列没有任务时,线程池就会一直阻塞,直到新的线程任务进入,具体的线程池细节可参考我的另一篇...
  • java自带线程池队列详细讲解 前言 今天博主将为大家分享Java(面试必备):java自带线程池队列详细讲解,不喜勿喷,如有异议欢迎讨论! 线程的使用在java中占有极其重要的地位,Jdk1.5之后加入了java.util....
  • @TOC线程池阻塞队列为什么都用LinkedBlockingQueue LinkedBlockingQueue 使用单链表实现,提供3种构造函数 LinkedBlockingQueue() 无参构造函数,链表长度为Integer.MAX_VALUE LinkedBlockingQueue(int capacity) ...
  • java自带线程池队列详细讲解

    千次阅读 2016-02-06 11:13:33
    Java线程池使用说明 一简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包...
  • 注意这个ConcurrentLinkedQueue这个是个非阻塞队列, 这个队列是没有长度限制的,不用给一个长度,直接,向队列不停的.add数据就可以, 然后poll,是出队列,出队列的同时,并且会从队列中移出元素 然后peek,会返回队列...
  • 本人博客原地址:Java 线程池ExecutorService 等待队列问题 创作时间: 2019.09.30 11:12:35 1、首先看下Executor获取线程池,这样方式,可以设置线程池的大小,但是了解线程池的内部原理的情况下,这样的线程池...
  • Java 线程池 队列

    2014-10-29 23:48:29
    1.使用线程池的目的:
  • Java自带线程池队列详细讲解

    千次阅读 2017-02-16 20:35:26
    Java线程池使用说明 一简介 线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包...
  • 重点关注线程池的实现以及七个主要内容: 二.深入剖析线程池实现原理  在上一节我们从宏观上介绍了ThreadPoolExecutor,下面我们来深入解析一下线程池的具体实现原理,将从下面几个方面讲解: 1.线程池状态 2....
  • 其中有一个参数是BlockingQueue<Runnable> workQueue,是阻塞队列,和线程池一起实现了生产者-消费者模式。 什么是生产者-消费者模式呢? 就是当生产者生产的东西一定数量,消费者来不及消费,生产者会等待着...
  • Java线程池七个参数详解

    万次阅读 多人点赞 2019-04-23 11:14:33
    java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释。 从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 77,132
精华内容 30,852
关键字:

java线程池的阻塞队列

java 订阅