-
2022-01-02 11:08:56
引言
我们都知道Runable接口,它非常简单。但是它有一个问题——无法获取执行结果,以及一旦可以获取执行结果,什么时候可以获取执行结果?
public interface Runnable { public abstract void run(); }
FutureTask就是为了解决这个问题的,将Runnable包装为FutureTask之后,就可以get获取任务执行结果,如果任务没有执行完,那么当前线程就会阻塞。
FutureTask确实非常好用,但我一直以来都比较好奇,将Runnable包装为FutureTask,为何就能实现执行结果的自动获取?或者换句话说,FutureTask的原理究竟是什么?概述
我们先来概述一下整体的结构以及整体的设计。
FutureTask实现了RunnaleFuture接口,而后者则是继承自两个接口Future和Runnable,相当于两个接口的混合(接口支持多继承)。
事实上,Future接口也就定义了FutureTask类特性的交互协议。同样,它也非常简单。那么如何实现阻塞等待执行结果呢?public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
考虑到多线程的环境——可能会有多个线程同时获取结果,一起阻塞,所以很自然地,我们需要将这些线程阻塞等待在一个列表之上,当任务执行完成之后,唤醒这些线程获取结果。
这也是FutureTask整体的设计思想,接下来,我们通过源码解析,探究它如何通过CAS的方式实现任务阻塞等待,以及维护等待队列。源码解析
回调
Runnable是不返回结果的,所以首先,他会将Runnalbe执行和它关联的执行结果Result包装为带返回结果的Callable。这些都非常简单。
public interface Callable<V> { V call() throws Exception; } static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } // FutureTask构造函数,将二者包装为callable public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
状态
FutureTask中设置了多种状态变量,用于标志任务执行的状态。理解源代码,首先需要理解这些状态的变化协议。
state有四种可能的状态转换:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
其中
- NEW为初始状态,任务执行过程也为该状态。
- COMPLETING 为中间状态,表示任务已经执行完,有线程正在调用set设置结果
- INTERRUPTING 为中间状态,表示正在中断中
- NORMAL,EXCEPTIONAL,CANCELLED,INTERRUPTED则为终结状态,分别表示正常结束,异常结束,任务取消,被中断
源码注释
有了上述知识,FutureTask的源码应该算比较简单,这里提供了核心代码的注释,从以下两个方面切入即可:
- run,任务执行,执行完调用set,set会调用finishCompletion唤醒所有等待线程
- get,获取结果,如果没有完成则阻塞当前线程插入队列
其次,FutureTask广泛使用了CAS,例如:
UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())
compareAndSwapObject(对象,字段偏移量,期待的值,新的值),它会判断对象中的字段是否为期待值,如果是,则设置为新的值,并且返回true,否则返回false。注意CAS底层为native实现的原子方法。
public class FutureTask<V> implements RunnableFuture<V> { /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; /** * 初始化回调函数和状态 */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * 执行任务,CAS修改runner持有执行线程,保证只有一个线程能够执行任务 */ public void run() { // 状态不为NEW或者当前runner持有线程,则直接返回。只有一个线程能执行run方法 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行方法 result = c.call(); ran = true; } catch (Throwable ex) { // 执行失败,设置异常 result = null; ran = false; setException(ex); } // 设置执行结果 if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * 获取结果,如果仍未执行完成,则阻塞 */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 阻塞当前线程 s = awaitDone(false, 0L); return report(s); } /** * 等待任务执行完成,阻塞等待线程 */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果已经标志中断信号,则移除等待列表,抛出InterruptedException if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果大于COMPLETING,表明task执行完成,直接返回结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果等于COMPLETING,说明已经有线程正在执行set方法,让出执行权限 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果状态为NEW(小于COMPLETING),则通过两次循环入队等待 else if (q == null) // 封装当前线程为等待节点 q = new WaitNode(); else if (!queued) // q.next = waiters,CAS修改waiters,之后queued=true,当前线程循环不会再进入 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果设置了等待时间,则判断超过等待时间则返回,否则阻塞当前线程(设置阻塞时间) else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 如果没有设置等待时间,则直接阻塞 else LockSupport.park(this); } } /** * 该方法为任务执行结束的回调,可以覆盖,CompletionService中的嵌套类就覆盖了它,执行完插入阻塞队列 */ protected void done() { } /** * 执行完设置结果,通过CAS的方式设置,只有状态为NEW才允许设置,只有一个能执行成功,保证执行一次 */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; // CAS设置状态 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 执行收尾方法 finishCompletion(); } } /** * 同上,设置异常结果 */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } /** * 收尾方法,移除并唤醒所有等待节点,调用done回调函数 */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { // 进入if之后CAS将等待列表头部节点置为null,保证只唤醒一次 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; // 唤醒当前节点上的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } // next WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 执行回调 done(); callable = null; // to reduce footprint } /** * 如果状态为NEW,尝试取消 * 1. 不为NEW表示已经处于完成、中断、取消状态,这些状态不能被取消,直接返回false * 2. 参数mayInterruptIfRunning表示是否取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。 */ public boolean cancel(boolean mayInterruptIfRunning) { // 如果当前状态为NEW则CAS修改为中断中或取消 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 状态不为NEW或CAS失败,不为NEW表示已经处于完成、中断、取消状态,这些状态不能被取消,直接返回false return false; try { // in case call to interrupt throws exception // 如果允许运行中取消 if (mayInterruptIfRunning) { try { // 中断当前线程的阻塞 Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改最终状态 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 调用收尾函数 finishCompletion(); } return true; } /** * 等待节点链表,阻塞等待的线程会被封装为一个等待节点放入队列 */ static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } // 设置内存偏移量 private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } } }
参考
更多相关内容 -
futuretask用法及使用场景介绍
2020-08-29 04:28:39主要介绍了futuretask用法及使用场景介绍,小编觉得挺不错的,这里分享给大家,供大家参考。 -
Java线程池FutureTask实现原理详解
2020-08-28 01:57:17主要介绍了Java线程池FutureTask实现原理详解,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下 -
Java FutureTask类使用案例解析
2020-08-19 07:03:21主要介绍了Java FutureTask类使用案例解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值析,需要的朋友可以参考下 -
比较java中Future与FutureTask之间的关系
2020-08-26 22:21:42在本篇文章里我们给大家分享了java中Future与FutureTask之间的关系的内容,有需要的朋友们可以跟着学习下。 -
FutureTask:FutureTask原始解析与重组-源码解析
2021-03-24 16:56:10FutureTask原始码解析 一,FutureTask是什么? FutureTask是可取消的异步的计算任务,它可以通过线程池和线程对象执行,一般来说是FutureTask用于耗时的计算。 二,FutureTask继承图 三,未来任务源码 FutureTask的... -
java多线程编程同步器Future和FutureTask解析及代码示例
2020-08-28 19:23:21主要介绍了java多线程编程同步器Future和FutureTask解析及代码示例,对二者进行了详细介绍,分析了future的源码,最后展示了相关实例代码,具有一定参考价值 ,需要的朋友可以了解下。 -
【并发编程】 — Runnable、Callable、Future和FutureTask之间的关系
2021-01-21 16:52:422 如何使用FutureTask 、Future、Callable、线程池实现线程2.1 FutureTask + Callable实现多线程2.2 线程池+Future+Callable 实现多线程3 Runnable、Callable、Future和FutureTask之间的关系3.1 整体关系介绍3.2 ... -
Java中Future、FutureTask原理以及与线程池的搭配使用
2020-08-25 14:41:01主要为大家详细介绍了Java中Future、FutureTask原理以及与线程池的搭配使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下 -
Java中的Runnable,Callable,Future,FutureTask的比较
2020-08-31 09:00:46主要介绍了Java中的Runnable,Callable,Future,FutureTask的比较的相关资料,需要的朋友可以参考下 -
FutureTask
2021-01-04 16:35:35FutureTask实现了Future接口,FutureTask提供了启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果的一些常用方法。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务...简介
在Executors框架体系中,FutureTask用来表示表示可获取结果的异步任务。FutureTask实现了Future接口,FutureTask提供了启动和取消异步任务,查询异步任务是否计算结束以及获取最终的异步任务的结果的一些常用方法。通过get()方法来获取异步任务的结果,但是会阻塞当前线程直至异步任务执行结束。一旦任务执行结束,任务不能重启或取消,除非调用runAndReset()方法。在FutureTask的源码中为其定义了这些状态;
FutureTask.run()方法的执行的时机,FutureTask分为了3种状态:- 未启动:FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,还没有执行FutureTask.run()方法之前,FutureTask处于未启动状态。
- 已启动:FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
- 已完成:FutureTask.run()方法执行结束,或调用FutureTask.cancel(…)方法取消任务,或者在执行任务期间抛出异常,这些情况都称之为FutureTask的已完成状态。
由于FutureTask具有这三种状态,因此执行FutureTask的get方法和cancel方法,当前处于不同的状态对应的结果也是大不相同。这里对get方法和cancel方法做个总结: - get方法
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()将导致调用线程阻塞。如果FutureTask处于已完成状态,调用FutureTsk.get()方法将导致调用线程立即返回结果或者抛出异常。 - cannel方法
当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将此任务永远不会执行;
当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断线程的方式来阻止任务继续进行,如果执行FutureTask.cancel(false)将不会对正在执行任务的线程有任何影响;
当FutureTask处于已完成状态时,执行FutureTask.cancel(…)方法将返回false。
对Future的get()方法和cannel()方法用下图进行总结
FutureTask的基本使用
FutureTask除了实现Future接口外,还实现了Runnable接口。因此FutureTask可以交给Executor执行,也可以由调用的线程直接执行(FutureTask.run())。另外,FutureTask的获取也可以通过ExecutorService.submit()方法返回一个FutureTask对象,然后在通过FutureTask.get()或者FutureTask.cancel()方法。
应用场景:
当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask。假设有多个线程执行若干任务,每个任务最多只能被执行一次,当多个线程试图执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完后才能继续执行。 -
futuretask源码分析(推荐)
2020-08-29 04:26:43主要介绍了futuretask源码分析(推荐),小编觉得还是挺不错的,这里给大家分享下,供各位参考。 -
java多线程返回值使用示例(callable与futuretask)
2020-09-04 12:37:36主要介绍了多线程返回值使用示例(callable与futuretask),需要的朋友可以参考下 -
FutureTask源码解析
2022-01-19 19:36:41FutureTask继承体系3. 源码分析3.1 成员变量3.2 构造方法3.3. 成员方法3.3.1 run()方法及与其相关的方法3.3.2 get()方法及与其相关的方法3.3.3 cancel(boolean mayInterruptIfRunning)方法4. 总结 1. 前言 在创建...目录
1. 前言
在创建线程的方式中,我们可以直接继承Thread和实现Callable接口来创建线程,但是这两种创建线程的方式不能返回执行的结果。于是从JDK1.5开始提供了Callable接口和Future接口,这两种创建线程的方式可以在执行完任务之后返回执行结果。
Future模式可以让线程异步获得执行的结果,而不用等到线程等到run()方法里面执行完再执行之后的逻辑。
这里解释一下什么是异步:
A口为主线程,B口为辅助线程,当B口阻塞时并不影响A口通水。
我们写个小demo证明一下Future执行线程是异步的。
// Future是一个接口,FutureTask是Future的实现类 package FutureTask; import java.util.concurrent.*; public class Demo02 { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { long start = System.currentTimeMillis(); FutureTask<Integer> futureTask1 = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { // 睡了2000毫秒 Thread.sleep(2000); return 1; } }); FutureTask<Integer> futureTask2 = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { // 睡了1000毫秒 Thread.sleep(1000); return 2; } }); // 这个线程要睡2000毫秒 new Thread(futureTask1).start(); // 这个线程要睡1000毫秒 new Thread(futureTask2).start(); Integer s1 = futureTask1.get(); Integer s2 = futureTask2.get(); Integer s3 = s1 + s2; long end = System.currentTimeMillis(); System.out.println("结果:" + s3); // 输出总共执行的时间 System.out.println("花费时间:" + (end - start) + "毫秒"); } } /* 如果Future不是异步的,那最后输出的时间一定大于3000毫秒,那我们接下来来看一下执行的结果。 */
结果如下:
我们可以看到,花费的时间小于3000毫秒,这也说明利用Future创建出的辅助线程与主线程是异步执行的。Future是一个接口,如果想要使用Future的功能,必须实现接口。我们接下来要介绍的FutureTask就是Future的实现类
2. FutureTask继承体系
我们看到FutureTask实现了RunnableFuture接口,其中RunnableFuture接口继承了Runnable和Future接口,因此我们可以说FutureTask实现了Future接口和Runnable接口,但是我们需要注意的是,FutureTask也实现了Callable接口,虽然这并没有在继承体系中看出,但是可以从FutureTask的源码中看出。接下来我们来看看FutureTask、RunnableFuture的继承体系部分的源码。
- FutureTask继承体系
// <V>指的是泛型,里面传入要返回的结果的类型 public class FutureTask<V> implements RunnableFuture<V>
- RunnableFuture继承体系
// <V>指的是泛型,里面传入的是要返回的结果的类型 public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
# FutureTask使用场景 FutureTask用于异步执行或取消任务的场景。通过传入Runnable的实现类或Callable的实现类给FutureTask对象, 之后将FutureTask对象传给Thread对象或线程池。可以通过FutureTask对象.get()方法可以异步返回执行结果, 如果任务还没有执行完,则调用get()方法的线程会陷入阻塞,直到任务执行完。无论有多少个线程调用get()方法, FutureTask中的run()逻辑只会执行一次。我们还可以调用FutureTask对象的cancel()方法取消掉当前任务。
3. 源码分析
3.1 成员变量
state表示当前任务的状态:
- NEW:表示新创建状态,任务尚未执行。
- COMPLETING:表示当前任务即将结束,但是还未完全结束,返回值还未写入,处于一种临界状态。
- NORMAL:表示当前任务处于正常结束状态(没有发生异常,中断,取消)。
- EXCEPTIONAL:表示当前任务因为出现异常而中断,处于非正常结束状态。
- CANCELLED:表示当前任务因调用cancel而处于被取消状态。
- INTERRUPTING:表示当前任务处于中断中,但是还没有完全中断的阶段。
- INTERRUPTED:表示当前任务处于已完全中断的阶段。
// 表示当前任务的状态 private volatile int state; // 表示当前任务的状态是新创建的,尚未执行 private static final int NEW = 0; // 表示当前任务即将结束,还未完全结束,值还未写,一种临界状态 private static final int COMPLETING = 1; // 表示当前任务正常结束 private static final int NORMAL = 2; // 表示当前任务执行过程中出现了异常,内部封装的callable.call()向上抛出异常了 private static final int EXCEPTIONAL = 3; // 表示当前任务被取消 private static final int CANCELLED = 4; // 表示当前任务中断中 private static final int INTERRUPTING = 5; // 表示当前任务已中断 private static final int INTERRUPTED = 6; // 我们在使用FutureTask对象的时候,会传入一个Callable实现类或Runnable实现类,这个callable存储的就是 // 传入的Callable实现类或Runnable实现类(Runnable会被使用修饰者设计模式伪装为)callable //submit(callable/runnable):其中runnable使用了装饰者设计模式伪装成了callable private Callable<V> callable; // 正常情况下,outcome保存的是任务的返回结果 // 不正常情况下,outcome保存的是任务抛出的异常 private Object outcome; // non-volatile, protected by state reads/writes // 保存的是当前任务执行期间,执行任务的线程的引用 private volatile Thread runner; // 因为会有很多线程去get结果,这里把线程封装成WaitNode,一种数据结构:栈,头插头取 private volatile WaitNode waiters; static final class WaitNode { // 线程对象 volatile Thread thread; // 下一个WaitNode结点 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
3.2 构造方法
构造方法有两种,一种是只传入一个Callable对象,Callable对象返回的结果就是FutureTask对象返回的结果;另一种是传入一个Runnable对象和一个泛型变量,其中Runnable对象会被伪装成Runnable对象、传入的泛型变量就是FutureTask执行任务后返回的结果。
// 这个构造方法传入一个callable,调用get返回的结果就是callable返回的结果 public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // 设置状态为新创建 this.state = NEW; // ensure visibility of callable } // 这个构造方法传入一个runnable,一个result变量,调用get方法返回的结果就是传入的result变量 public FutureTask(Runnable runnable, V result) { // 装饰者模式,将runnable转化为callable接口 this.callable = Executors.callable(runnable, result); // 设置状态为新创建 this.state = NEW; // ensure visibility of callable }
3.3. 成员方法
3.3.1 run()方法及与其相关的方法
run()里面执行的是当前任务的具体逻辑。里面涉及了setException方法、set方法、handlePossibleCancellationInterrupt方法、finishCompletion方法,这些方法我们都会讲到。
run()方法
里面会调用Callable对象的run()方法任务的具体逻辑和一些关于任务状态、返回结果的逻辑。
public void run() { // 当前任务状态不为new或者runner旧值不为null,说明已经启动过了,直接返回,这里也说明了run()里面的具体逻辑只会 // 被执行一次。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 只有当任务状态为new并且runner旧值为null才会执行到这里 try { // 传入的callable任务 Callable<V> c = callable; // 当任务不为null并且当前任务状态为新建时才会往下执行 // 条件1:防止空指针异常 // 条件2:防止外部线程cacle掉当前任务 if (c != null && state == NEW) { // 储存任务的返回结果 V result; // 储存执行是否成功 boolean ran; try { // 调用callable.run()并返回结果 result = c.call(); // 正常执行设置ran为true ran = true; } catch (Throwable ex) { // callable的run()方法抛出了异常 // 设置结果为null result = null; // 执行失败设置ran为false ran = false; // 内部设置outcome为抛出的异常, //并且更新任务状态为EXCEPTIONAL(执行过程中出现了异常)并且唤醒阻塞的线程 setException(ex); } // 如果执行成功(正常执行) if (ran) // 内部设置outcome为callable执行的结果,并且更新任务的状态为NORMAL(任务正常执行)并且唤醒阻塞的线程 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() // 将当前任务的线程设置为null runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts // 当前任务的状态 int s = state; // 如果state>=INTERRUPTING,说明当前任务处于中断中或已中断状态 if (s >= INTERRUPTING) // 如果当前任务处于中,则执行这个方法线程会不断让出cpu直到任务处于已中断状态 handlePossibleCancellationInterrupt(s); } }
setException(Throwable t)方法
如果在执行run()方法的过程中出现了异常会执行这个方法,里面具体的逻辑是:
- 设置任务的状态为EXCEPTIONAL(表示因为出现异常而非正常完成)
- 设置outcome(返回结果)为Callable对象的run()方法抛出的异常
- 执行finishCompletion()方法唤醒因为调用get()方法而陷入阻塞的线程。
protected void setException(Throwable t) { // 如果当前任务的状态是新建状态,则设置任务状态为临界状态(即将完成) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置outcome(结果)为callable.run()抛出的异常 outcome = t; // 设置当前任务的状态为出现中断异常 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state // 唤醒调用get()的所有等待的线程并清空栈 finishCompletion(); } }
set(V v)方法
如果执行run()方法正常结束(没有出现异常)会执行这个方法,里面的具体逻辑是:
- 设置任务的状态为NORMAL(表示正常结束)
- 设置outcome(返回结果)为Callable对象调用run()方法返回的结果
- 唤醒因为调用get()方法而陷入阻塞的线程。
protected void set(V v) { // 如果当前任务的状态为新建状态,则设置当前任务的状态为临界状态(即将完成) if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 设置outcome(结果)为callable.run()返回的结果 outcome = v; // 设置当前任务的状态为正常结束 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 唤醒调用get()的所有等待的线程并清空栈 finishCompletion(); } }
handlePossibleCancellationInterrupt(int s)方法
这个方法在run()方法里可能会执行,当任务的状态为中断中时,抢到cpu的线程会释放cpu资源,直到任务状态更新为已中断状态。
private void handlePossibleCancellationInterrupt(int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
finishCompletion()方法
任务执行完成(正常结束和非正常结束都代表任务执行完成)会调用这个方法来唤醒所有因调用get()方法而陷入阻塞的线程。
private void finishCompletion() { // assert state > COMPLETING; // 如果条件成立,说明当前有陷入阻塞的线程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { // 获取当前节点封装的thread Thread t = q.thread; // 防止空指针异常 if (t != null) { // 设置q.thread为null,方便GC q.thread = null; // 唤醒当前节点封装的线程 LockSupport.unpark(t); } // 获取下一个WaitNode节点 WaitNode next = q.next; // 如果next为空,说明栈现在已经为空,调用get()陷入阻塞的线程已经全部唤醒,直接break if (next == null) break; // 执行到这里说明还有因调用get()而陷入阻塞的线程,自旋接着唤醒 // 这里q.next设置为null帮助GC(垃圾回收) q.next = null; // unlink to help gc // 下一个WaitNode节点 q = next; } // 中断 break; } } // 空方法,子类可以重写 done(); // 将callable设置为null,方便GC callable = null; // to reduce footprint }
3.3.2 get()方法及与其相关的方法
get()方法
get()方法获取的是任务执行完后返回的结果。对于空参的get()方法来说,如果任务还没有执行完就有线程调用get()方法获取结果,则该线程会陷入阻塞,阻塞的具体方法是awaitDone方法,我们下面会学习。
public V get() throws InterruptedException, ExecutionException { int s = state; // 判断当前任务的状态是否小于COMPLETING,如果成立说明当前任务的状态要么为新建状态要么为临界状态 if (s <= COMPLETING) // 条件成立会调用awaitDone方法自旋等待直到任务完成 s = awaitDone(false, 0L); return report(s); }
对于指定时间含参的get方法来说,如果在指定时间内没有返回结果,则会抛出时间超时异常(TimeoutException)
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
awaitDone(boolean timed, long nanos)方法
这个方法是用来阻塞所有因调用get()方法获取结果但是FutureTask任务还没有执行完的线程。awaitDone方法在**get()**方法里面会被调用。
上面的话有点绕,我们再理一下,awaitDone用来阻塞线程时需要满足的条件:
- 任务还没有执行完
- 线程调用了**get()**方法
// 这个方法的作用是等待任务被完成(正常完成或出现异常完成都算完成),被中断,或是被超时 private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 这个WaitNode其实存储的是当前线程 WaitNode q = null; // 表示当前线程代表的WaitNode对象是否入栈成功 boolean queued = false; for (;;) { // 如果当前线程出现中断异常,则将该线程代表的WaitNode结点移出栈并抛出中断异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 获取当前任务的状态 int s = state; // 如果当前任务状态大于COMPLETING,说明当前任务已经有结果了(任务完成、中断、取消),直接返回任务状态 if (s > COMPLETING) { if (q != null) // 设置q.thread为null,方便GC q.thread = null; return s; } // 当前任务处于临界状态,即将完成,则当前线程释放cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次自旋,如果当前WitNode为null,new一个WaitNode结点 else if (q == null) q = new WaitNode(); // 第二次自旋,如果当前WaitNode节点没有入队,则尝试入队 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 第三次自旋,到这里表示是否定义了超时时间 else if (timed) { nanos = deadline - System.nanoTime(); // 超出了指定时间,就移除当前节点并返回任务状态 if (nanos <= 0L) { removeWaiter(q); return state; } // 未超出时间,挂起当前线程一定时间 LockSupport.parkNanos(this, nanos); } else // 挂起当前线程,该线程会休眠(什么时候该线程会继续执行呢?除非有其他线程调用unpark()或者中断该线程) LockSupport.park(this); } }
removeWaiter方法
出现中断时,清空栈中的结点。
private void removeWaiter(WaitNode node) { if (node != null) { // 帮助GC node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; // 后继节点不为空 if (q.thread != null) pred = q; // 前驱结点不为空 else if (pred != null) { // 前驱结点的后继结点指向当前结点的后继结点,就相当于将当前节点删去 pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
report(int s)方法
这个方法是真正用来获取任务的返回结果的,这个方法在**get()**方法里面会被调用,如果该方法被调用,说明任务已经执行完了。
private V report(int s) throws ExecutionException { // 获取outcome的值 Object x = outcome; // 如果当前任务的状态为正常结束,则返回outcome的值 if (s == NORMAL) return (V)x; // 如果当前任务的状态 >= CANCELLED 说明当前任务状态为被取消、或是在中断中、或是已经中断完成 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
3.3.3 cancel(boolean mayInterruptIfRunning)方法
这个方法可以取消当前任务。
public boolean cancel(boolean mayInterruptIfRunning) { // 条件1成立说明当前任务正在运行中或者处于线程池队列中 // 条件2成立说明CAS成功可以执行下面的逻辑了,否则返回false,代表cancel失败 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { // 获取当前正在执行的线程,也有可能是null,是null的情况代表当前任务正在队列中,线程还没有获取到它呢 Thread t = runner; // 给Thread一个中断信号,如果你的程序是响应中断的,则走中断逻辑;如果你的程序不是响应中断的,则什么也不做 if (t != null) t.interrupt(); } finally { // final state // 设置任务状态为已中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 唤醒所有因调用get()而陷入阻塞的线程 finishCompletion(); } return true; }
4. 总结
- FutureTask采用的是异步的执行方式。
- FutureTask对象可以使用get()方法返回执行的结果,如果任务还没有执行完,则调用get()的线程会陷入阻塞,直到任务执行完。
- FutureTask对象可以调用cancel方法取消任务的执行。
-
Java Review - 线程池使用FutureTask的小坑
2021-11-21 00:08:53线程池使用FutureTask时如果把拒绝策略设置为 DiscardPolicy和 DiscardOldestPolicy,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。 问题复现 import java.util.concurrent.*...
概述
先说结论
线程池使用
FutureTask
时如果把拒绝策略设置为DiscardPolicy
和DiscardOldestPolicy
,并且在被拒绝的任务的Future对象上调用了无参get方法,那么调用线程会一直被阻塞。
问题复现
import java.util.concurrent.*; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/11/21 0:11 * @mark: show me the code , change the world */ public class FutureTest { // 1 线程池单个线程,队列大小为1 - 初始化线程池 private final static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.DiscardPolicy()); public static void main(String[] args) throws ExecutionException, InterruptedException { // 2 添加你任务1 Future futureOne = tpe.submit(() -> { System.out.println("开始处理业务1"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务1执行结束"); return "Result1"; }); // 3 添加你任务2 Future futureTwo = tpe.submit(() -> { System.out.println("开始处理业务2"); System.out.println("业务2执行结束"); return "Result2"; }); // 4 添加任务3 Future futureThree = null; try { futureThree = tpe.submit(() -> System.out.println("开始处理业务3")); } catch (Exception e) { System.out.print(e.getLocalizedMessage()); } // 5 等待任务1执行完毕 System.out.println("任务1返回结果: " + futureOne.get()); // 6 等待任务2执行完毕 System.out.println("任务2返回结果: " + futureTwo.get()); // 7 等待任务3执行完毕 System.out.println("任务3返回结果: " + futureThree==null?null:futureThree.get()); //关闭线程池,阻塞知道所有任务执行完毕 tpe.shutdown(); } }
输出
让我们来分析下
-
代码(1)创建了一个单线程和一个队列元素个数为1的线程池,并且把拒绝策略设置为 DiscardPolicy。
-
代码(2)向线程池提交了一个异步任务one,并且这个任务会由唯一的线程来执行,任务在打印【开始处理业务1】 后会阻塞该线程2s。
-
代码(3)向线程池提交了一个异步任务two,这时候会把任务two放入阻塞队列。
-
代码(4)向线程池提交任务three,由于队列已满所以触发拒绝策略丢弃任务three。
-
从执行结果看,在任务one阻塞的5s内,主线程执行到了代码(5)并等待任务one执行完毕,当任务one执行完毕后代码(5)返回,主线程打印出【任务1 null】。任务one执行完成后线程池的唯一线程会去队列里面取出任务two并执行,所以输出【开始处理业务2】,然后代码(6)返回,这时候主线程输出【任务2 null】。然后执行代码(7)等待任务three执行完毕。
-
从执行结果看,代码(7)会一直阻塞而不会返回,至此问题产生。如果把拒绝策略修改为DiscardOldestPolicy,也会存在有一个任务的get方法一直阻塞,只是现在是任务two被阻塞。
-
但是如果把拒绝策略设置为默认的AbortPolicy则会正常返回,并且会输出如下结果
开始处理业务1 Task java.util.concurrent.FutureTask@27bc2616 rejected from java.util.concurrent.ThreadPoolExecutor@3941a79c[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]业务1执行结束 任务1返回结果: Result1 开始处理业务2 业务2执行结束 任务2返回结果: Result2 Exception in thread "main" java.lang.NullPointerException at com.artisan.bfzm.chapter11.FutureTest.main(FutureTest.java:58)
源码分析
要分析这个问题,需要看线程池的submit方法都做了什么,submit方法的代码如下
/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 1 装饰Runnable对象为Future对象 RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); // 6 返回Future对象 return ftask; }
看下 execute方法
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 2 如果线程个数小于核心线程数量,则新增线程处理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 3. 如果线程都在工作且当前线程个数已经达到核心线程数,就把任务放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 4 尝试新增处理线程 else if (!addWorker(command, false)) // 5 新增失败则触发拒绝策略 reject(command); }
-
代码(1)装饰Runnable为FutureTask对象,然后调用线程池的execute方法。
-
代码(2)判断如果线程个数小于核心线程数则新增处理线程。
-
代码(3)判断如果当前线程个数已经达到核心线程数则将任务放入队列 。
-
代码(4)尝试新增处理线程。失败则执行代码(5),否则直接使用新线程处理。
-
代码(5)执行具体拒绝策略,从这里也可以看出,使用业务线程执行拒绝策略。
所以要找到上面例子中问题所在,只需要看代码(5)对被拒绝任务的影响,这里先看下拒绝策略DiscardPolicy的代码。
/** * A handler for rejected tasks that silently discards the * rejected task. */ public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
拒绝策略的rejectedExecution方法什么都没做,代码(4)调用submit后会返回一个Future对象。
Future是有状态的,Future的状态枚举值如下
在代码(1)中使用newTaskFor方法将Runnable任务转换为FutureTask,protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
继续
/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
而在FutureTask的构造函数里面设置的状态就是NEW。
所以使用DiscardPolicy策略提交后返回了一个状态为NEW的Future对象。
那么我们下面就需要看下当调用Future的无参get方法时Future变为什么状态才会返回,那就要看下FutureTask的get()方法代码。
public V get() throws InterruptedException, ExecutionException { int s = state; //当前状态值 <= COMPLETING需要等待,否则调用report返回 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
/** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; // 状态值为NORMAL的时候正常返回 if (s == NORMAL) return (V)x; // 状态值大于等于CANCELLED的时候抛出异常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
-
也就是说,当Future的状态>COMPLETING时调用get方法才会返回,而明显DiscardPolicy策略在拒绝元素时并没有设置该Future的状态,后面也没有其他机会可以设置该Future的状态,所以Future的状态一直是NEW,所以一直不会返回。
-
同理,DiscardOldestPolicy策略也存在这样的问题,最老的任务被淘汰时没有设置被淘汰任务对应Future的状态。
-
那么默认的AbortPolicy策略为啥没问题呢?其实在执行AbortPolicy策略时,代码(5)会直接抛出RejectedExecutionException异常,也就是submit方法并没有返回Future对象,这时候futureThree是null。
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { handler.rejectedExecution(command, this); }
解决办法
-
所以当使用Future时,尽量使用带超时时间的get方法,这样即使使用了DiscardPolicy拒绝策略也不至于一直等待,超时时间到了就会自动返回。
-
如果非要使用不带参数的get方法则可以重写DiscardPolicy的拒绝策略,在执行策略时设置该Future的状态大于COMPLETING即可。但是我们查看FutureTask提供的方法,会发现只有cancel方法是public的,并且可以设置FutureTask的状态大于COMPLETING,则重写拒绝策略的具体代码如下。
import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; /** * @author 小工匠 * @version 1.0 * @description: TODO * @date 2021/11/21 1:40 * @mark: show me the code , change the world */ public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown()){ if (null != r && r instanceof FutureTask) { ((FutureTask) r).cancel(true); } } } }
使用这个策略时,由于在cancel的任务上调用get()方法会抛出异常,所以代码(7)需要使用try-catch块捕获异常,因此将代码(7)修改为如下所示。
执行结果为当然这相比正常情况多了一个异常捕获操作。最好的情况是,重写拒绝策略时设置FutureTask的状态为NORMAL,但是这需要重写FutureTask方法,因为FutureTask并没有提供接口让我们设置。
小结
通过案例介绍了在线程池中使用FutureTask时,当拒绝策略为DiscardPolicy和DiscardOldestPolicy时,在被拒绝的任务的FutureTask对象上调用get()方法会导致调用线程一直阻塞,所以在日常开发中尽量使用带超时参数的get方法以避免线程一直阻塞。
-
-
FutureTask简单用法,为何单个任务仅执行一次?
2022-01-15 00:23:06且一个FutureTask仅执行一次,不会出现重复的查询 经过权衡,我们选择了后者 一、FutureTask用法 解决方案要用到线程池搭配FutureTask,这里我们就不用了,简化点 public class Test { //计算结果 int count=0;... -
什么是 Callable? 什么又是 FutureTask
2022-02-05 18:28:38目录 前言 Callable FutureTask 如何创建线程? FutureTask.run() FutureTask.get() 等待线程队列 WaitNode waiters 模拟几个执行流程 怎么唤醒被阻塞的线程呢? 总结 首先梳理一下程序执行的流程 思考 为什么使用 ... -
spring线程池ThreadPoolExecutor配置以及FutureTask的使用
2014-12-14 16:26:14最代码,http://www.zuidaima.com/share/1724478138158080.htm 的代码及例子 -
Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解
2020-08-18 17:10:22主要介绍了Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 -
线程处理(FutureTask+Excutors)包
2019-02-18 11:30:06这是一个集齐了runnable与callnable的线程处理包,自动集齐全部功能,只需引用即可 -
FutureTask 原理剖析
2020-10-28 21:40:00戳蓝字「TopCoder」关注我们哦!编者注:FutureTask用于在异步操作场景中,FutureTask作为生产者(执行FutureTask的线程)和消费者(获取FutureTask... -
多线程 | FutureTask 执行流程
2022-04-27 16:36:29本文章来介绍使用 FutureTask 创建线程,以及其流程。 Thread 和 Runnable的问题 众所周知,使用 Thread、Runnable 创建线程是非常方便的,只要实现 线程的 run 方法即可。但是通过 Thread、Runnable 实现 run ... -
FutureTask详解
2019-08-20 22:25:35Future接口和实现Future接口的FutureTask类,代表异步计算的结果。 FutureTask简介 FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行... -
浅谈Java多线程之FutureTask
2021-11-08 21:51:42Runnable和Callable是多线程中的两个任务接口,实现接口的类将拥有多线程的功能,FutureTask类与这两个类是息息相关! FutureTask继承体系 看下这张图,原来FutureTask类实现了Runnable和Future,既然是... -
FutureTask使用
2021-02-25 19:09:51FutureTask futureTask = new FutureTask(new Callable() { @Override public String call() throws Exception { System.out.println(Thread.currentThread().getName() + ":" + "开始烧开水..."); // 模拟烧开水... -
Future、FutureTask、CompletableFuture简介
2022-03-06 16:06:40Future、FutureTask、CompletableFuture简介