精华内容
下载资源
问答
  • 线程池-自定义线程池

    2021-05-12 12:52:49
    目录自定义线程池 自定义线程池

    前言

    为了更好的理解线程池的流程、内部属性的调度关系以及几个重要的参数。这里让我们自己来定义一个线程池吧!

    线程池内部调度关系

    在这里插入图片描述

    代码实现

    package com.coderzpw.demo.线程池.自定义线程池;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestPool {
        public static void main(String[] args) {
            ThreadPoll threadPoll = new ThreadPoll(2, 1000, TimeUnit.MICROSECONDS, 5, (queue,task)->{ // 五种拒绝策略
                // 1. 死等策略
    //            queue.put(task);
                // 2. 超时等待
    //            System.out.println(queue.offer(task,20,TimeUnit.MICROSECONDS));
                // 3. 让调用者放弃任务执行
    //            System.out.println("我放弃这个任务---,啥也不执行了");
                // 4. 让调用者抛出异常
    //            throw new RuntimeException("任务执行失败"+task);
                // 5. 让调度者自己执行任务
                task.run();
            });// 创建线程池对象
            for (int i=0; i<15; i++){
                int j=i;
                threadPoll.execute(() -> {      // 着了execute传过来的是一个Runnable接口实现类对象,这里用的lambda表达式写法
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(j);
                });
            }
        }
    }
    
    /**
     * 定义一个拒绝策略接口
     */
    interface RejectPolicy<T> {
        void reject(BlockingQueue queue, T task);
    }
    
    /**
     * 定义一个阻塞队列
     * @param <T>
     */
    class BlockingQueue<T> {
        // 1. 任务队列
        private Deque<T> queue = new ArrayDeque<>(); // 双向链表
        // 2. 锁(保护任务队列头或队列尾的任务对象)
        private ReentrantLock lock = new ReentrantLock();
        // 3. 生产者条件变量
        private Condition fullWaitSet = lock.newCondition();
        // 4. 消费者条件变量
        private Condition emptyWaitSet = lock.newCondition();
        // 5. 容量
        private int capcity;
    
        public BlockingQueue(int capcity) {
            this.capcity = capcity;
        }
    
        // 带超时的任务获取  (因为一直等待需要一直循环,太消耗CPU了)
        public T poll (long timeout, TimeUnit unit) {
            lock.lock();
            try {
                // 将timeout统一转换为纳秒
                long nanos = unit.toNanos(timeout);
                while (queue.isEmpty()){
                    try {
                        if (nanos <= 0){
                            return null;
                        }
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T t = queue.removeFirst();
                fullWaitSet.signal();
                return t;
            }finally {
                lock.unlock();
            }
        }
    
    
        // 添加任务,如果任务队列满了,就await,死等,直到队列有位置为止(拒绝策略之一)
        public void put (T task){
            lock.lock();
            try {
                while (queue.size() == capcity){
                    try {
                        System.out.println("等待加入任务队列"+task);
                        fullWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("加入任务队列 "+task);
                queue.addLast(task);
                emptyWaitSet.signal();
            }finally {
                lock.unlock();
            }
        }
        // 添加任务,超时添加,如果超过规定时间,任务还不能添加到任务队列里,就返回false(拒绝策略之一)
        public boolean offer(T task, long timeout, TimeUnit timeUnit){   // 在设置时间内添加成功就返回true,否则返回false
            lock.lock();
            try {
                long nanos = timeUnit.toNanos(timeout);
                while (queue.size() == capcity){
                    try {
                        System.out.println("等待加入任务队列"+task);
                        if (nanos <= 0){
                            return false;
                        }
                        nanos = fullWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("加入任务队列 "+task);
                queue.addLast(task);
                emptyWaitSet.signal();
                return true;
            }finally {
                lock.unlock();
            }
        }
    
        /**
         * 重点  如果任务队列已满,仍有任务添加队列,则采取拒绝策略
         * @param rejectPolicy
         * @param task
         */
        public void tryPut(RejectPolicy<T> rejectPolicy, T task){
            lock.lock();
            try {
                // 判断队列是否已满
                if (queue.size() == capcity){   // 已满的话,就采取拒绝策略,拒绝策略由调用者定义并以参数形式传入(上面那个put,死等也是一种策略)
                    rejectPolicy.reject(this,task);
                }else {     // 有空闲
                    System.out.println("加入任务队列 "+task);
                    queue.addLast(task);
                    emptyWaitSet.signal();
                }
            }finally {
                lock.unlock();
            }
        }
    
    
    
        // 获取大小
        public int size() {
            lock.lock();
            try {
                return queue.size();
            }finally {
                lock.unlock();
            }
        }
    }
    
    /**
     *  定义一个线程池类
     */
    class ThreadPoll{
        // 阻塞任务队列
        private BlockingQueue<Runnable> taskQueue;
        // 线程池里的线程集合
        private HashSet<Worker> workers = new HashSet<>();   // 核心
        // 核心线程数
        private int coreSize;
        // 获取任务的超时时间
        private long timeout;
        // 时间单位
        private TimeUnit timeUnit;
        // 拒绝策略
        private RejectPolicy<Runnable> rejectPolicy;
    
        public ThreadPoll(int coreSize, long timeout, TimeUnit timeUnit, int capcity, RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.taskQueue = new BlockingQueue<>(capcity);
            this.rejectPolicy = rejectPolicy;
        }
    
        // 执行任务 传一个Runnable的任务参数
        public void execute(Runnable task) {
            /**
             * 重要
             * 1. 当线程池里的任务线程数(workers)没有超过 coresize 时, 就新增一个workrt去执行, 并将woker加入到workers中
             * 2. 如果 线程池里的任务线程数(workers)超过了 最大线程数 , 就将任务加入任务队列暂存
             */
            synchronized (workers) {        // 因为这里线程集合(workers)是一个共享对象,为了保护该对象,我们为其上锁
                if (workers.size() < coreSize) {    // 情况1
                    Worker worker = new Worker(task);       // 就新增一个workrt去执行
                    System.out.println("新增的worker和task---------------"+worker+"-------"+task);
                    workers.add(worker);                    // 将woker加入到workers中
                    worker.start();                         // workrt去执行任务(task)
                } else {                            // 情况2
                    taskQueue.tryPut(rejectPolicy, task);
                }
            }
        }
    
    
        /**
         * 线程池里的线程类
         */
        class Worker extends Thread{
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                /**
                 * 执行任务
                 * 1. 先执行构造时传过来的task
                 * 2. 当前面的task执行完毕, 再接着从任务队列获取任务执行,直到任务队列为空
                 * 3. 执行完所有的task后,将该线程(worker)从workers中移除
                 */
                while (task != null ||  (task=taskQueue.poll(timeout,timeUnit))!=null ){  // ’task != null‘判断的是第一步,’(task=taskQueue.take())!=null‘判断的是第二步
                    try {
                        System.out.println("正在执行---"+task);
                        task.run();
                    } catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        task = null;
                    }
                }
                synchronized (workers) {
                    System.out.println("worker被移除"+this);
                    workers.remove(this);
                }
            }
        }
    }
    
    展开全文
  • 自定义线程池

    2020-08-12 22:13:34
    2.自定义线程池 自定义的线程池由三个部分组成: 线程池 阻塞队列 任务生产者 3.代码 结合视频学习:视频链接 下面是源码 package HighConcurrency.ThreadPool; import java.util.ArrayDeque; import java....

    1.为什么要设计线程池

    • 创建线程和销毁线程的花销是很大的,这些时间有可能比处理业务的时间还要长。这样频繁的创建线程和销毁线程,再加上业务工作线程,消耗系统资源的时间,可能导致系统资源不足

    2.自定义线程池

    • 自定义的线程池由三个部分组成:
      • 线程池
      • 阻塞队列
      • 任务生产者
        在这里插入图片描述

    3.代码

    package HighConcurrency.ThreadPool;
    
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    
    public class ThreadPool {
        public static void main(String[] args) throws InterruptedException {
        	//设置线程池数量为2
            TestPool testPool = new TestPool(2, 1000, TimeUnit.MILLISECONDS,10);
            //开启五个任务
            for(int i = 0 ; i < 5 ; i++){
                int j = i;
                testPool.excute(()->{
                    System.out.println(j);
                });
            }
        }
    }
    
    //BlockingQueue是任务队列
    class BlockingQueue<T>{
    
        //1.任务队列(使用双向链表)
        private Deque<T> queue = new ArrayDeque<>();
    
        /**
         * 线程池都要去从队列中获取任务,但是只能有一个获取成功,所以需要加锁
         * 队列头和队列尾都需要加锁
         */
        //2.锁
        private ReentrantLock lock = new ReentrantLock();
    
        /**
         * 条件变量:
         * 1.当任务队列中没有任务时,线程需要在waitSet中等待
         * 2.当任务队列满了时,生成者需要一个条件变量进入等待状态
         */
    
        //3.生产者条件变量
        private Condition fullWaitSet = lock.newCondition();
    
        //4.消费者条件变量
        private Condition emptyWaitSet = lock.newCondition();
    
        //5.容量
        private int capcity;
    
        public BlockingQueue(int capcity){
            this.capcity = capcity;
        }
    
        /**
         * 以上是五个属性
         * 下面是几个方法
         */
    
        //I.阻塞获取(指线程从阻塞队列中拉取任务)
        public T take(){
            //获取时需要加锁
            lock.lock();
            try{
                //判断队列是否为空
                while(queue.isEmpty()){
                    try{
                        //如果队列为空,那么就进入等待状态
                        emptyWaitSet.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //如果队列不为空,那么将队列中的第一个任务出队,并返回该任务
                T t = queue.removeFirst();
                //这里是为了唤醒fullWaitSet.await()
                //因为现在拉取了一个任务,队列不为满
                fullWaitSet.signal();
                return t;
            }finally {
                //释放锁
                lock.unlock();
            }
        }
    
        //II.阻塞添加(指生产者往阻塞队列中添加任务)
        public void put(T element) throws InterruptedException {
            //任务入队也需要加锁
            lock.lock();
            try{
                //判断队列是否为满
                while(queue.size() == capcity){
                    try{
                        //如果队列已满,则进入等待状态
                        fullWaitSet.await();
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
                //如果队列不满,那么将任务添加到队列中
                queue.addLast(element);
                //这里是为了唤醒emptyWaitSet.await()
                //因为现在任务队列中有任务了,线程池可以由等待状态进入唤醒状态,从任务队列中拉取任务
                emptyWaitSet.signal();
            }finally {
                //释放锁
                lock.unlock();
            }
    
        }
    
        //III.获取大小(获取阻塞队列的大小)
        public int size(){
            lock.lock();
            try{
                return queue.size();
            }finally {
                lock.unlock();
            }
        }
    
        //IIII.带超时的阻塞获取
        //因为前面的await方法会一直等待
        //这里涉及一个带超时的方法
        //参数:timeout是时间参数,unit可以进行时间的转换
        public T poll(long timeout, TimeUnit unit){
            //这里将超时时间timeout统一转换为纳秒
            long nanos = unit.toNanos(timeout);
    
            //获取时需要加锁
            lock.lock();
            try{
                //判断队列是否为空
                while(queue.isEmpty()){
                    try{
                        //如果nanos纳秒时间过去了,还没等到那么就返回null
                        //说明队列中没有任务可执行
                        if(nanos <= 0)
                            return null;
                        //如果队列为空
                        //这里不需要一直等
                        //这里返回的是剩余时间,重新复制给nonos,实现对nanos的更新
                        nanos = emptyWaitSet.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                //如果队列不为空,那么将队列中的第一个任务出队,并返回该任务
                T t = queue.removeFirst();
                //这里是为了唤醒fullWaitSet.await()
                //因为现在拉取了一个任务,队列不为满
                fullWaitSet.signal();
                return t;
            }finally {
                //释放锁
                lock.unlock();
            }
        }
    }
    
    
    //实现线程池
    class TestPool{
        //任务队列taskQueue
        private BlockingQueue<Runnable> taskQueue;
    
        //线程集合
        //这里泛型类型不使用Thread,使用包装的Worker类型
        private HashSet<Worker> workers = new HashSet<>();
    
        //核心线程数
        private int coreSize;
    
        //获取任务的超时时间
        private long timeout;
    
        private TimeUnit timeUnit;
    
        //这里包装成Worker内部类
        class Worker extends Thread{
            private Runnable task;
            //构造方法
            public Worker(Runnable task){
                this.task = task;
            }
    
            @Override
            public void run() {
                //执行任务
                //1.当task不为空时,执行任务
                //2.当task执行完毕,从任务队列获取任务
                //(task = taskQueue.take()) != null)的目的是当任务执行完毕,从任务队列中获取
                while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){
                    try{
                        task.run();
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        task = null;
                    }
                }
                //将当前worker对象从workers中移除
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    
        //构造方法
        public TestPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapcity) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.timeUnit = timeUnit;
            this.taskQueue = new BlockingQueue<>(queueCapcity);
        }
    
        //执行任务
        public  void excute(Runnable task) throws InterruptedException {
            //workers属于共享资源,需要加锁
            synchronized (workers){
                //当任务数没有超过coreSize时,直接交给Worker执行
                //当超过coreSize时,加入任务队列暂存起来
                if(workers.size() < coreSize){
                    Worker worker = new Worker(task);
                    //将新创建的线程加入线程集合中
                    workers.add(worker);
                    //线程执行任务
                    worker.start();
                }else{
                    //如果任务数超过核心线程数
                    //进入任务队列
                    taskQueue.put(task);
                }
            }
        }
    }
    
    展开全文
  • Java自定义线程池

    千次阅读 2020-06-29 20:48:54
    学习自定义线程池之前大家应该先学习设计模式-享元模式 下面这张图就是自定义线程池原理: ThreadPool类就是我们的线程池; BlockQuene类是我们定义的阻塞队列,当线程池满了,我们把来的任务先放到阻塞队列中; ...

    学习自定义线程池之前大家应该先学习设计模式-享元模式

    • 下面这张图就是自定义线程池原理:
    1. ThreadPool类就是我们的线程池;
    2. BlockQuene类是我们定义的阻塞队列,当线程池满了,我们把来的任务先放到阻塞队列中;
    3. main就是我们客户也就是生产者,那线程池就是消费者;
      在这里插入图片描述
      在这里插入图片描述

    ThreadPool类

    ThreadPool里面有内部类Worker,它是线程池中装任务的容器,当任务来了就放到这个里面。
    线程池当然还要维护线程池的最大任务个数,当线程池中有空的worker那么就从阻塞队列blockqueue中拿取,当线程池满了,来的任务就放入阻塞队列。
    excuteTask方法中当线程池已经满了,那么可以选择这时是放入阻塞队列中,当时else中我们有注释,我们利用策略模式思想,可以选择当阻塞队列满了,我们选择怎么办,可以抛出异常,或者直接用我们写好的put死等,或者超时等待tryput方法。

    class ThreadPool{
        //核心线程数
        private int coreSize;
        //获取任务的超时时间
        private long timeout;
    
        private TimeUnit unit;
        //任务队列
        private BlockQuene<Runnable> taskqueue;
        //线程集合
        private HashSet<Worker> workers=new HashSet<>();
    
        private RejectPolicy<Runnable> rejectPolicy;
    
        public ThreadPool(int coreSize, long timeout, TimeUnit unit,int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {
            this.coreSize = coreSize;
            this.timeout = timeout;
            this.unit = unit;
            this.taskqueue=new BlockQuene<>(queueCapcity);
            this.rejectPolicy=rejectPolicy;
        }
    
        //执行任务
        public void excuteTask(Runnable task){
            //来一个任务,如果不超过线程池的核心线程数,则加入到线程池进行工作,如果此时池中线程数超过核心线程数,则加入阻塞等待队列。
            ReentrantLock lock=new ReentrantLock();
            lock.lock();
            try{
                if (workers.size() < coreSize){
                    System.out.println("增加任务");
                    Worker worker=new Worker(task);
                    workers.add(worker);
                    worker.start();
                }else {
                    System.out.println("加入任务队列一个");
                    //taskqueue.put(task);
                    //当我们的阻塞队列满了,这时我们能够采取的措施
                    //1.死等wait,无参添加队列方法的实现
                    //2.添加入队列方法,加上时间参数,超时无法添加则放弃添加
                    //3.让调用者放弃执行
                    //4.让调用者抛出异常
                    //5.让调用者自己执行任务
                    //各种选择
                    taskqueue.tryput(rejectPolicy,task);//超时等待方法
                }
            }finally {
                lock.unlock();
            }
        }
    
    
        class Worker extends Thread{
            private Runnable task;
    
            public Worker(Runnable task) {
                this.task = task;
            }
    
            @Override
            public void run() {
                //while保证worker能不断的不停执行
                //当task里面有任务则执行,没有任务则从阻塞队列里面获取
                //执行完worker里的任务则讲task重新置为null为了下一次放任务
                while (task != null || (task = taskqueue.tack())!=null){
                    try{
                        task.run();
                    }finally {
                        task=null;
                    }
                }
                synchronized (this){
                    workers.remove(this);
                }
            }
        }
    }
    

    BlockQuene类

    阻塞队列,当生产者main提供任务,线程池已经满了,那么就先将任务放入阻塞队列中,当线程池有空位,那么从阻塞队列中拿任务。

    class BlockQuene<T>{//阻塞队列
        //锁
        private ReentrantLock lock =new ReentrantLock();
        //任务队列
        private Deque<T> queue=new ArrayDeque<>();
        //生产者状态main
        private Condition fullwaitset=lock.newCondition();
        //消费者状态threadpool
        private Condition emptywaitset=lock.newCondition();
        //容量
        private int capacity;
    
        public BlockQuene(int capacity) {
            this.capacity = capacity;
        }
    
        //阻塞获取
        public T tack(){
            lock.lock();
            try {
                while (queue.isEmpty()){
                    try {
                        emptywaitset.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T element=queue.removeFirst();
                fullwaitset.signal();//当阻塞队列挪走一个线程则唤醒满阻塞队列中的一个线程
                return element;
            }finally {
                lock.unlock();
            }
        }
        //超时获取方法,就是take的超时实现
        public T poll(long timeout, TimeUnit unit){
            lock.lock();
            try{
                long nanos=unit.toNanos(timeout);
                while (queue.isEmpty()){
                    try {
                        nanos=emptywaitset.awaitNanos(nanos);//返回的是剩余时间
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                T element=queue.removeFirst();
                fullwaitset.signal();//当阻塞队列挪走一个线程则唤醒满监控中的一个线程
                return element;
            } finally {
                lock.unlock();
            }
        }
    
    
    
        //阻塞添加,死等方法,阻塞队列满时就死等
        public void put(T element){
            lock.lock();
            try {
                while (queue.size() >= capacity){
                    try {
                        fullwaitset.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.addLast(element);
                emptywaitset.signal();//当不为空则告诉消费者里有线程
            }finally {
                lock.unlock();
            }
        }
        //超时等待放入,阻塞队列满,则等待参数时间,参数时间不能放入放回flase
        public boolean put(T task,long timeout,TimeUnit unit){
            lock.lock();
            try {
                long nanos = unit.toNanos(timeout);
                while (queue.size() >= capacity){//超过了阻塞队列容量
                    if (nanos <= 0){//超时
                        return false;
                    }
                    try {
                        nanos= fullwaitset.awaitNanos(nanos);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                queue.addLast(task);
                emptywaitset.signal();
                return true;
            } finally {
                lock.unlock();
            }
        }
    
        //获取大小
        public int getCapacity(){
            lock.lock();
            try{
                return queue.size();
            }finally {
                lock.unlock();
            }
        }
    
        public void tryput(RejectPolicy rejectPolicy,T task) {
            lock.lock();
            try {
                if (queue.size()>=capacity){
                    rejectPolicy.reject(this,task);
                }else {
                    queue.addLast(task);
                    emptywaitset.signal();
                }
            } finally {
                lock.unlock();
            }
        }
    }
    

    用策略模式思想来实现当阻塞队列满时不同实现方法,这是策略接口

    interface RejectPolicy<T>{
        void reject(BlockQuene<T> quene,T task);
    }
    

    main

    public class TestPool1 {
        public static void main(String[] args) {
            ThreadPool threadPool=new ThreadPool(2,1000,TimeUnit.MILLISECONDS,10, ((quene, task) -> {
                quene.put(task);
                }
            ));
            for (int i = 0; i <5; i++) {
                threadPool.excuteTask(()->{
                    System.out.println("nihao");
                });
            }
        }
    }
    
    展开全文
  • java自定义线程池ThreadPoolExecutor java线程获取结果Callable、Future、FutureTask 理解 Thread.Sleep 函数 自定义创建线程池 在我的文章 Java线程池的使用与分析 里也讲到到线程池的各个概念,今天我们...

    java自定义线程池ThreadPoolExecutor

    java线程获取结果Callable、Future、FutureTask

    理解 Thread.Sleep 函数

     

    自定义创建线程池    

              在我的文章  Java线程池的使用与分析  里也讲到到线程池的各个概念,今天我们就来实践一下,自定义一个线程池。

    一、我们回顾一下在 Java线程池的使用与分析 里讲到的关于线程池 ThreadPoolExecutor 的知识点

    ThreadPoolExecutor。它提供了好几个构造方法,但是最底层的构造方法却只有一个。那么,我们就从这个构造方法着手分析。 

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler);

    这个构造方法有7个参数,我们逐一来进行分析。

      1、corePoolSize线程池中的核心线程数

      2、maximumPoolSize线程池中的最大线程数

      3、keepAliveTime空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁

      4、unit空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等

      5、workQueue等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象

      6、threadFactory线程工厂,我们可以使用它来创建一个线程

      7、handler拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理

    这些参数里面,基本类型的参数都比较简单,我们不做进一步的分析。我们更关心的是workQueuethreadFactoryhandler,接下来我们将进一步分析。

        1. 等待队列-workQueue

           等待队列是BlockingQueue类型的,理论上只要是它的子类,我们都可以用来作为等待队列。同时,jdk内部自带一些阻塞队列,我们来看看大概有哪些。

      1)ArrayBlockingQueue,队列是有界的,基于数组实现的阻塞队列

      2)LinkedBlockingQueue,队列可以有界,也可以无界。基于链表实现的阻塞队列

      3)SynchronousQueue,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()的默认队列

      4)PriorityBlockingQueue,带优先级的无界阻塞队列

         通常情况下,我们需要指定阻塞队列的上界(比如1024)。另外,如果执行的任务很多,我们可能需要将任务进行分类,然后将不同分类的任务放到不同的线程池中执行。

       2. 线程工厂-threadFactory

       ThreadFactory是一个接口,只有一个方法。既然是线程工厂,那么我们就可以用它生产一个线程对象。来看看这个接口的定义。

    public interface ThreadFactory {
    
        /**
         * Constructs a new {@code Thread}.  Implementations may also initialize
         * priority, name, daemon status, {@code ThreadGroup}, etc.
         *
         * @param r a runnable to be executed by new thread instance
         * @return constructed thread, or {@code null} if the request to
         *         create a thread is rejected
         */
        Thread newThread(Runnable r);
    }

       Executors的实现使用了默认的线程工厂-DefaultThreadFactory。它的实现主要用于创建一个线程,线程的名字为pool-{poolNum}-thread-{threadNum} 

    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

    很多时候,我们需要自定义线程名字。我们只需要自己实现ThreadFactory,用于创建特定场景的线程即可

    3、拒绝策略-handler

        所谓拒绝策略,就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他...

    jdk自带4种拒绝策略:

     1)CallerRunsPolicy // 在调用者线程执行

     2)AbortPolicy // 直接抛出RejectedExecutionException异常

     3)DiscardPolicy // 任务直接丢弃,不做任何处理

     4)DiscardOldestPolicy // 丢弃队列里最旧的那个任务,再尝试执行当前任务

           这四种策略各有优劣,比较常用的是DiscardPolicy,但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler接口的方式

    4、提交任务的几种方式

         往线程池中提交任务,主要有两种方法,execute()submit()

      execute()用于提交不需要返回结果的任务,我们看一个例子。

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.execute(() -> System.out.println("hello"));
    }
    

      submit()用于提交一个需要返回果的任务。该方法返回一个Future对象,通过调用这个对象的get()方法,我们就能获得返回结果。get()方法会一直阻塞,直到返回结果返回。另外,我们也可以使用它的重载方法get(long timeout, TimeUnit unit),这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Future<Long> future = executor.submit(() -> {
            System.out.println("task is executed");
            return System.currentTimeMillis();
        });
        System.out.println("task execute time is: " + future.get());
    }

    5、关闭线程池

          在线程池使用完成之后,我们需要对线程池中的资源进行释放操作,这就涉及到关闭功能。我们可以调用线程池对象的shutdown()shutdownNow()方法来关闭线程池。

    这两个方法都是关闭操作,又有什么不同呢?

      1、shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。

      2、shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。

    另外,关闭线程池涉及到两个返回boolean的方法,isShutdown()isTerminated(),分别表示是否关闭和是否终止。

     

    二、开始自定义线程池

         1、自定义线程池的原因:

              1)无长度限制的队列,可能因为任务堆积导致OOM

              2)自定义拒绝策略

              3)根据服务器配置,去设置更合理、更高效的 线程池参数,使程序更健壮

        2、写个demo,自定义线程池并且自定义拒绝策略

              从上面可以看到默认提供的四种策略似乎都不太友好,要么放弃要么抛异常,而直接在调用者线程中执行或许也不是你想要的,因为它破坏了线程的执行顺序。

              有时候我们需要保证任务添加不会失败,并且只要被添加的任务能依次顺序执行就好了,而不需要这个添加动作立即响应,即让线程池等待池中的任务完成后再继续添加新任务,此时JDK提供的四种策略无法满足需求,需要自定义拒绝策略

             废话了这么多,现在直接上菜:

    package com.montnets.task;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 自定义阻塞型线程池 当池满时会阻塞任务提交
     * 
     */
    public class BlockThreadPool {
    
        private ThreadPoolExecutor pool = null;
        
        public BlockThreadPool(int poolSize) {
            //初始化线程池
            pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(),
                    new CustomRejectedExecutionHandler());
        }
        
        //销毁线程池
        public void destory() {
            if (pool != null) {
                pool.shutdownNow();
            }
        }
    
       //自定义创建线程的工厂  自定义线程名称
        private class CustomThreadFactory implements ThreadFactory {
            private AtomicInteger count = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1);
                t.setName(threadName);
                return t;
            }
        }
    
        //自定义拒绝策略  如果队列已满 就堵塞继续等待 直到队列有空闲
        private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    // 核心改造点,由blockingqueue的offer改成put阻塞方法
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        //开始执行
        public void execute(Runnable runnable) {
            this.pool.execute(runnable);
        }
    
        // 测试构造的线程池
        public static void main(String[] args) {
            BlockThreadPool pool = new BlockThreadPool(3);
            for (int i = 1; i < 100; i++) {
                System.out.println("提交第" + i + "个任务!");
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getId() + "=====开始");
                            TimeUnit.SECONDS.sleep(10);
                            System.out.println(Thread.currentThread().getId() + "=====【结束】");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                System.out.println("【提交第" + i + "个任务成功!】");
            }
    
            // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
            // exec.destory();
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    注意:

            这里的例子是   当核心池和缓存队列满了之后外部再调用execute时就会阻塞住,一直等到池里某个任务完成后释放出空闲线程以后,再将该任务添加到缓存队列,而不会抛异常或丢弃该任务。

    适用于一些定时扫描触发任务类场景

    展开全文
  • 自定义线程池总结

    2020-11-05 16:11:51
    自定义线程池总结 一、四种线程池 Java通过Executors提供四种线程池,分别为: 1、newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, ...
  • 如何自定义线程池

    千次阅读 2018-09-04 14:33:37
    一,要自定义线程池,需要使用的ThreadPoolExecutor类 .ThreadPoolExecutor类的构造方法: public ThreadPoolExecutor(int coreSize,int maxSize,long KeepAliveTime,TimeUnit unit,BlockingQueue queue,...
  • JDK1.5中引入了强大的concurrent包,其中最常用的莫过了线程池的实现ThreadPoolExecutor,它给我们带来了极大的方便,但同时,对于该线程池不恰当的设置也可能使其效率并不能达到预期的效果,甚至仅相当于或低于单...
  • Spring Boot-自定义线程池

    千次阅读 2019-03-16 14:29:29
    本文的核心内容:线程池核心参数,Spring Boot 自定义线程池,获取异步线程执行结果。
  • java有预置线程池:newSingleThreadExecutor,newFixedThreadPool...如果不适合,还可以使用ThreadPoolExecutor创建自定义线程池。主要构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
  • 自定义线程池需要了解四大方法、七大参数、四种拒绝策略 1、常用的创建线程池的四大方法 (1)创建单个线程的线程池:Executors.newSingleThreadExecutor(); //源码 public static ExecutorService ...
  • 自定义线程池ThreadPoolExecutor

    多人点赞 2020-11-28 15:34:29
    优雅使用线程池ThreadPoolExecutor实现自定义 一.引言 线程池想必大家也都用过,JDK的Executors 也自带一些线程池。但是不知道大家有没有想过,如何才是最优雅的方式去使用过线程池吗? 生产环境要怎么去配置自己的...
  • 7大参数自定义线程池

    2021-06-16 17:48:30
    在实际开发中,经常会用到的就是自定义线程池,通过new ThreadPoolExecutor(…),并添加需要的参数和设置需要的值,以下是7大参数的使用方法: 1.corePoolSize:设置核心线程数量。 2.maximumPoolSize:设置最大的...
  • 在程序运行时,频繁的创建线程, 销毁线程是十分损耗性能的,这就引出了线程池技术: 线程池在系统启动时就会创建一些线程, 当有任务到达时,线程池分配一个线程处理该任务 ,处理完后,该线程并不会死亡,而是由...
  • 多线程-自定义线程池

    2018-10-26 14:01:39
    java-自定义线程池 当我们使用 线程池的时候,可以使用 newCachedThreadPool()或者 newFixedThreadPool(int)等方法,其实我们深入到这些方法里面,就可以看到它们的是实现方式是这样的。 1 public static ...
  • 先说一下线程池底层的数据结构 队列:一种线性表,它的特性是先进先出,插入在一端,删除在另一端。 队列又分为阻塞队列BlockingQueue和非阻塞队列ConcurrentLinkedQueue   生产者生产元素插入队列,消费者消费...
  • 一、自定义线程池ThreadPoolExecutor 上一节我们演示了利用Executors创建了三种类型的线程池,但是实际生产环境中,我们是不会用这种方式创建线程池的,主要原因就是这些线程池用的阻塞队列没有限制容量,容易引发...
  • 这里提到了使用ThreadPoolExecutor来创建线程池,其实这就是我们常常听说的“自定义线程池”。该类位于java的java.util.concurrent包下。至于使用自定义的好处,上面的图片已经可以说明,接下来我们从概念到实战,...
  • Java线程池实现原理之自定义线程池(一) 1....谈到多线程先讲下队列的概念,之后的多线程学习会用到此类知识。...阻塞式队列超出总数会进入等待(等待时间=设置超时时间)。 3.获...
  • 包括其他几种不同类型的线程池,其实都是通过 ThreadPoolExecutor这个核心类来创建的,如果我们要自定义线程池,那么也是通过这个类来实现的   public ThreadPoolExecutor(int corePoolSize, int maximumPoo...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 23,190
精华内容 9,276
关键字:

自定义线程池设置超时时间