精华内容
下载资源
问答
  • 线程阻塞与唤醒

    千次阅读 2017-07-17 16:44:20
    线程阻塞与唤醒的方法如图: package newThread;import java.util.Scanner;public class Twothread implements Runnable { private int i; @Override public void run() { //run方法同样是线程执行体 for(;i...

    线程阻塞与唤醒的方法如图:
    这里写图片描述

    package newThread;
    
    import java.util.Scanner;
    
    public class Twothread implements Runnable {
        private int i;
        @Override
        public void run() {
            //run方法同样是线程执行体
            for(;i<10;i++) {
                System.out.println(Thread.currentThread().getName()+" "+i);
            }
        }
        public static void main(String[] args) {
            for(int i=0;i<5;i++) {
                System.out.println(Thread.currentThread().getName()+""+i);
                if(i==1) {
                    Twothread th=new Twothread();
                    //通过new Thread(target,name)创建新线程
                    new Thread(th,"新线程1").start();//开启第一条线程
                    /*①通过sleep()阻塞线程
                     *try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }*/
                    //②通过I/O阻塞线程
                    Scanner scanner=new Scanner(System.in);
                    //③通过thread的suspend挂起进程
                    //suspend();
                    new Thread(th,"新线程2").start();
                    //开启线程一定要用start(),不能用run(),如果调用run()方法会被当作普通方法不会被看作线程执行。
                }
            }
        }
    
    }
    

    通过①②③方法既可实现及时中断主线程,在i=1时马上进入子线程而不是由底端运行机制
    根据调度策略随机调度。这里写图片描述

    此外yield()方法会让进程从运行状态进入就绪状态

    展开全文
  • Java 线程阻塞、中断及优雅退出

    千次阅读 2018-07-18 20:39:44
    本文转自:Java 线程阻塞、中断及优雅退出 线程阻塞 一个线程进入阻塞状态的原因可能如下(已排除Deprecated方法): sleep() sleep()使当前线程进入停滞状态(阻塞当前线程),让出CUP的使用、目的是不让当前...

    本文转自:Java 线程阻塞、中断及优雅退出

    一、线程阻塞

    一个线程进入阻塞状态的原因可能如下(已排除Deprecated方法):

    sleep()

    sleep()使当前线程进入停滞状态(阻塞当前线程),让出CUP的使用、目的是不让当前线程独自霸占该进程所获的CPU资源,以留一定时间给其他线程执行的机会;

    当在一个Synchronized块中调用Sleep()方法是,线程虽然休眠了,但是对象锁并没有被释放,其他线程无法访问这个对象(即使睡着也持有对象锁)。

    wait()

    调用wait()/1.5中的condition.await()使线程挂起,直到线程获取notify()/notifyAll()消息,(或者在Java SE5中java.util.concurrent类库中等价的signal()/signalAll()消息),线程才会进入就绪状态;

    wait()调用会释放当前对象锁(monitor),这样其他线程可以继续进入对象的同步方法。参见上一篇文章线程间协作——wait & notify & notifyAll

    另外,调用join()也会导致线程阻塞,因为源码中join()就是通过wait()实现的;

    等待I/O

    class Demo3 implements Runnable throws InterruptedException{
         private InputStream in;
         public void run(){
              in.read();
         }
    }

    无法持有锁进入同步代码

    进入同步代码前无法获取锁,比如试图调用synchronized方法,或者显示锁对象的上锁行为ReentrantLock.lock(),而对应锁已被其他线程获取的情况下都将导致线程进入阻塞状态;

    注意:yield()并不会导致线程转到等待/睡眠/阻塞状态。在大多数情况下,yield()将导致线程从运行状态转到可运行状态,但有可能没有效果。

    二、线程中断

    线程中断可以在线程内部设置一个中断标识,同时让处于(可中断)阻塞的线程抛出InterruptedException中断异常,使线程跳出阻塞状态。相比其他语言,Java线程中断比较特殊,经常会引起开发人员的误解。因为中断听起来高深复杂,实质原理上非常简单。

    中断原理

    Java中断机制是一种协作机制,也就是说通过中断并不能直接终止另一个线程,而需要被中断的线程自己处理中断。这好比是家里的父母叮嘱在外的子女要注意身体,但子女是否注意身体,怎么注意身体则完全取决于自己。

    Java中断模型也是这么简单,每个线程对象里都有一个boolean类型的标识(不一定就要是Thread类的字段,实际上也的确不是,这几个方法最终都是通过native方法来完成的),代表着是否有中断请求(该请求可以来自所有线程,包括被中断的线程本身)。例如,当线程t1想中断线程t2,只需要在线程t1中将线程t2对象的中断标识置为true,然后线程2可以选择在合适的时候处理该中断请求,甚至可以不理会该请求,就像这个线程没有被中断一样。

    中断相关的方法

    方法解释
    public static boolean interrupted()测试当前线程是否已经中断。线程的中断状态 由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外)
    public boolean isInterrupted()测试线程是否已经中断。线程的中断状态不受该方法的影响
    public void interrupt()中断线程,设置中断标识为为true

    其中,interrupt方法是唯一能将中断状态设置为true的方法。静态方法interrupted会将当前线程的中断状态清除,但这个方法的命名极不直观,很容易造成误解,需要特别注意。

    此外,类库中的有些类的方法也可能会调用中断,如FutureTask中的cancel方法,如果传入的参数为true,它将会在正在运行异步任务的线程上调用interrupt方法,如果正在执行的异步任务中的代码没有对中断做出响应,那么cancel方法中的参数将不会起到什么效果;

    ExecutorService exec = Executors.newCachedThreadPool();
    Futrue<?> f = exec.submit(new TaskThread());
    f.interrupt();

    又如ThreadPoolExecutor中的shutdownNow方法会遍历线程池中的工作线程并调用线程的interrupt方法来中断线程,所以如果工作线程中正在执行的任务没有对中断做出响应,任务将一直执行直到正常结束。

    ExecutorService exec = Executors.newCachedThreadPool();
    for(int i=0;i<5;i++)
         exec.execute(new TaskThread())
    exec.shutdownNow();
    

    中断的处理 - 处理 InterruptedException

    如果抛出 InterruptedException 意味着一个方法是阻塞方法,那么调用一个阻塞方法则意味着您的方法也是一个阻塞方法,而且您应该有某种策略来处理 InterruptedException。通常最容易的策略是自己抛出 InterruptedException,如清单 1 中 putTask() 和 getTask() 方法中的代码所示。 这样做可以使方法对中断作出响应,并且只需将 InterruptedException 添加到 throws 子句。

    清单 1. 不捕捉 InterruptedException,将它传播给调用者

    public class TaskQueue {
        private static final int MAX_TASKS = 1000;
    
        private BlockingQueue<Task> queue 
            = new LinkedBlockingQueue<Task>(MAX_TASKS);
    
        public void putTask(Task r) throws InterruptedException { 
            queue.put(r);
        }
    
        public Task getTask() throws InterruptedException { 
            return queue.take();
        }
    }

    有时候需要在传播异常之前进行一些清理工作。在这种情况下,可以捕捉 InterruptedException,执行清理,然后抛出异常。清单 2 演示了这种技术,该代码是用于匹配在线游戏服务中的玩家的一种机制。 matchPlayers() 方法等待两个玩家到来,然后开始一个新游戏。如果在一个玩家已到来,但是另一个玩家仍未到来之际该方法被中断,那么它会将那个玩家放回队列中,然后重新抛出 InterruptedException,这样那个玩家对游戏的请求就不至于丢失。

    清单 2. 在重新抛出 InterruptedException 之前执行特定于任务的清理工作

    public class PlayerMatcher {
        private PlayerSource players;
    
        public PlayerMatcher(PlayerSource players) { 
            this.players = players; 
        }
    
        public void matchPlayers() throws InterruptedException { 
            try {
                 Player playerOne, playerTwo;
                 while (true) {
                     playerOne = playerTwo = null;
                     // Wait for two players to arrive and start a new game
                     playerOne = players.waitForPlayer(); // could throw IE
                     playerTwo = players.waitForPlayer(); // could throw IE
                     startNewGame(playerOne, playerTwo);
                 }
             }
             catch (InterruptedException e) {  
                 // If we got one player and were interrupted, put that player back
                 if (playerOne != null)
                     players.addFirst(playerOne);
                 // Then propagate the exception
                 throw e;
             }
        }
    }

    不要生吞中断

    有时候抛出 InterruptedException 并不合适,例如当由 Runnable 定义的任务调用一个可中断的方法时,就是如此。在这种情况下,不能重新抛出 InterruptedException,但是您也不想什么都不做。当一个阻塞方法检测到中断并抛出 InterruptedException 时,它清除中断状态。如果捕捉到 InterruptedException 但是不能重新抛出它,那么应该保留中断发生的证据,以便调用栈中更高层的代码能知道中断,并对中断作出响应。该任务可以通过调用 interrupt() 以 “重新中断” 当前线程来完成,如清单 3 所示。至少,每当捕捉到 InterruptedException 并且不重新抛出它时,就在返回之前重新中断当前线程。

    清单 3. 捕捉 InterruptedException 后恢复中断状态

    public class TaskRunner implements Runnable {
        private BlockingQueue<Task> queue;
    
        public TaskRunner(BlockingQueue<Task> queue) { 
            this.queue = queue; 
        }
    
        public void run() { 
            try {
                 while (true) {
                     Task task = queue.take(10, TimeUnit.SECONDS);
                     task.execute();
                 }
             }
             catch (InterruptedException e) { 
                 // Restore the interrupted status
                 Thread.currentThread().interrupt();
             }
        }
    }

    当线程处于阻塞状态时,中断线程,抛出InterruptedException,此时线程并未中断,而是再次唤醒,所以需要在异常中再次设置标志位进行中断操作。(举个例子就是:你在睡觉,有人把你叫醒去做其他事,你醒后脑袋懵的,不知道做什么,需要人再次叫你去做相应的事情)

    处理 InterruptedException 时采取的最糟糕的做法是生吞它 —— 捕捉它,然后既不重新抛出它,也不重新断言线程的中断状态。对于不知如何处理的异常,最标准的处理方法是捕捉它,然后记录下它,但是这种方法仍然无异于生吞中断,因为调用栈中更高层的代码还是无法获得关于该异常的信息。(仅仅记录 InterruptedException 也不是明智的做法,因为等到人来读取日志的时候,再来对它作出处理就为时已晚了。) 清单 4 展示了一种使用得很广泛的模式,这也是生吞中断的一种模式:

    清单 4. 生吞中断 —— 不要这么做

    // Don't do this 
    public class TaskRunner implements Runnable {
        private BlockingQueue<Task> queue;
    
        public TaskRunner(BlockingQueue<Task> queue) { 
            this.queue = queue; 
        }
    
        public void run() { 
            try {
                 while (true) {
                     Task task = queue.take(10, TimeUnit.SECONDS);
                     task.execute();
                 }
             }
             catch (InterruptedException swallowed) { 
                 /* DON'T DO THIS - RESTORE THE INTERRUPTED STATUS INSTEAD */
             }
        }
    }

    如果不能重新抛出 InterruptedException,不管您是否计划处理中断请求,仍然需要重新中断当前线程,因为一个中断请求可能有多个 “接收者”。标准线程池 (ThreadPoolExecutor)worker 线程实现负责中断,因此中断一个运行在线程池中的任务可以起到双重效果,一是取消任务,二是通知执行线程线程池正要关闭。如果任务生吞中断请求,则 worker 线程将不知道有一个被请求的中断,从而耽误应用程序或服务的关闭。

    中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断。某些线程非常重要,以至于它们应该不理会中断,而是在处理完抛出的异常之后继续执行,但是更普遍的情况是,一个线程将把中断看作一个终止请求,这种线程的run方法遵循如下形式:

    public void run() {
        try {
            ...
            /*
             * 不管循环里是否调用过线程阻塞的方法如sleep、join、wait,这里还是需要加上
             * !Thread.currentThread().isInterrupted()条件,虽然抛出异常后退出了循环,显
             * 得用阻塞的情况下是多余的,但如果调用了阻塞方法但没有阻塞时,这样会更安全、更及时。
             */
            while (!Thread.currentThread().isInterrupted()&& more work to do) {
                do more work 
            }
        } catch (InterruptedException e) {
            //线程在wait或sleep期间被中断了
        } finally {
            //线程结束前做一些清理工作
        }
    }
    

    上面是while循环在try块里,如果try在while循环里时,因该在catch块里重新设置一下中断标示,因为抛出InterruptedException异常后,中断标示位会自动清除,此时应该这样:

    public void run() {
        while (!Thread.currentThread().isInterrupted()&& more work to do) {
            try {
                ...
                sleep(delay);
                //wait(delay);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   //重新设置中断标示
            }
        }
    }

    三、可中断阻塞与不可中断阻塞

    对于处于sleep,join等操作的线程,如果被调用interrupt()后,会抛出InterruptedException,然后线程的中断标志位会由true重置为false,因为线程为了处理异常已经重新处于就绪状态。

    不可中断的操作,包括获取锁(进入synchronized段)以及Lock.lock(),Java.io 包中的同步 I/O,Java.io 包中的同步 Socket IO (对套接字进行读取和写入的操作:InputStream 和 OutputStream 中的read和write等),Java.nio包中的Selector的异步I/O 等。调用interrupt()对于这几个问题无效,因为它们都不抛出中断异常。如果拿不到资源,它们会无限期阻塞下去。

    对于Lock.lock(),可以改用Lock.lockInterruptibly(),可被中断的加锁操作,它可以抛出中断异常。等同于等待时间无限长的Lock.tryLock(long time, TimeUnit unit)。

    对于inputStream等资源,有些(实现了interruptibleChannel接口)可以通过close()方法将资源关闭,对应的阻塞也会被放开。

    但是,你可能正使用Java1.0之前就存在的传统的I/O,Thread.interrupt()将不起作用,因为线程将不会退出被阻塞状态。

    很幸运,对于 Socket 同步 I/O,Java平台为这种情形提供了一项解决方案,即调用阻塞该线程的套接字的close()方法。在这种情形下,如果线程被I/O操作阻塞,当调用该套接字的close方法时,该线程在调用accept地方法将接收到一个SocketException(SocketException为IOException的子异常)异常,这与使用interrupt()方法引起一个InterruptedException异常被抛出非常相似。

    java.nio类库提供了更加人性化的I/O中断,被阻塞的nio通道会自动地响应中断,不需要关闭底层资源;

    对于非标准的取消操作,我们可以一些方法来对它进行封装(如通过改写interrupt方法来将它封装在 Thread 中,通过 newTaskFor 等)。

    改写 interrupt 方法封装 Socket 非标准的取消方式

    下面的例子将通过改写 interrupt 方法将 Socket 非标准的取消方式封装在 Thread 中。封装之后,调用者可以通过调用 interrupt 方法来取消Socket 操作。

    public class ReaderThread extends Thread {
        private static final int BUFSZ = 512;
        private final Socket socket;
        private final InputStream in;
    
        public ReaderThread(Socket socket) throws IOException {
            this.socket = socket;
            this.in = socket.getInputStream();
        }
    
        @Override
        public void interrupt() {
            // 先关闭套接字,再调用 interrupt
            try {
                socket.close();
            } catch (IOException ignored) {
            } finally {
                super.interrupt();
            }
        }
    
        public void run() {
            try {
                byte[] buf = new byte[BUFSZ];
                while(true) {
                    int count = in.read(buf);
                    if(count < 0) {
                        break;
                    } else if (count > 0) {
                        processBuffer(buf, count);
                    }
                }
            } catch (IOException e) {
                /* Allow thread to exit */
                interrupt();
            }
        }
    
        public void processBuffer(byte[] buf, int count) {
        }
    
    }

    采用 newTaskFor 来封装非标准的取消

    我们可以通过 newTaskFor 方法来进一步优化 ReaderThread 中封装非标准取消的技术,这是 Java 6 在 ThreadPoolExecutor 中的新增功能。当把一个 Callable 提交给 ExecutorService 时,submit 方法会返回一个 Future,我们可以通过这个 Future 来取消任务。newTaskFor 是一个工厂方法,它将创建 Future 来代表任务。newTaskFor 还能返回一个 RunnableFuture 接口,该接口扩展了 Future 和 Runnable(并由 Future 实现)。

    通过定制表示任务的 Future 可以改变 Future.cancel 的行为。例如,定制的取消代码可以实现日志记录或者手机取消操作的统计消息,以及取消一些不响应中断的操作。通改写 interrupt 方法,ReaderThread 可以取消基于套接字的线程。同样,通过改写 cancel 方法也可以实现类似的功能。

    定义可取消任务

    接口

    import java.util.concurrent.Callable;
    import java.util.concurrent.RunnableFuture;
    
    public interface CancellableTask<T> extends Callable<T> {
    
        void cancel();
    
        RunnableFuture<T> newTask();
    
    }

    实现类

    import java.io.IOException;
    import java.net.Socket;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RunnableFuture;
    
    import com.johnfnash.learn.annotation.GuardedBy;
    
    public class SocketUsingTask<T> implements CancellableTask<T> {
        @GuardedBy("this")
        private Socket socket;
    
        public synchronized void setSocket(Socket s) {
            this.socket = s;
        }
    
        @Override
        public T call() throws Exception {
            // ......
            return null;
        }
    
        @Override
        public synchronized void cancel() {
            try {
                if(socket != null) {
                    socket.close();
                }
            } catch (IOException ignored) {
            }
        }
    
        // 改写 newTask 中返回的 RunnableFuture 的 cancel 方法,添加调用 CancellableTask 自身的 cancel 方法来关闭套接字
        @Override
        public RunnableFuture<T> newTask() {
            return new FutureTask<T>(this) {
                @SuppressWarnings("finally")
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    try {
                        SocketUsingTask.this.cancel(); 
                    } finally {
                        return super.cancel(mayInterruptIfRunning);
                    }               
                }
            };
        }
    
    }

    SocketUsingTask 实现了 CancellableTask,并定义了 Future.cancel 来关闭套接字和调用super.cancel。如果 SocketUsingTask 通过自己的 Future 来取消,那么底层的套接字将被关闭,并且线程将被中断。因此它提高了任务对取消操作的响应性:不仅能够在调用可中断方法的同时确保响应取消操作,而且还能调用可阻塞的套接字 I/O 方法。

    扩展线程池

    扩展线程池,改写 newTaskFor 方法,当传入的 Callable 任务为 CancellableTask类型时,直接使用自身的newTaskFor 方法来创建 RunnableFuture。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import com.johnfnash.learn.annotation.ThreadSafe;
    
    @ThreadSafe
    public class CancellationExecutor extends ThreadPoolExecutor {
    
        public CancellationExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            if(callable instanceof CancellableTask) {
                return ((CancellableTask<T>)callable).newTask();
            } else {
                return super.newTaskFor(callable);
            }       
        }
    
    }

    四、线程优雅退出

    一般情况下,线程退出可以使用while循环判断共享变量条件的方式,当线程内有阻塞操作时,可能导致线程无法运行到条件判断的地方而导致一直阻塞下去,这个时候就需要中断来帮助线程脱离阻塞。因此比较优雅的退出线程方式是结合共享变量和中断。

    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            /*
             * 在这里为一个循环,条件是判断线程的中断标志位是否中断
             */
            while (flag&&(!Thread.currentThread().isInterrupted())) {
                try {
                    Log.i("tag","线程运行中"+Thread.currentThread().getId());
                    // 每执行一次暂停40毫秒
                    //当sleep方法抛出InterruptedException  中断状态也会被清掉
                    Thread.sleep(40);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    //如果抛出异常则再次设置中断请求
                    Thread.currentThread().interrupt();
                }
            }
        }
    });
    thread.start();

    参考

    1.详细分析Java中断机制

    2.Thread的中断机制(interrupt)

    3.Java多线程之阻塞I/O如何中断

    4.《Java 编程思想》

    5.Java 理论与实践 - 处理 InterruptedException

    展开全文
  • Spring ThreadPoolTaskExecutor 结合CountDownLatch 实现线程阻塞业务场景,大家可能都会遇到,在遍历一个list的时候,需要对list中的每个对象,做一些复杂又耗时的操作,比如取出对象的uid,远程调用一次...

    Spring ThreadPoolTaskExecutor 结合CountDownLatch 实现线程阻塞

    • 业务场景,大家可能都会遇到,在遍历一个list的时候,需要对list中的每个对象,做一些复杂又耗时的操作,比如取出对象的uid,远程调用一次userservice的getUserByUid方法,这属于IO操作了,可怕的是遍历到每个对象时,都得执行一次这种RPC的IO操作(甚至不止一次,因为可能还有别的接口需要去调)还有复杂的业务逻辑需要cpu去计算。

    • java的thread类有join发法可让主线程阻塞直到子线程执行完毕,那么如何ThreadPoolTaskExecutor是否有功能呢。

    • 例子地址 https://github.com/csy512889371/learndemo/tree/master/ctoedu-ThreadPool-TaskExecutor

    更多干货



    spring.xml

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <!-- 核心线程数 -->
            <property name="corePoolSize" value="5"/>
            <!-- 最大线程数 -->
            <property name="maxPoolSize" value="10"/>
            <!-- 队列最大长度 >=mainExecutor.maxSize -->
            <property name="queueCapacity" value="25"/>
            <!-- 线程池维护线程所允许的空闲时间 -->
            <property name="keepAliveSeconds" value="3000"/>
            <!-- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃.  -->
            <property name="rejectedExecutionHandler">
                <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
            </property>
        </bean>
    

    ThreadRunnable

    @Component
    public class ThreadRunnable {
    
        @Autowired
        private TaskExecutor taskExecutor;
    
        public void executeThread(String result, CountDownLatch latch) {
            this.taskExecutor.execute(new TaskThread(result, latch));
        }
    
        private class TaskThread implements Runnable {
            private CountDownLatch latch;
            java.text.SimpleDateFormat dateTimeFormat = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            private String result;
    
            private TaskThread(String result, CountDownLatch latch) {
                super();
                this.result = result;
                this.latch = latch;
            }
    
            public void run() {
                try {
                    for (int i = 0; i < 10000; i++) {
                        // dateTimeFormat.format(new Date());
                    }
                    System.out.println("现在的时间为:" + dateTimeFormat.format(new Date()) + "    " + result);
    
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (this.latch != null) {
                        latch.countDown();
                    }
                }
            }
        }
    }

    ThreadRunnableTest

    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = "classpath*:spring.xml")
    public class ThreadRunnableTest extends AbstractJUnit4SpringContextTests {
    
        @Autowired
        ThreadRunnable threadRunnable;
    
        @Test
        public void test() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(10);
            for (int i = 0; i < 11; i++) {
                threadRunnable.executeThread("架构师成长之路", latch);
            }
            latch.await();
            System.out.println("执行完毕了吗!");
        }
    }
    展开全文
  • JAVA线程池解析什么是线程池线程池带来的好处线程池可选择的阻塞队列基于数组的有界阻塞队列基于链表的有界/无界阻塞队列同步移交阻塞队列 什么是线程池 线程池顾名思义就是事先创建若干个可执行的线程放入一个池...

    什么是线程池

    线程池顾名思义就是事先创建若干个可执行的线程放入一个池(容器)中,需要的时候就从池中获取线程不用自行创建,使用完毕不需要销毁线程而是放回池中,从而减少创建和销毁线程对象的开销

    线程池带来的好处

    1. 降低资源消耗
    2. 提高相应速度
    3. 提高线程的可管理性

    线程池可选择的阻塞队列

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    

    基于数组的有界阻塞队列

     @Test
        public void arrayBlockingQueue() throws InterruptedException {
            /**
             * 基于数组的有界阻塞队列,队列容量为10
             */
            ArrayBlockingQueue queue =
                    new ArrayBlockingQueue<Integer>(10);
    
            // 循环向队列添加元素
            for (int i = 0; i < 20; i++) {
                queue.put(i);
                System.out.println("向队列中添加值:" + i);
            }
        }
    

    基于链表的有界/无界阻塞队列

        @Test
        public void linkedBlockingQueue() throws InterruptedException {
            /**
             * 基于链表的有界/无界阻塞队列,队列容量为10
             */
            LinkedBlockingQueue queue =
                    new LinkedBlockingQueue<Integer>();
    
            // 循环向队列添加元素
            for (int i = 0; i < 20; i++) {
                queue.put(i);
                System.out.println("向队列中添加值:" + i);
            }
        }
    

    同步移交阻塞队列

    (直接提交) 如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。

    工作队列的默认选项是 SynchronousQueue,此策略可以 避免在处理可能具有内部依赖性的请求集时出现锁。

    该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。

    @Test
        public void test() throws InterruptedException {
            /**
             * 同步移交阻塞队列
             */
            SynchronousQueue queue = new SynchronousQueue<Integer>();
    
            // 插入值
            new Thread(() -> {
                try {
                    queue.put(1);
                    System.out.println("插入成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            // 删除值
    
            new Thread(() -> {
                try {
                    queue.take();
                    System.out.println("删除成功");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
    
            Thread.sleep(1000L * 60);
        }
    

    线程池可选择的饱和策略

    • AbortPolicy终止策略(默认)
    • DiscardPolicy抛弃策略
    • DiscardOldestPolicy抛弃旧任务策略
    • CallerRunsPolicy调用者运行策略

    线程池的执行示意图

    在这里插入图片描述

    1. 主线程执行execute,将任务提交到核心线程池,核心线程池进行处理或新增核心线程
    2. 如果核心线程池满了,则会将任务提交到阻塞队列里面,核心线程会轮询消费阻塞队列
    3. 阻塞队列也满了则会将任务提交到最大线程池里面,让非核心线程处理
    4. 非核心线程也增加到了最大线程池,则会引起饱和策略

    在这里插入图片描述

    常用的线程池

    常用的线程池之newCachedThreadPool

    /**
         * 线程数量无限的线程池(核心线程为0,最大线程数是无限大的值,默认的阻塞队列是同步移交策略,意味着有一个任务就有有一个线程去消费,然后去接受另一个任务,这个线程池会创建无数个线程最终系统崩溃)
         *
         * @return
         */
         
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        }
    

    常用的线程池之newFixedThreadPool

        /**
         * 线程数量固定线程池(线程个数虽然固定了,但是无界的队列是没有限制的,任务队列也是会把内存挤爆的)
         * @param nThreads
         * @return
         */
         
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        }
    

    常用的线程池之newSingleThreadExecutor

        /**
         * 单一线程池
         */
        public static ExecutorService newSingleThreadExecutor() {
            return new Executors.FinalizableDelegatedExecutorService
                    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
        }
    

    向线程池提交任务

    import org.junit.Test;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class RunTest {
    
        @Test
        public void submitTest()
                throws ExecutionException, InterruptedException {
    
            // 创建线程池
            ExecutorService threadPool =
                    Executors.newCachedThreadPool();
    
            /**
             * 利用submit方法提交任务,接收任务的返回结果
             */
            Future<Integer> future = threadPool.submit(() -> {
                Thread.sleep(1000L * 10);
    
                return 2 * 5;
            });
    
            /**
             * 阻塞方法,直到任务有返回值后,才向下执行
             */
            Integer num = future.get();
    
            System.out.println("执行结果:" + num);
        }
    
        @Test
        public void executeTest() throws InterruptedException {
            // 创建线程池
            ExecutorService threadPool =
                    Executors.newCachedThreadPool();
    
            /**
             * 利用execute方法提交任务,没有返回结果
             */
            threadPool.execute(() -> {
                try {
                    Thread.sleep(1000L * 10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                Integer num = 2 * 5;
                System.out.println("执行结果:" + num);
            });
    
    
    
            Thread.sleep(1000L * 1000);
        }
    
    }
    
    

    线程池的状态

    在这里插入图片描述

    线程池饱的四种饱和策略使用与代码调试

    定义线程池

    /**
         * 线程池
         */
        private static ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        // 核心线程数和最大线程数
                        2, 3,
    
                        // 线程空闲后的存活时间
                        60L, TimeUnit.SECONDS,
    
                        // 有界阻塞队列
                        new LinkedBlockingQueue<Runnable>(5)
               );
    

    重写线程执行

        /**
         * 任务
         */
        class Task implements Runnable {
            /**
             * 任务名称
             */
            private String taskName;
    
            public Task(String taskName) {
                this.taskName = taskName;
            }
    
            @Override
            public void run() {
                System.out.println("线程[ " + Thread.currentThread().getName()
                        + " ]正在执行[ " + this.taskName + " ]任务...");
    
                try {
                    Thread.sleep(1000L * 5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println("线程[ " + Thread.currentThread().getName()
                        + " ]已执行完[ " + this.taskName + " ]任务!!!");
            }
        }
        
    

    终止策略

    demo

        /**
         * 终止策略
         * TODO 抛出异常,拒绝任务提交
         */
        @Test
        public void abortPolicyTest() {
            // 设置饱和策略为 终止策略
            executor.setRejectedExecutionHandler(
                    new ThreadPoolExecutor.AbortPolicy());
    
            for (int i = 1; i <= 10; i++) {
                try {
                    // 提交10个线程任务
                    executor.execute(new Task("线程任务" + i));
                } catch (Exception e) {
                    System.err.println(e);
                }
            }
    
            // 关闭线程池
            executor.shutdown();
        }
        
    

    执行结果

    在这里插入图片描述

    结果说明(线程池的执行过程)

    1. 2个任务进入核心线程
    2. 第3个到第7个任务,会暂存到任务队列中,因为有界队列定义为5
    3. 第8个任务,会启动最大线程,去执行
    4. 第9个第10任务,没有线程可以去执行,被终止抛出

    抛弃策略

    demo

       /**
        * 抛弃策略
        * TODO 直接丢弃掉新提交的任务
        */
       @Test
       public void discardPolicyTest() {
           // 设置饱和策略为 抛弃策略
           executor.setRejectedExecutionHandler(
                   new ThreadPoolExecutor.DiscardPolicy());
    
           for (int i = 1; i <= 10; i++) {
               try {
                   // 提交10个线程任务
                   executor.execute(new Task("线程任务" + i));
               } catch (Exception e) {
                   System.err.println(e);
               }
           }
    
           // 关闭线程池
           executor.shutdown();
       }
    

    执行结果

    在这里插入图片描述

    结果说明

    线程任务第9与第10因为已满则被直接抛弃

    抛弃旧任务策略

    demo

       /**
        * 抛弃旧任务策略
        * TODO 丢弃掉任务队列中的旧任务,暂存新提交的任务
        */
       @Test
       public void discardOldestPolicyTest() {
           // 设置饱和策略为 抛弃旧任务策略
           executor.setRejectedExecutionHandler(
                   new ThreadPoolExecutor.DiscardOldestPolicy());
    
           for (int i = 1; i <= 10; i++) {
               try {
                   // 提交10个线程任务
                   executor.execute(new Task("线程任务" + i));
               } catch (Exception e) {
                   System.err.println(e);
               }
           }
    
           // 关闭线程池
           executor.shutdown();
       }
    

    执行结果

    在这里插入图片描述

    结果说明

    线程3与线程4进入优先进入队列等待,也是最先被抛弃

    调用者运行策略

    demo

       /**
        * 调用者运行策略
        * TODO 借用主线程来执行多余任务
        */
       @Test
       public void callerRunsPolicyTest() {
           // 设置饱和策略为 调用者运行策略
           executor.setRejectedExecutionHandler(
                   new ThreadPoolExecutor.CallerRunsPolicy());
    
           for (int i = 1; i <= 10; i++) {
               try {
                   // 提交10个线程任务
                   executor.execute(new Task("线程任务" + i));
               } catch (Exception e) {
                   System.err.println(e);
               }
           }
    
           // 关闭线程池
           executor.shutdown();
       }
    

    执行结果

    在这里插入图片描述

    结果说明

    线程任务9与10因为满了,则让主程执行,这就是调用者运行策略

    展开全文
  • 线程阻塞和唤醒(转载)

    千次阅读 2012-12-08 11:11:19
    函数将解锁mutex参数指向的互斥锁,并使当前线程阻塞在cv参数指向的条件变量上。 被阻塞的线程可以被pthread_cond_signal函数,pthread_cond_broadcast函数唤醒,也可能在被信号中断后被唤醒。 pthread_...
  • 线程调度策略

    千次阅读 2016-08-02 11:18:21
    对于一个嵌入式多任务、多线程操作系统,所启动的应用进程至少拥有一个线程或多个线程线程在进程中执行代码。一个进程能够“同时”运行多个线程,“同时”加上引号,因为实际上,在单处理CPU平台上,任何时刻,...
  • 线程安全策略 不可变对象 不可变对象(Immutable Objects)是指对象一旦被创建它的状态(对象的数据,也即对象属性值)就不能改变,任何对它的改变都应该产生一个新的对象。 不可变对象需要满足的条件: 对象...
  • Java线程安全策略

    2016-04-23 16:51:32
    不可变 final 事实不可变 如String 无状态 没有实例域 如Servlet volatile 运算结果不依赖当前变量值 不参与不变性约束 如AQS的state 线程封闭 线程栈内私有,方法中局部变量...含有Blocking的阻塞类,如LinkedBlockin
  • -- 线程池对拒绝任务(无线程可用)的处理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,调用者的线程会执行该任务,如果执行器已关闭,则丢弃. --> <property name="rejectedExecutionHandler"> <bean ...
  • 线程下载策略分析

    千次阅读 2012-03-04 02:17:05
    线程下载控制策略: 一、 由一个控制线程负责创建下载子任务,每个子任务由一个线程来完成。即一个控制线程不断创建线程来完成下载任务。 优点:控制线程可以直接控制线程数量,控制线程超时情况。 缺点:线程...
  • ThReadPoolExecutor作为java.util.concurrent包对外提供基础实现, 以内部线程池的形式对外提供管理任务执行, 线程调度, 线程池管理等等服务. Executors方法提供的线程服务, 都是通过参数设置来实现不同的线程池机制....
  • 手动创建线程池以及线程拒绝策略

    万次阅读 2019-10-14 20:31:16
    由于之前说使用Exectors类创建线程池时候,会造成OOM,建议手动创建...corePoolSize:核心线程数,当线程数未达到核心线程时,新的任务会创建新的线程,即使有线程空闲也会创建新的线程。 maximumPoolSize:线...
  • 浅析Java的线程调度策略

    千次阅读 2019-03-13 23:52:34
    作者:杨兴强 原文来源:开点工作室(ID:kaidiancs) ...它按照什么样的策略来调度Java线程?本文将带着这样的问题,探讨Java线程的调度机制。 程序的问题还是先从代码说起吧,下面是一个广泛...
  • Java多线程的调度策略

    万次阅读 2014-11-14 20:22:51
    在Java多线程环境中,为保证所有线程的执行能按照一定的规则执行,JVM实现了一个线程调度器,它定义了线程调度的策略,对于CPU运算的分配都进行了规定,按照这些特定的机制为多个线程分配CPU的使用权。这小节关注...
  • 什么是阻塞策略呢?通过前面的学习,我们知道guava-retrying是可以设置2次重试的时间间隔的。比如第一次失败后,需要等待200ms再开始第二次尝试,也就是说线程需要等待200ms。实现200ms等待有多种方式,比如通过...
  • linux线程调度策略(转)

    千次阅读 2016-06-28 17:38:27
    对于一个嵌入式多任务、多线程操作系统,所启动的应用进程至少拥有一个线程或多个线程线程在进程中执行代码。一个进程能够“同时”运行多个线程,“同时”加上引号,因为实际上,在单处理CPU平台上,任何时刻,...
  • 线程调度器和调度策略

    千次阅读 2018-10-23 12:34:10
    线程调度器(Thread Scheduler):  操作系统的核心,它实际上就是一个常驻内存的程序,不断地对线程队列进行扫描, 利用特定算法(时间片轮转法、优先级调度法、多级反馈队列...线程调度策略(Thread scheduling p...
  • 线程的调度策略1

    千次阅读 2011-07-29 16:37:39
    函数pthread_attr_setschedpolicy和pthread_attr_getschedpolicy分别用来设置和得到线程的调度策略。 名称:: pthread_attr_getschedpolicy pthread_attr_se
  • Java多线程-任务拒绝策略

    千次阅读 2021-04-07 19:14:25
    本文通过解读 java 官方英文注解及源代码 以帮助读者了解 Java多线程开发中的 任务拒绝策略
  • 【JAVA多线程19】JAVA 阻塞队列原理

    千次阅读 2019-06-13 00:39:08
     使用非阻塞队列的时候有一个很大问题就是:它不会对当前线程产生阻塞,那么在面对类似消费者-生产者的模型时,就必须额外地实现同步策略以及线程间唤醒策略,这个实现起来就非常麻烦。但是有了阻塞队列就不一样了...
  • Windows线程优先级提升策略

    千次阅读 2010-07-11 20:30:00
    <br />Windows线程优先级提升策略   Windows实现了一个基于优先级的抢先式多处理及调度系统。通常线程可在任何可用处理机上运行,但可限制某线程只能在某处理机上运行。要了解线程优先级的变化,...
  • 使用线程池比手动创建线程好在哪里? 手工创建线程,每一个任务都创建线程问题: 第一点,反复创建线程系统开销比较大,每个线程创建和销毁都需要时间,如果任务比较简单,那么就有可能导致创建和销毁线程消耗的...
  • Java多线程-阻塞队列BlockingQueue

    千次阅读 2011-09-06 16:00:51
    前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了...
  • Doug Lea在设计AQS的线程阻塞策略使用了自旋等待和挂起两种方式,通过挂起线程前的低频自旋保证了AQS阻塞线程上下文切换开销及CUP时间片占用的最优化选择。保证在等待时间短通过自旋去占有锁而不需要挂起,而在等待...
  • 该队列每个插入操作必须等待另一个线程进行相应的操作,本身不存储数据,只有当前一个线程删除时,后一个线程才能被删除。 2 ArrayBlokingQueue 有界阻塞队列: 遵循FIFO,队满进行插入时被阻塞,队空取也会阻塞,...
  • 本篇将详细介绍BlockingQueue,以下是涉及的主要内容: ...阻塞队列的成员的概要介绍 详细介绍DelayQueue、ArrayBlockingQueue、LinkedBlockingQueue的原理 线程池与BlockingQueue 1、...
  • 线程——运行和阻塞状态详解

    千次阅读 2019-06-28 16:14:54
    所有现代的桌面和服务器操作系统都采用抢占式调度策略,但一些小型设备如手机等可能采用协作式调度策略,在这样的系统中,只有当一个线程调用了它的sleep()或yield()方法后才会放弃其所占用的资源——也就是必须有...
  • 工作中有个任务需要写多线程记录一下登陆日志,...我很常规的设置了这些参数,核心线程数:5 最大线程数:Integer.MaxValue 存活时间:60s 阻塞队列:synBlockinQueue() 饱和策略重写的 后来code review的时候发现阻塞...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 135,952
精华内容 54,380
关键字:

线程阻塞策略