精华内容
下载资源
问答
  • JAVA手写FutureTask,带你了解FutrueTask的基本原理。

    JAVA手写FutureTask,带你了解FutrueTask的基本原理

    1、写一个callable的接口,里面放入具体的业务代码
    代码

    package springboot.myrunable;
    
    /**
     * @author liuhongya328
     *
     */
    public interface MyCallable<V> {
    
    	V call() throws Exception;
    
    }
    
    

    2、手写自己的FutureTask,代码很少,注释很详细
    代码

    package springboot.myrunable;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.LockSupport;
    
    /**
     * 手写自己的FutureTask,只能实现其核心原理,真实jdk中有很多异常及特殊状态的判断。
     * 
     */
    public class MyFutureTask<T> implements Runnable {
    
    	// 包装了业务代码逻辑,让线程去执行
    	MyCallable<T> myCallable;
    
    	T result;
    
    	// 任务执行状态,用来判断run方法的结果已经执行完毕
    	volatile String state = "NEW";
    
    	// 当前正在等待任务执行完毕的线程队列,实际上这个队列里面只会有一个thread
    	LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    
    	public MyFutureTask(MyCallable<T> myCallable) {
    		this.myCallable = myCallable;
    	}
    
    	// futuretask的核心就是run方法执行callable中的call方法
    	@Override
    	public void run() {
    		try {
    			result = myCallable.call();
    		} catch (Exception e) {
    			// state = "EXCEPTION"; //异常状态,因状态太多就不一一例举
    			e.printStackTrace();
    		} finally {
    			state = "END";
    		}
    
    		// 任务执行完毕,则通知停车场可以放行。告诉外部线程可以获取返回值。
    		Thread waiter = waiters.poll();
    		while (waiter != null) {
    			LockSupport.unpark(waiter);
    			waiter = waiters.poll();
    		}
    	}
    
    	// 获取结果
    	// 线程是异步执行的,外部线程调用get的时候,上面的run方法还没结束,则等待结果
    	public T get() {
    		if ("END".equals(state)) {
    			return result;
    		} // else()....
    
    		// 没有执行完毕,则等待, park/unpark ,控制线程的停和走。
    		// 将外部线程加入到等待线程队列
    		waiters.add(Thread.currentThread());
    		while (!"END".equals(state)) {
    			// 进入停车场
    			// park 方法还可以在其他任何时间"毫无理由"地返回,因此通常必须在重新检查返回条件的循环里调用此方法。从这个意义上说,park
    			// 是“忙碌等待”的一种优化,它不会浪费这么多的时间进行自旋,但是必须将它与 unpark 配对使用才更高效
    			LockSupport.park();
    		}
    
    		return result;
    	}
    
    }
    
    

    总结
    自己写的futuretask只是实现核心原理,帮助你理解futuretask,写demo的时候可以替换jdk的futuretask,看看效果。

    展开全文
  • FutrueTask原理分析

    2020-11-25 09:44:18
    通常一个请求分为请求-处理-返回,如果通过异步线程去完成一个任务,我们通常会选择FutureTask +Submit+ Callable()来实现获取返回值。
     
    
        通常一个请求分为请求-处理-返回,如果通过异步线程去完成一个任务,我们通常会选择FutureTask +Submit+ Callable()来实现获取线程的返回值。 FutureTask继承了Future和Runnable,Future代表了线程的生命周期的状态机,而Runnable则通过这个状态去获取处理的结果。

    1、应用实例

    问题思考: 如何让一个线程有返回值?
    应用代码示例:通过FutureTask + Callable来实现线程的返回值;
    public class FutureTaskTest implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            return "hello callable";
        }
    
    
        public static void main(String[] args) throws Exception {
    
            FutureTaskTest futureTaskTest = new FutureTaskTest();
            FutureTask<String> futureTask = new FutureTask<>(futureTaskTest);
            new Thread(futureTask).start();       //线程一:线程池或者创建线程;
            System.out.println(futureTask.get()); //线程二:阻塞获取结果
            
            //线程池的支持
            ExecutorService executors = new ThreadPoolExecutor(5, 10, 20, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(1000));
            //submit不会抛异常,除非调用future.get(); execute()会抛出
            Future<String> future = executors.submit(futureTaskTest);
            System.out.println(future.get());
        }
    }

    2、Future

    Future表示一个任务线程的生命周期,通过线程状态机(完成-异常-正在执行等)来控制线程,提供相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等,方法比较简单。
    具体线程功能控制函数如下:
    public interface Future<V> {
            // 发送取消命令给线程并返回是否发送成功;
        boolean cancel(boolean mayInterruptIfRunning);
    
        // 当前的Future是否被取消,返回true表示已取消
        boolean isCancelled();
    
        // 当前Future是否已结束。包括运行完成、抛出异常以及取消,都表示当前Future已结束;
        boolean isDone();
    
        // 获取Future的结果值。如果当前Future还没有结束,那么当前线程就等待,直到Future运行结束,那么会唤醒等待结果值的线程的。
        V get() throws InterruptedException, ExecutionException;
    
        // 获取Future的结果值。与get()相比较多了允许设置超时时间
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }

    3、FutureTask-实现原理

    FutureTask的类图:

    3.1、生产消费者模型

     从class类图可以看出,FutureTask是Runnable和Future的结合。我们可以把Runnable比作是生产者,Future比作是消费者,生产者运行任务run()方法计算结果,消费者通过get()方法获取结果。 作为生产者消费者模式,有一个很重要的机制,就是如果生产者数据还没准备的时候,消费者会被阻塞;当生产者数据准备好了以后会唤醒消费者继续执行。 
    • 生产者:Runnable 生产计算结果;
    • 消费者:future- 线程的生命周期,通过 线程的状态机来判断线程是否执行结束;从而异步获取结果;
    • futureTask:将Future和Runnable两者结合起来
    这里结果的获取get()涉及到2个线程的协作:
    • 第一个线程:执行Callable()的任务:执行callable业务方法call方法,返回结果值result;
    • 第二个线程:通过 回调函数get()方法来获取 线程的执行状态state来异步获取结果;
      • 如果没有执行完毕:则被- awaitDone() - park()阻塞加入到阻塞链表中等待线程执行;
      • 如果执行完毕: finishCompletion(),唤醒阻塞的线程获取结果;
    • 单链表实现阻塞线程的存储;
    • 生产消费模式通过Supportlock实现线程的阻塞park()和唤醒unpark();

    3.2、state的含义

    表示FutureTask当前的状态,分为七种状态:
    private volatile int state;
    // NEW 新建状态,表示这个FutureTask还没有开始运行
    private static final int NEW          = 0;
    // COMPLETING 完成状态, 表示FutureTask任务已经计算完毕了但是还有一些后续操作,例如唤醒等待线程操作,还没有完成;
    private static final int COMPLETING   = 1;
    // FutureTask任务完结,正常完成,没有发生异常;
    private static final int NORMAL       = 2;
    // FutureTask任务完结,因为发生异常;
    private static final int EXCEPTIONAL  = 3;
    // FutureTask任务完结,因为取消任务;
    private static final int CANCELLED    = 4;
    // FutureTask任务完结,也是取消任务,不过发起了中断运行任务线程的中断请求;
    private static final int INTERRUPTING = 5;
    // FutureTask任务完结,也是取消任务,已经完成了中断运行任务线程的中断请求;
    private static final int INTERRUPTED  = 6;

    4、源码分析

    线程一:run()方法调用call方法执行任务task;

    4.1、run()方法

        从类图发现FutureTask实现了Runnable(),所以一定有一个run()方法,看看run方法里做了什么?
        其实run()方法作用非常简单,就是调用callable的call方法返回结果值result,根据是否发生异常,调用 set(result)或 setException(ex)方法表示 FutureTask 任务完结。不过因为FutureTask任务都是在多线程环境中使用,所以要注意并发冲突问题。
        注意在run()方法中,我们没有使用Synchronized代码块或者Lock来解决并发问题,而是使用了CAS这个乐观锁来实现并发安全,保证只有一个线程能运行FutureTask任务.
    public class FutureTask<V> implements RunnableFuture<V> {
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    
    public void run() {
        if (state != NEW ||            //线程安全保证:使用cas来保证只有一个线程能运行task任务,而不是使用synchronized或者lock
            !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;  //传进来的任务实例对象 c不为null且状态为新建的状态;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); //调用任务实例的call方法,并不是创建一个新的线程;
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); //设置异常结果
                }
                if (ran)
                    set(result);      //设置程序的运行结果
            }
        } finally {
            runner = null;
    
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    当线程执行结束以后,唤醒等待结果的Runnable线程,通过set设置返回结果:

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();                 // 执行完成,通知获取结果的线程获取结果
        }
    }
    当执行任务的线程执行结束,然后唤醒等待结果的get线程获取结果:
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t); //唤醒获取结果的阻塞的线程
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
    
        done();
    
        callable = null;        // to reduce footprint
    }

    4.2、get()方法

    阻塞获取结果的get函数,如果发现执行任务的线程没有结束,则直接进入阻塞等待状态。get()的主要作用:
    • 线程未执行结束的时候,状态小于complete,调用awaitDone方法,其实也是调用 park() 阻塞获取结果的线程,将该线程插入到 单链表等待队列;
    •  run()方法执行完set值的时候通过调用unpark()唤醒获取结果的线程,获取结果或者抛出异常;
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L); //如果线程还没有执行完,就会被阻塞;
        return report(s);
    }
    如果当前的结果还没有被执行完,把当前线程线程和插入到等待队列,阻塞取结果线程,直至任务线程执行结束时被唤醒unpark():
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;           //wait节点,构建一条单链表记录等待的线程
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }// 当状态大于COMPLETING时,表示FutureTask任务已结束。
            else if (s == COMPLETING) // cannot time out yet 
                Thread.yield();
            //  将当前阻塞线程插入到等待线程链表中;
            else if (q == null)
                q = new WaitNode();
            else if (!queued)//使用CAS函数将新节点添加到链表中;
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q); //加入链表
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos); //超时阻塞
            }
            else
                LockSupport.park(this);             //没有设置超时时间的阻塞;
        }
    }
    //等待节点结构
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next; //单向链表
        WaitNode() { thread = Thread.currentThread(); } //保存当前的阻塞的线程
    }
    

    4.3、report()

    report方法就是根据传入的状态值 s,来决定是抛出异常,还是返回结果值,这个两种情况都 表示 FutureTask 完结了。
    private V report(int s) throws ExecutionException {
        Object x = outcome; //表示call的返回值;
        if (s == NORMAL)    //表示正常完结状态;
            return (V)x;
        if (s >= CANCELLED)// 大于或等于CANCELLED,都表示手动取消FutureTask任务,所以抛出CancellationException异常;
            throw new CancellationException();
        // 否则就是运行过程中,发生了异常,这里就抛出异常
        throw new ExecutionException((Throwable)x);
    }

    到这里,通过futureTask来获取线程的返回值的流程就分析完了,因为线程是异步执行,要想获取它的结果,必须的有另外一个线程来监视它的执行过程,然后才能获取执行结果。

    5、submit和execute的区别

    回头再来看看线程池执行任务的两种方法:submit 和 execute,两者有什么区别呢?

    5.1、execute

    • 1、execute 只能接受一个Runnable任务;
    • 2、execute如果 出现异常会直接抛出
    • 3、execute 没有返回值;
    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    5.2、submit

    • 1、submit可以接受Runnable 和Callable这两种类型的参数;
    • 2、对于submit方法,如果传入一个Callable,可以得到一个Future的返回值; Callable + Future
    • 3、submit方法调用 不会抛异常,除非调用 Future.get ();
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        // 将task封装了一个RunnableFuture
        RunnableFuture<T> ftask = newTaskFor(task); 
        //交由线程池区处理ftask任务;
        execute(ftask);
        return ftask;
    }

    结论:通过源码可以看出,调用的submit方法,这里其实相对于execute方法来说,只多做了一步操作,就是 封装了一个RunnableFuture,然后调用execute方法,这里就是调用线程池里的worker线程来调用过ftask的run方法,执行任务,而这个ftask其实就是FutureTask里面最终实现的任务逻辑。

    6、使用注意事项

    1. 异步阻塞获取结果需要加上超时时间,否则一致阻塞;
    2. 单个请求单个try-catch,否则所有的请求都会当成异常来处理;
    3. 超时后,结果返回null,但要注意服务端可能是完成了;
    4. 带返回值的submit (CallableTask),如果不调用future.get()是不会抛出异常的;
    5. 不带返回值的excute(Runnable)直接抛异常;
    6. @Async注解,当没有返回值的时候,需要自定义异常实现捕获异常,有返回值调用future.get()OK;

    7、小结

        为了实现数据的安全性和一致性,有时需要返回线程的执行结果,这里就用到了FutureTask来实现线程的返回值,但是需要注意阻塞和异常的处理,需要设置超时时间和异常必须调用get()捕获。
     
        OK---我自横刀向天笑,去留肝胆两昆仑。
     
     
    水滴石穿,积少成多。学习笔记,内容简单,用于复习,梳理巩固。
     
     
     
    展开全文
  • 多线程 FutrueTask

    2019-05-12 21:53:16
    继承结构 FutureTask同时实现了Runnable接口和Future接口,因此可以直接提交给线程池执行,同时也可以返回线程的...主线程调用了FutrueTask的run()方法,主线程等待FutrueTask方法执行完了之后才会继续往下执行

    继承结构

    在这里插入图片描述
    FutureTask同时实现了Runnable接口和Future接口,因此可以直接提交给线程池执行,同时也可以返回线程的执行的结果

    任务运行状态

        private static final int NEW          = 0;
        private static final int COMPLETING   = 1;
        private static final int NORMAL       = 2;
        private static final int EXCEPTIONAL  = 3;
        private static final int CANCELLED    = 4;
        private static final int INTERRUPTING = 5;
        private static final int INTERRUPTED  = 6;
    
    • NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。
    • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
    • NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
    • EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
    • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
    • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
    • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。

    状态的流转
    在这里插入图片描述

    get/cancle

    在这里插入图片描述
    FutrueTask是基于AQS实现的

    FutureTask使用

    A线程调用了B线程的FutureTask.run()方法,则A线程会等待B线程执行完之后才会继续执行
    1.多个线程同时执行同一任务,每个任务只能执行一次
    2.其它线程要等待这个任务执行完了才能继续执行

    public class FutureTaskTest {
    
        private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();
        private String executeTask(final String taskName) throws ExecutionException, InterruptedException {
            while (true) {
                Future<String> future = taskCache.get(taskName);
                if (future == null) {
                    Callable<String> callable = () -> {
                        Thread.sleep(1000);
                        return taskName;
                    };
                    FutureTask<String> futureTask = new FutureTask<>(callable);
                    //使用ConcurrentHashMap保证并发安全
                    future = taskCache.putIfAbsent(taskName,futureTask);
                    //保证同名的任务只会被执行一次
                    if(future == null){
                        future = futureTask;
                        futureTask.run();
                    }
                }
                    try {
                        return future.get();
                    }catch (CancellationException e){
                        taskCache.remove(taskName, future);
                    }
                }
            }
    
            public static void main(String[] args) throws ExecutionException, InterruptedException {
                FutureTaskTest futureTaskTest = new FutureTaskTest();
                System.out.println(Thread.currentThread().getName() + futureTaskTest.executeTask("abc"));
                System.out.println("等待任务执行完毕");
            }
    }
    

    执行结果如下
    在这里插入图片描述
    主线程调用了FutrueTask的run()方法,主线程等待FutrueTask方法执行完了之后才会继续往下执行

    展开全文
  • 1.前言 相信很多人了解到FutureTask是因为ThreadPoolExecutor.submit方法,根据ThreadPoolExecutor.submit的使用,我们可以先猜一下FutureTask的原理。 public static void main(String[] args) throws ...

    1.前言

    相信很多人了解到FutureTask是因为ThreadPoolExecutor.submit方法,根据ThreadPoolExecutor.submit的使用,我们可以先猜一下FutureTask的原理。

    public static void main(String[] args) throws ExecutionException, InterruptedException {

    FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {

    @Override
    public Integer call() throws Exception {
    TimeUnit.SECONDS.sleep(3);
    return 1;
    }
    });
    new Thread(futureTask).start();

    System.out.println(futureTask.get());
    }

    上面这个代码会在启动后三秒打印出1,FutureTask.get()方法调用时会直到Callable中的代码执行完才会返回,所以FutureTask需要在这里阻塞。因为可能多个线程进行get,所以需要一个阻塞队列。

    如果Callable三秒执行完,调用方过了五秒才调用get的话,FutureTask就需要把Callable中的执行结果存起来,并且也要把异常catch住存起来,所以需要一个变量存放结果。使用一个api然后想去研究它的原理,源码时,其实可以

    先想一下,它可能是怎么做的,如果是我写应该怎样设计,这样能提高自己的设计能力。

    2.原理

    FutureTask的原理其实和前言中的猜想类似,下面简述一下FutureTask的原理。

    FutureTask有两个非常重要的方法,run方法和get方法,run方法是实现了Runnable然后在run里面跑Callable的代码,

    get方法就是我们常用的获取数据的方法。run方法运行Callable中的代码然后catch住异常,然后将正常结果或者异常结果

    存起来,并且唤醒因为调用get方法阻塞的线程。get方法是去判断是否已经计算出结果,如果计算完成,返回结果否则进行

    阻塞。

    3.源码分析

    建议大家在阅读源码时,先看一下文档,虽然文档是英文的,但是自己读一下搭配翻译看懂应该不难,这里给大家介绍一个IDEA的功能,点击View->QuickDocumentation能让文档读起来更加方便。

    下面我就分析一下源码:

      
       FutureTask中的状态维护
        private volatile int state;
        private static final int NEW          = 0; //初始状态
        private static final int COMPLETING   = 1; //执行完成但是执行结果没有保存
        private static final int NORMAL       = 2; //执行完成并且保存了结果
        private static final int EXCEPTIONAL  = 3; //出现了异常
        private static final int CANCELLED    = 4; //取消
        private static final int INTERRUPTING = 5; //打断中,可以进行打断线程了
        private static final int INTERRUPTED  = 6; //线程已经被置成打断状态
    
        private Callable<V> callable; //入参
       
        private Object outcome; //执行成功结果保存到这个变量
       
        private volatile Thread runner; //正在执行的线程
       
        private volatile WaitNode waiters;//等待队列
     如果你尝试用idea追踪者这些变量在哪里赋值了,你会发现你找不到,这是因为这些变量的赋值都是通过Unsafe类完成的,这个类会直接改这些变量内存地址上对应的值。
      Unsafe可以通过对象+字段的offset找到字段对应的内存地址从而修改数据,了解了这些,在去看FutureTask的代码就很容易了
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
    try {
    UNSAFE = sun.misc.Unsafe.getUnsafe();
    Class<?> k = FutureTask.class;
    stateOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("state"));
    runnerOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("runner"));
    waitersOffset = UNSAFE.objectFieldOffset
    (k.getDeclaredField("waiters"));
    } catch (Exception e) {
    throw new Error(e);
    }
    }

    下面看一下run方法是怎样执行的

     public void run() {
          //runner置成当前线程
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran;
              //成功设置result失败设置Exception
    try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

    下面看看成功都做了些什么

    protected void set(V v) {
          //执行成功后状态扭转成完成中,扭转成功后将值存入outcome然后执行finishCompletion
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }

    下面看看失败做了什么

        protected void setException(Throwable t) {
          //与成功类似不再多讲
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }

    成功和失败都执行了finishCompletion,下面看看这个方法里干了什么

        /**
         * Removes and signals all waiting threads, invokes done(), and
         * nulls out callable.
         */

      注释已经非常清楚了。唤醒等待的节点,执行done,将callable置成null
    private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc //这里为什么能帮助gc呢,如果q在老年代,q.next在年轻代的话就可以了,详情看https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6806875 q = next; } break; } } done(); callable = null; // to reduce footprint }

    到这里run方法已经很清楚了,下面看一下get方法

    public V get() throws InterruptedException, ExecutionException {
            int s = state;
            if (s <= COMPLETING)
            //很明显需要看这个方法,记住这个传参false s
    = awaitDone(false, 0L); return report(s); }
    private int awaitDone(boolean timed, long nanos)
            throws InterruptedException {
        //timed = false
    final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) {
            //线程已经被打断了
    if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }        int s = state;
            // 已经完成了返回状态
    if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet //这里直接让出线程,让runner去赋值 Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) //加入队列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { //ture的话 等待一段时间。false的话直接阻塞 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }

    这里如果阻塞了,就等run方法执行完成的释放了,代码逻辑很清晰,jdk并发包中的实现用了很多for(;;)这其实是作者写C的习惯的while(true)会多一些指令,在java中编译成

    字节码这两个是完全一样的。下面看一下获取到状态后执行的report方法

      //正常直接返回结果,异常封装一下抛出,这里有个退出,退出的代码这里就不再继续分析了,看完上述的分析,相信你也能快速看懂退出的代码
    private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

    4.使用

    在实际开发中,大部分情况都要用到db,http,rpc这些IO操作,在一个方法中需要多次进行这些操作时,如果没有前后关联,可以使用Future充分

    使用多核cpu,比如你需要查多个表拼接成一个VO返回给前端,就可以用Future提高接口的响应时间。

     

    转载于:https://www.cnblogs.com/zhandouBlog/p/11336225.html

    展开全文
  • 1、FutureCallableFutureTask 源码说明  JDK内置的Future主要使用到了Callable接口和FutureTask类。  Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其他线程执行的任务。...
  • public interface ExecutorService extends Executor {...} ExecutorService#submit有三个重载方法,之前一直没注意,今天仔细以研究,还是有好些知识点的 1.第一个就不说了 &lt;...2.返回的futu...
  • FutureTask FutureTask实现了RunnableFuture接口,RunnableFuture接口继承于Runnable, Future<V> 这使得FutureTask既可以当做一个任务执行,也可以有 返回值。 public class FutureTask<...
  • 为什么80%的码农都做不了架构师?>>> ...
  • 1、Future 异步执行获取执行结果 我们知道通过Thread+Runnable可异步执行某项操作,但是如何异步执行后,并获取执行结果? Thread和Runnable并没有提供获取执行结果的操作。 Runnable 是对任务的封装。...
  • FutrueTask深入理解

    2021-08-09 10:53:33
    0.前言 在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建多线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就...
  • 文章目录Lock 接口 (重点)1、ReentrantLock 类2、Lock与Synchronized的区别 面试3、防止线程虚假唤醒解决虚假唤醒分析 面试4、Condition 接口 JDK 1.55、Condition实现精准通知唤醒6、关于锁的问题 面试解决集合类...
  • Java Executor框架

    2020-03-17 19:47:27
    Executor 类结构 Runnable和Callable FutrueTask
  • 多线程Callable

    2019-07-19 13:46:04
    Runnable不能获取线程块的返回值,而Callable可以通过FutrueTask获取返回值 2、Callable的使用 创建2个线程,分别获取userInfo信息与stuInfo信息 Callable<JSONObject> userInfoCall...
  • JAVAday31

    2021-01-07 22:34:25
    1.Callable<Object> 是接口 在并发包下的 Runnable , 提供了 带有... FutrueTask t = new FutrueTask(callable); new Thread(Runnable a); t.get(); 获取call方法的返回值 t.cancel(); 终止 线程执行...
  • JUC笔记

    2021-06-01 10:54:33
    创建线程的三种方式: 1)继承Thread类 ...FutrueTask这个适配类也可以传递Runnable接口,形成一种对称使用的局面,FutrueTask的get方法,可以获得任务的执行结果。 线程的创建形式有且只有一种即:.
  • 线程池学习

    2021-03-04 21:23:39
    1、runnable 2、callable 3、futrue 4、futrueTask
  • 文章目录前言FutrueTaskFutrueTask 增加自定义代码逻辑实现 Callable 接口并对象传入 FutureTask 构造函数FutureTask 的启动使用 Thread.start() 执行 FutureTask使用线程池执行 FutureTask线程池和 FutureTask ...
  •  Android应用开发中一般会用到访问网络请求,可以使用socket,webservice等。...由于androidUI主线程中不允许启动线程,这里使用FutrueTask启动线程获得返回结果。代码如下: public class HttpUtil { ...
  • 如何获取线程的返回值?

    千次阅读 2020-06-22 17:50:12
    获取线程的返回值 ...通过Callable接口实现call()获取线程返回值(通过FutrueTask Or 线程池获取,推荐使用) FutrueTask的构造方法可以传入Callable实现类的实例; isDone()可以判断call是否执行结束;
  • 实现Ruunable接口、3.futrueTask方法。 1.1.1 Thread类的实现和运行 /** * @date 2020-1-21 * @author 49800 */ public class ThreadTest { public static class MyThread extends Thread{ @Override public...
  • 实现Callable接口 与使用Runnable相比, Callable功能更强大些 相比run()方法,可以有返回值 ... FutrueTask是Futrue接口的唯一的实现类 FutureTask 同时实现了Runnable, Future接口。它既可
  • 创建线程方式: 实现Callable接口 比较实现Runnable 接口方式,方法可以有返回值,并且可以返回异常 执行Callable方式,需要...FutrueTask可用于闭锁 package com.pccc.pactera.juc01; import java.util.concur...
  • 创建线程的方式三: 实现Callable接口 新增方式一:实现Callable接口 与使用Runnable相比,Callable功能更强大些 相比run()方法,可以有...FutrueTask是Futrue接口的唯一的实现类 FutureTask同时实现了Runnable,Futur
  • 与使用Runnable相比, Callable功能更强大些 ...②FutrueTask是Futrue接口的唯一的实现类 ③FutureTask同时实现了Runnable,Future接口,它既可以作为Runnable被线程执行,又可以作为Future得到Callable的
  • 创建多线程的方式三 : 实现Callable接口 --JDK5.0新增 与Runnable相比,Callable功能更强大些 相比与run方法,可以有返回值 ...FutrueTask是Futrue接口的唯一的实现类 FutureTask 同时实现了Runnable, Future接
  • JAVA学习笔记(多线程二)——多线程的创建(二)(线程池非常重要) 新增方式一:实现Callable接口 ...FutrueTask是Futrue接口的唯一的实现类 FutureTask同时实现了Runnable, Future接口。它既可以作为Runnable
  • 第14章 网络编程

    2020-01-02 20:00:15
    ③实现Callable接口 call 可以返回值, FutrueTask, Thread 线程同步: ①synchronized 同步代码块 同步方法 ②Lock lock.lock() lock.unlock(); 死锁: ①锁的嵌套使用 ②锁的顺序不一致 生产者和消费者模型 生产者和...
  • 新增方式一:实现Callable接口 与使用Runnable相比, Callable功能更强大些 (1)call()方法相比run()方法,...FutrueTask是Futrue接口的唯一的实现类 FutureTask 同时实现了Runnable, Future接口。它既可以作为 Run
  • 创建线程相信大家都很熟悉,常用的就是继承Thread类和实现Runnable接口。 今天学习第三种方法,Callable接口和Future 来创建线程。...Callable需要依赖FutrueTask来接收结果,FutureTask是Future接口的实现类...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 959
精华内容 383
关键字:

futruetask