精华内容
下载资源
问答
  • 文章目录现象结论相关代码查看堆栈:查看submitCallThreadPoolExecutor#execute最终调用了 RunnableFuture#run方法从代码层面判断 futureTask.get超时只影响了业务线程(调用futureTask.get的线程),不影响工作线程...

    1. 现象

    线上发短信、邮箱验证码 的时候超时

    在这里插入图片描述

    2. 结论

    1. SocketInputStream.socketRead0导致线程阻塞,阻塞后占用了线程池的线程。多次阻塞后最终占用了全部的core线程。新提交的任务只能入队,没有线程来处理。
      由于 socket.read占用了corePoolSize 个 线程池的工作线程worker.thread , 这里一共有10个,全都阻塞了。

      而execute提交一个runnable的时候, 在达到corePoolSize后, 会将其放入workQueue中。直到workQueue满。

      新的任务只能入队(enQueue),不能被消费。

      所以 futureTask.get 一直超时。

    2. futureTask.get(timeout,timeunit)不会导致线程池的工作线程异常。工作线程会继续执行。

    3. 相关代码

    sendVcWorkerThreadPool是ThreadPoolExecutor的子类WorkerThreadPool
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    WorkerThreadPool
    这里workQueue本身是一个优先队列,这里会无限扩容
    ps:由于无限扩容, 这里maxinumPoolSize是无效的

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    先查找相关调用链的底层日志,发现根本没有调用底层方法。

    而其他应用能调用到该底层,一直在输出日志,说明是execute本身有问题。

    4. 查看堆栈:

    搜索AsyncWorker(ps:自定义的线程池一定要重命名,找问题的时候方便),发现该线程池的10个core线程都处于runnable,且所有线程都在

    sendEmail

    在这里插入图片描述

    5. 查看submitCall

    虽然future.get有超时,但是这只能保证业务线程不阻塞。
    future.get并不能打断线程池的线程。
    在这里插入图片描述
    这里的sendVcWorkerThreadPool#submitCall与ThreadPoolExecutor
    ThreadPoolExecutor#submit类似
    在这里插入图片描述
    由于继承ThreadPoolExecutor,所以调用了ThreadPoolExecutor的execute
    在这里插入图片描述

    5.1 ThreadPoolExecutor#execute最终调用了 RunnableFuture#run方法

    • 调用链
      addWorker()->w.start()->treahd.run()->Worker.runWorker(Worker w)->task.run();

    • task即RunnableFuture ,newTaskFor创建了子类FutureTask

    因此 查看FutureTask的run方法

    • FutureTask是对Callable的一层封装。

    • 超时只影响了业务线程(调用futureTask.get的线程),不影响工作线程。

    5.2 从代码层面判断 futureTask.get超时只影响了业务线程(调用futureTask.get的线程),不影响工作线程。

    FutureTask.run
    在这里插入图片描述

    运行完毕,设置结果
    此时可以使用future.get出结果
    在这里插入图片描述

    这里让【因为future.get,调用park方法使得等待】的线程 恢复。
    在这里插入图片描述

    5.3 future.get

    在这里插入图片描述

    死循环检测是否完成, 超时后,直接return 当前state

    Unsafe.park()本地方法休眠当前线程, HotSpot在Linux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。
    在这里插入图片描述
    休眠
    在这里插入图片描述

    检测的时候,先将当前线程添加到waitNode

    在这里插入图片描述

    5.3.1 测试future.get并不能打断线程池的线程。

    在这里插入图片描述

    6. 查看工作线程为何阻塞

    虽然已经证明了futureTask.get超时后不会打断线程池的worker.thread,还是需要查看工作线程为何阻塞。
    再回顾一下堆栈

    在这里插入图片描述
    execute的调用链是addWorker()->w.start()->treahd.run()->Worker.runWorker(Worker w)->task.run();
    而FutureTask是对Callable的一层封装。

    本身是SendEmailCall本身是一个Callable

    我们只需要查看SendEmailCall的call方法为何一直在运行。

    在这里插入图片描述
    transport.connect
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    阻塞到readLine

    在这里插入图片描述
    到这里就很明显了, 是socket的inputStream调用read的时候阻塞。

    在这里插入图片描述

    socket是可以设置timeout的。

    查找timeout的设置
    在这里插入图片描述
    getSocket
    在这里插入图片描述

    to为read超时时间,
    cto为连接超时时间
    如果不设置则都为永久

    在这里插入图片描述

    而创建的时候,没有设置timeout
    到此就明白了, 设置timeout即可。
    在这里插入图片描述

    6.1 修复

    props.put("mail.smtp.connectiontimeout", "3000");
    props.put("mail.smtp.timeout", "3000");
    

    在这里插入图片描述
    在这里插入图片描述

    debug测试
    在这里插入图片描述

    修改后

    在这里插入图片描述

    修改为超短时间 会报错。
    在这里插入图片描述

    7. 相关资料

    线程池中的线程何时死亡?
    SocketInputStream.socketRead0引起线程池提交任务后,futureTask.get超时
    socket连接代理socketRead0(Native Method) 线程阻塞处理

    展开全文
  • FutureTask.get(timeOut)执行原理浅析

    千次阅读 2020-08-23 17:54:48
    使用java多线程解决问题的时候,为了提高效率,我们常常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTaskFutureTask类实现了Runnable, Future接口,接下来...

    使用java多线程解决问题的时候,为了提高效率,我们常常会异步处理一些计算任务并在最后异步的获取计算结果,这个过程的实现离不开Future接口及其实现类FutureTask。FutureTask类实现了Runnable, Future接口,接下来我会通过源码对该类的实现进行详解。

    我们先看下FutureTask中的主要方法如下,可以看出FutureTask实现了任务及异步结果的集合功能。看到这块的方法,大家肯定会有疑问,Runnable任务的run方法返回空,FutureTask如何依靠该方法获取线程异步执行结果

    //以下五个方法实现接口Future中方法
    public boolean isCancelled(); 
    public boolean isDone(); 
    public boolean cancel();
    public V get() throws InterruptedException, ExecutionException;
    public V get(long timeout, TimeUnit unit);
    //实现接口Runnable中方法
    public void run();

    我们在使用中会构造一个FutureTask对象,然后将FutureTask扔到另一个线程中执行,而主线程继续执行其他业务逻辑,一段时间后主线程调用FutureTask的get方法获取执行结果。

    
    public class FutureTaskTest {
    	private static ExecutorService executorService = Executors.newFixedThreadPool(1);
    	public static void main(String []args) {
    		Callable callable = new AccCallable(2, 3);
    		FutureTask futureTask = new FutureTask(callable);
    		executorService.execute(futureTask);
    		System.out.println("go to do other things in main thread");
    		try {
    			Thread.sleep(1000);
    		}
    		catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.println("go back in main thread");
    		try {
    			int result = (int) futureTask.get();
    			System.out.println("result is " + result);
    		}
    		catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		catch (ExecutionException e) {
    			e.printStackTrace();
    		}
    	}
    	static class AccCallable implements Callable<Integer> {
    		private int a;
    		private int b;
    		public AccCallable(int a, int b) {
    			this.a = a;
    			this.b = b;
    		}
    		@Override
    		public Integer call() throws Exception {
    			System.out.println("acc a and b in threadId = " + Thread.currentThread().getName());
    			return a + b;
    		}
    	}
    }

    在分析实现前,我们先想下如果让我们实现一个类似FutureTask的功能,我们会如何做?因为需要获取执行结果,需要一个Object对象来存执行结果。任务执行时间不可控性,我们需要一个变量表示执行状态。其他线程会调用get方法获取结果,在没达到超时的时候需要将线程阻塞或挂起。

    因此需要一个队列类似的结构存储等待该结果的线程信息,这样在任务执行线程完成后就可以唤醒这些阻塞或挂起的线程,得到结果。FutureTask的实际实现也是类似的逻辑,具体如下

    //futureTask执行状态
    private volatile int state;
    //具体的执行任务,会在run方法中抵用callable.call()
    private Callable<V> callable;
    //执行结果
    private Object outcome; 
    //获取结果的等待线程节点
    private volatile WaitNode waiters;
    /**
    * Possible state transitions:
    * NEW -> COMPLETING -> NORMAL
    * NEW -> COMPLETING -> EXCEPTIONAL
    * NEW -> CANCELLED
    * NEW -> INTERRUPTING -> INTERRUPTED
    */
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
    public FutureTask(Callable<V> callable) {
    	if (callable == null)
    	throw new NullPointerException();
    	this.callable = callable;
    	this.state = NEW;
    	// ensure visibility of callable
    }
    public FutureTask(Runnable runnable, V result) {
    	//构造函数传入runnable对象时调用静态工具类Executors的方法转换为一个callable对象
    	this.callable = Executors.callable(runnable, result);
    	this.state = NEW;
    	// ensure visibility of callable
    }

    如前所述,FutureTask的执行线程中会调用其run()方法执行任务,我们看下这块逻辑

    public void run() {
    	//1.如果执行状态不是NEW或者有其他线程执行该任务,直接返回
    	if (state != NEW ||
    	!UNSAFE.compareAndSwapObject(this, runnerOffset,
    	null, Thread.currentThread()))
    	return;
    	try {
    		Callable<V> c = callable;
    		//2.如果执行状态是NEW,即任务还没执行,直接调用callable.call()方法获取执行结果
    		if (c != null && state == NEW) {
    			V result;
    			Boolean ran;
    			try {
    				result = c.call();
    				ran = true;
    			}
    			catch (Throwable ex) {
    				result = null;
    				ran = false;
    				//3.发生异常,更新status为EXCEPTIONAL,唤醒挂起线程
    				setException(ex);
    			}
    			//4.如果结果成功返回,调用set方法将设置outcome,更改status执行状态,唤醒挂起线程
    			if (ran)
    			set(result);
    		}
    	}
    	finally {
    		// runner must be non-null until state is settled to
    		// prevent concurrent calls to run()
    		runner = null;
    		// state must be re-read after nulling runner to prevent
    		// leaked interrupts
    		int s = state;
    		if (s >= INTERRUPTING)
    		handlePossibleCancellationInterrupt(s);
    	}
    }
    protected void set(V v) {
    	//将执行状态变更为COMPLETING
    	if (UNSAFE.compareAndSwapint(this, stateOffset, NEW, COMPLETING)) {
    		//设置执行结果
    		outcome = v;
    		//设置执行状态为NORMAL
    		UNSAFE.putOrderedint(this, stateOffset, NORMAL);
    		// final state
    		//执行完成后处理操作,具体就是遍历阻塞链表,删除链表节点,并唤醒每个节点关联的线程
    		finishCompletion();
    	}
    }

    以上就是任务执行线程做的逻辑,以上逻辑也回答了FutureTask如何得到执行结果的疑问。下面我们看下用户调用get方法获取执行结果时的实现逻辑,这个时候FutureTask可能处理各种状态,即可能没有执行,执行中,已完成,发生异常等

    public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    	if (unit == null)
    	throw new NullPointerException();
    	int s = state;
    	//执行状态是NEW或者COMPLETING时执行awaitDone将线程加入等待队列中并挂起线程
    	if (s <= COMPLETING &&
    	(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    	throw new TimeoutException();
    	//根据执行状态status进行结果封装
    	return report(s);
    }
    //我理解这块是get的核心逻辑
    private int awaitDone(Boolean timed, long nanos)
    throws InterruptedException {
    	//如果设置了超时时间,计算还有多长时间超时
    	final long deadline = timed ? System.nanoTime() + nanos : 0L;
    	WaitNode q = null;
    	Boolean queued = false;
    	for (;;) {
    		//如果当前线程被中断,删除等待队列中的节点,并抛出异常
    		if (Thread.interrupted()) {
    			removeWaiter(q);
    			throw new InterruptedException();
    		}
    		int s = state;
    		//如果执行状态已经完成或者发生异常,直接跳出自旋返回
    		if (s > COMPLETING) {
    			if (q != null)
    			q.thread = null;
    			return s;
    		}
    		//如果执行状态是正在执行,说明线程已经被加入到等待队列中,放弃cpu进入下次循环(真正的自旋) else if (s == COMPLETING) // cannot time out yet
    		Thread.yield();
    		//第一次进入循环,创建节点 else if (q == null)
    		q = new WaitNode();
    		//将节点加入到等待队列中,waiters相当于头阶段,不断将头结点更新为新节点 else if (!queued)
    		queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
    		q.next = waiters, q); else if (timed) {
    			//如果设置了超时时间,在进行下次循环前查看是否已经超时,如果超时删除该节点进行返回
    			nanos = deadline - System.nanoTime();
    			if (nanos <= 0L) {
    				removeWaiter(q);
    				return state;
    			}
    			//挂起当前节点
    			LockSupport.parkNanos(this, nanos);
    		} else
    		LockSupport.park(this);
    	}
    }

    这里需要说明一点,FutureTask中的阻塞队列新加入的节点都在头结点并且next指向之前的头结点,waitars指针总是指向新加入节点,通过waitars可以遍历整个等待队列,具体截图如下。此外等待队列节点结构很简单成员变量只有线程引用和next指针,这里再列出器接口

     

    展开全文
  • Callable、FutureTask中阻塞超时返回的坑点

     

    本文转载自:http://www.cnblogs.com/starcrm/p/5010863.html

    案例1:

    package com.net.thread.future;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
     
    /**
     * @author 
     * @Time:2017年8月18日 上午10:49:07
     * @version 1.0
     * @description
     */ 
      import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.Executors;
     import java.util.concurrent.FutureTask;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.TimeoutException;
      
     public class FutureTaskExample {
      
         public static void main(String[] args) {
             MyCallable callable1 = new MyCallable(1000);
             MyCallable callable2 = new MyCallable(5000);
      
             FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
             FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
      
             ExecutorService executor = Executors.newFixedThreadPool(2);
             executor.execute(futureTask1);
             executor.execute(futureTask2);
              
             while (true) 
             {
                 try {
                     if(futureTask1.isDone() && futureTask2.isDone()){
                         System.out.println("Done");
                         //shut down executor service
                         executor.shutdown();
                         return;
                     }
                      
                     if(!futureTask1.isDone()){
                     //阻塞futureTask1
                     System.out.println("FutureTask1 output="+futureTask1.get());
                     }
                     
                     if(!futureTask2.isDone()){
                     //阻塞futureTask2
                     System.out.println("FutureTask2 output="+futureTask2.get(1000,TimeUnit.MILLISECONDS));
                     }
    
                 } catch (InterruptedException | ExecutionException e) {
                     e.printStackTrace();
                 }catch(Exception e){
                     //do nothing
                 }
             }
              
         }
         
         static class MyCallable implements Callable<String> {
          
             private long waitTime;
              
             public MyCallable(int timeInMillis){
                 this.waitTime=timeInMillis;
             }
             @Override
             public String call() throws Exception {
                 Thread.sleep(waitTime);
                 return Thread.currentThread().getName();
             }
          
         }
     }

    运行结果很简单,必须是:

    FutureTask1 output=pool-1-thread-1
    FutureTask2 output=pool-1-thread-2
    Done

     

    案例2:

    package com.net.thread.future;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
     
    /**
     * @author 
     * @Time:2017年8月18日 上午10:49:07
     * @version 1.0
     * @description
     */
    public class FutureTaskExample2 {
     
        public static void main(String[] args) {
            MyCallable callable1 = new MyCallable(1000);
            MyCallable callable2 = new MyCallable(5000);
     
            FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
            FutureTask<String> futureTask2 = new FutureTask<String>(callable2);
     
            ExecutorService executor = Executors.newFixedThreadPool(2);
            executor.execute(futureTask1);
            executor.execute(futureTask2);
             
            while (true) 
            {
                 try {
                     if(futureTask1.isDone() && futureTask2.isDone()){
                         System.out.println("Done");
                         //shut down executor service
                         executor.shutdown();
                         return;
                     }
                      
                     if(!futureTask1.isDone()){
                     //阻塞futureTask1
                     System.out.println("FutureTask1 output="+futureTask1.get());
                     }
                      
                     System.out.println("Waiting for FutureTask2 to complete");
                     String s = futureTask2.get(1000, TimeUnit.MILLISECONDS); //阻塞500毫秒
                     if(s !=null){
                         System.out.println("FutureTask2 output="+s);
                     }
                     else{
                         System.out.println("FutureTask2 output is null");
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     e.printStackTrace();
                 }catch(Exception e){
                     //do nothing
                 }  
            }
        }
        
        
        static class MyCallable implements Callable<String> {
         
            private long waitTime;
             
            public MyCallable(int timeInMillis){
                this.waitTime=timeInMillis;
            }
            @Override
            public String call() throws Exception {
                Thread.sleep(waitTime);
                return Thread.currentThread().getName();
            }
         
        }
     
    }

    运行结果:

    FutureTask1 output=pool-1-thread-1
    Waiting for FutureTask2 to complete
    Waiting for FutureTask2 to complete
    Waiting for FutureTask2 to complete
    Waiting for FutureTask2 to complete
    FutureTask2 output=pool-1-thread-2
    Done

     

     

    说明:

    1、get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    2、get(long timeout, TimeUnit unit)用来获取执行结果,如果超过指定时间,直接结束执行下面的代码;如果是在循环中,则跳出本次循环进行下一次轮训(continue功能类似)。

     

    展开全文
  • FutureTaskget(timeout) 的超时是怎么玩的? 昨天晚上在一个交流群里一位群友提出了一个问题,他想实现一种客户端功能,可以让客户端调用其他接口的时候,如果超时,就返回 null。这个问题好处理,直接使用 ...

    FutureTask 中 get(timeout) 的超时是怎么玩的?

    昨天晚上在一个交流群里一位群友提出了一个问题,他想实现一种客户端功能,可以让客户端调用其他接口的时候,如果超时,就返回 null。这个问题好处理,直接使用 Future 即可,即这个方法:

    public interface Future<V> {
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    但是这位群友又提出了一个有意思的问题:如果使用线程池的话,达到最大核心线程数了,需要进入队列等待,超时时间的起始时间应该是在执行的时候算的吧?
    首先关于线程池的执行流程,这个没毛病,问题的关键是 Future#get 是从啥时候开始算超时的呢,比如现在这个任务 01:00:00 被丢到了线程池中执行,但是 01:00:05 才开始执行这个任务,那算获取任务执行结果是否超时的时候总得有个开始时间吧,那么开始时间是从 01:00:00(即丢到线程池的时候)开始算还是从 01:00:05(任务具体开始执行的时候)开始算呢?这个问题其实不难想,但是还是有点偏,突然被问到的时候还是有点愣的感觉。
    接下来先看一个例子:

    package dongguabai.demo.juc.future.demo;
    
    import java.util.Date;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @author Dongguabai
     * @Description
     * @Date 创建于 2021-01-07 00:35
     */
    public class FutureTaskDemo {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService service = Executors.newFixedThreadPool(1);
            final Future<Object> submit = service.submit(new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    System.out.println(new Date().toLocaleString() + "--开始执行");
                    Thread.sleep(5000);
                    System.out.println(new Date().toLocaleString() + "--执行完成");
                    return "OK";
                }
            });
            Thread.sleep(4000);
            try {
                final Object o = submit.get(2, TimeUnit.SECONDS);
                System.out.println();
            } catch (TimeoutException e) {
                System.out.println(new Date().toLocaleString() + "--超时了");
            }
            service.shutdown();
        }
    }
    

    运行结果:

    2021-1-7 0:39:21--开始执行
    2021-1-7 0:39:26--执行完成
    

    可以发现超时时间既不是从任务被丢到线程池的时候开始算也不是从任务具体开始执行的时候开始算,而是啥时候 get 就从啥时候开始算,换句话说,这里的超时是对调用线程来说的,与执行任务的线程没啥关系(这其实是一个很容易被忽视的一点,就是调用方觉得失败了,但是这个任务是有可能执行成功的)。因为如果从任务丢到线程池的时候开始算,中途 main 线程已经休眠了 4s 了,早该超时了,如果是从任务具体开始执行的时候开始算,这个任务要执行至少 5s,如果要超时,也早就超时了。
    接下来就简单分析一下 java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 这个方法:

        /**
         * @throws CancellationException {@inheritDoc}
         */
        public V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (unit == null)
                throw new NullPointerException();
            int s = state;
            if (s <= COMPLETING &&
                (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
                throw new TimeoutException();
            return report(s);
        }
    

    一开始会判断 state 状态是否小于等于 COMPLETING。回过头看一下 java.util.concurrent.AbstractExecutorService#submit(java.util.concurrent.Callable<T>) 方法:

        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    

    可以发现传入的 Callable 被封装成了 FutureTask

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;       // ensure visibility of callable
        }
    

    此时 FutureTaskstateNEWFutureTask 有这么几种状态:

        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    

    这里看一下 NEWCOMPLETING,从名字都可以看出 NEW 是一个初始状态,此时任务还没有开始执行。接着看下 java.util.concurrent.FutureTask#run 方法,这个也是具体执行任务的方法:

        public void run() {
            if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                             null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    本质上执行的是 FutureTaskCallablecall 方法,获得执行结果,执行过程中会可能发生异常也可能顺利执行,执行成功则会执行 set(result) 方法保存结果,出现异常会执行 setException(ex) 保存异常:

        protected void set(V v) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
        protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    

    可以发现 COMPLETING 状态就是一个中间状态,处于任务执行完成或出现异常,但是还未将结果保存起来的过程中,正常执行完成且结果保存后状态为 NORMAL ,执行失败异常保存后状态为 EXCEPTIONAL
    再回到 java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit) 方法,判断 s 是否小于等于 COMPLETING 其实就是在判断任务是否执行完成或者出现异常了。
    再看 java.util.concurrent.FutureTask#awaitDone 方法:

        private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            WaitNode q = null;
            boolean queued = false;
            for (;;) {
       			//线程是否被中断
                if (Thread.interrupted()) {
                    //第一次 q == null
                    removeWaiter(q);
                    throw new InterruptedException();
                }
    
                int s = state;
                //如果任务已经完成
                if (s > COMPLETING) {
                    if (q != null)
                        //去除引用
                        q.thread = null;
                    return s;
                }
                //如果处于中间态,稍等一下
                else if (s == COMPLETING) // cannot time out yet
                    Thread.yield();
                else if (q == null)
                    //第一次循环 q == null,new 一个 WaitNode,创建一个头节点
                    q = new WaitNode();
                else if (!queued)
                    //第二次就是把 q 设置为 waiters,即 waiters = q = new WaitNode()
                    queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                         q.next = waiters, q);
                else if (timed) {
                    
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        //如果时间到了,直接返回 state,还没有执行完成或者抛出异常 state 则仍然为 COMPLETING
                        removeWaiter(q);
                        return state;
                    }
                    //时间还没到,就再阻塞一会
                    LockSupport.parkNanos(this, nanos);
                }
                else
                    LockSupport.park(this);
            }
        }
    

    其实到这里群友提的问题就已经有答案了,超时时间是从 get 的时候开始算的,啥时候 get 啥时候开始算。
    最后看一下 WaitNode,主要是用来存储因为 get 被挂起的线程:

        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }
    

    setsetException 方法可以看出最终都会调用 finishCompletion 方法:

        private void finishCompletion() {
            // assert state > COMPLETING;
            for (WaitNode q; (q = waiters) != null;) {
                if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                    for (;;) {
                        Thread t = q.thread;
                        if (t != null) {
                            q.thread = null;
                            LockSupport.unpark(t);
                        }
                        WaitNode next = q.next;
                        if (next == null)
                            break;
                        q.next = null; // unlink to help gc
                        q = next;
                    }
                    break;
                }
            }
    
            done();
    
            callable = null;        // to reduce footprint
        }
    

    主要就是用来唤醒等待执行结果的线程。

    欢迎关注公众号
    ​​​
    在这里插入图片描述

    展开全文
  • 同样可以用future.get(),不设置执行超时时间取得结果 future.cancel(true); } catch (InterruptedException e) { future.cancel(true); throw e; } catch (ExecutionException e) { future....
  • 2 future1.get发生了超时,此时至少已经等待了4秒了。但是future2.get是可以正常返回的,说明超时时间是call方法中执行的时间。   另外的小发现,try 块中如果第一句发生了异常,那么try 块中剩余语句均不执行...
  • try { Integer result = future.get(5, TimeUnit.MILLISECONDS); System.out.println("任务正常," + result); } catch (InterruptedException | ExecutionException e) { System.out.println("aaaa"); }catch ...
  • java超时关闭线程,用future.get() 实现

    万次阅读 2019-04-18 19:20:46
    这个我研究了两天,好多都是用的future.get(超时时间,单位)来实现,只是不同方式而已。下边来看下: 第一种: //先新建一个 ExecutorService ExecutorService exec = Executors.newSingleThreadExecutor(); //...
  • System.out.println("FutureTask2 output="+futureTask2.get(1000 ,TimeUnit.MILLISECONDS)); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } catch (Exception e){ ...
  • 说明了一件事,即在超时期限内,如果未能获取线程返回值,futureTask2.get(500L, TimeUnit.MILLISECONDS) 将不对继续执行后面的代码,而是进行下一次的while操作了(并不是返回null),while的下一次循环,直到获取...
  • 场景描述:一个程序如果超过5秒还未执行完成,希望调用别的程序实现该功能 解决方法:用户ExecutorService+FutureTask实现超过一定时间后,自动执行下面程序优点:对程序的执行时间进行控制、...FutureTask.get(int,S

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 8,564
精华内容 3,425
关键字:

futuretask.get超时