精华内容
下载资源
问答
  • 任务在Executor线程执行器当中是异步执行的,而有些任务是需要返回执行结果的,故在Executor派生接口ExecutorService接口中定义了带返回结果的提交方法submit,其中返回结果为Future接口的对象。 Future接口主要...

    概述

    • 任务在Executor线程执行器当中是异步执行的,而有些任务是需要返回执行结果的,故在Executor派生接口ExecutorService接口中定义了带返回结果的提交方法submit,其中返回结果为Future接口的对象。

    • Future接口主要提供了异步返回任务执行结果,取消任务执行,获取任务执行状态的功能,接口定义如下:

      public interface Future<V> {
          // 取消任务执行
          // mayInterruptIfRunning用于控制如果任务正在执行,是否中断对应的执行线程来取消该任务
          // 成功cancel,则isCancelled和isDoned都返回true。
          boolean cancel(boolean mayInterruptIfRunning);
      
          // 任务是否取消
          boolean isCancelled();
      
          // 正常执行,被取消,异常退出都返回true
          boolean isDone();
      
          // 阻塞等待执行结果
          // CancellationException:任务被取消
          // ExecutionException:任务执行异常
          // InterruptedException:该等待结果线程被中断
          V get() throws InterruptedException, ExecutionException;
      
          // 阻塞等待执行结果指定时间,除了以上异常,
          // TimeoutException:等待超时
          V get(long timeout, TimeUnit unit)
              throws InterruptedException, ExecutionException, TimeoutException;
      }
      

    FutureTask

    • Future接口的主要实现类为FutureTask,FutureTask同时实现了Runnable和Future接口,故对应的对象实例可以作为任务提交到Executor线程执行器中执行,然后通过自身来获取任务执行结果或者取消任务执行:

    • 即FutureTask的对象实例被Executor线程执行器内部线程池的某个工作线程和调用get方法等待获取结果的应用主线程所共享,故Executor内部线程池的工作线程在执行完这个任务后,可以通知和唤醒调用get阻塞等待执行结果的应用主线程,应用主线程也可以取消该任务的执行,然后通知工作线程。

      // 被线程池ExecutorService和调用get方法等待节点的主线程共享
      // 线程池的执行线程worker调用run方法;等待结果的线程调用get等待结果,
      // get通过自旋和LockSupport.park来先休眠,后等待执行线程在执行完run方法,调用LockSupport.unpark来唤醒,
      // 调用get的线程被唤醒后,由于存在自旋,故再次检查是否有结果了,由结果则可以返回了。
      public class FutureTask<V> implements RunnableFuture<V> {
          ...
      }
      
      public interface RunnableFuture<V> extends Runnable, Future<V> {
          /**
           * Sets this Future to the result of its computation
           * unless it has been cancelled.
           */
          void run();
      }
      
    • 在FutureTask中定义了volatile修饰的状态变量state来进行Executor中的工作线程和应用主线程之间的交互,即工作线程产生任务执行结果,通知应用主线程获取;应用主线程请求取消任务执行,通知工作线程停止该任务执行。在内部实现中通过将state与以下状态常量进行大小比较来获取任务执行情况,如是正常执行成功还是异常退出,被取消等。

      /*
       * NEW -> COMPLETING -> NORMAL
       * NEW -> COMPLETING -> EXCEPTIONAL
       * NEW -> CANCELLED
       * NEW -> INTERRUPTING -> INTERRUPTED
       */
      private volatile int state;
      private static final int NEW          = 0;
      private static final int COMPLETING   = 1;
      // 正常执行而结束
      private static final int NORMAL       = 2;
      // 不是正常执行而结束
      private static final int EXCEPTIONAL  = 3;
      private static final int CANCELLED    = 4;
      private static final int INTERRUPTING = 5;
      private static final int INTERRUPTED  = 6;
      
      /** The result to return or exception to throw from get() */
      private Object outcome; // non-volatile, protected by state reads/writes
      /** The thread running the callable; CASed during run() */
      private volatile Thread runner;
      

    提交任务到Executor线程执行器

    • 提交任务到Executor线程执行器,对应AbstractExecutorService的submit方法实现如下:在submit中创建了一个FutureTask对象来包装应用定义的Runnable接口实现类task,调用execute将该对象交给Executor线程执行器执行,然后返回该对象引用给应用主线程。

      protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
          return new FutureTask<T>(callable);
      }
      
      /**
       * @throws RejectedExecutionException {@inheritDoc}
       * @throws NullPointerException       {@inheritDoc}
       */
      public Future<?> submit(Runnable task) {
          if (task == null) throw new NullPointerException();
          // 将应用代码中的task包装成一个FutureTask,然后交给线程池执行
          // 线程池调度一个线程来执行FutureTask的run,在FutureTask的run中调用task的run
          RunnableFuture<Void> ftask = newTaskFor(task, null);
          execute(ftask);
          // 返回这个包装了task的FutureTask,在应用代码中可以通过get来获取执行结果
          return ftask;
      }
      

    应用主线程调用get等待执行结果

    • 在FutureTask中的get方法实现如下:

      public V get() throws InterruptedException, ExecutionException {
          int s = state;
          if (s <= COMPLETING)
              // 休眠等待执行结果
              s = awaitDone(false, 0L);
          // 休眠返回了,调用report获取结果
          return report(s);
      }
      
    • 任务状态state小于等于COMPLETING表示任务还没开始执行,则应用主线程调用awaitDone阻塞休眠,等待Executor的工作线程执行任务并通知唤醒该应用主线程。具体过程如下:

      private int awaitDone(boolean timed, long nanos)
          throws InterruptedException {
          final long deadline = timed ? System.nanoTime() + nanos : 0L;
          // 线程等待节点
          WaitNode q = null;
          boolean queued = false;
          // 自旋,休眠等待结果
          for (;;) {
              // 如果等待线程自身被中断了,则移除自身对应的等待节点
              if (Thread.interrupted()) {
                  removeWaiter(q);
                  throw new InterruptedException();
              }
      
              int s = state;
              // 在set中,将状态从COMPLETING变成了NORMAL,并调用finishCompletion唤醒等待线程集合的所有线程,
              // NORMAL大于COMPLETING,故可以返回了。
              // 另外其他不是正常执行而结束的情况,如CANCELED, INTERRUPTED, EXCEPTIONAL等也是在这里返回
              if (s > COMPLETING) {
                  if (q != null)
                      q.thread = null;
                  return s;
              }
              else if (s == COMPLETING) // cannot time out yet
                  Thread.yield();
              else if (q == null)
                  // 下一轮自旋到下一个else if (!queued),然后入队waiters
                  q = new WaitNode();
              else if (!queued)
                  // 入队等待执行结果
                  queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                       q.next = waiters, q);
              else if (timed) {
                  nanos = deadline - System.nanoTime();
                  if (nanos <= 0L) {
                      removeWaiter(q);
                      return state;
                  }
                  LockSupport.parkNanos(this, nanos);
              }
              else
                  // 被finishCompletion唤醒,自旋回到for循环重新往下执行
                  LockSupport.park(this);
          }
      }
      
    • 在FutureTask内部维护了一个单向链表waiters,用于存放当前等待该任务执行结果的线程,在任务执行完成时,遍历该链表,唤醒每个等待线程。

      /** Treiber stack of waiting threads */
      private volatile WaitNode waiters;
      static final class WaitNode {
          volatile Thread thread;
          volatile WaitNode next;
          // thread为调用get方法的线程,通常为主线程
          WaitNode() { thread = Thread.currentThread(); }
      }
      

    Executor工作线程执行任务

    • Executor的工作线程执行该任务时,会调用该任务的run方法,即FutureTask的run方法,如下为FutureTask的run方法定义:首先检查任务状态state是否为NEW,是,即还没执行过也没有被取消等,则进行往下执行。执行完成之后,产生执行结果result,调用set方法来处理这个结果。

      public void run() {
          // 赋值runner,指向线程池中当前执行这个task的线程
          if (state != NEW ||
              !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                           null, Thread.currentThread()))
              return;
          try {
              Callable<V> c = callable;
              if (c != null && state == NEW) {
                  V result;
                  boolean ran;
                  try {
                      // 应用代码的task的run方法执行
                      result = c.call();
                      ran = true;
                  } catch (Throwable ex) {
                      result = null;
                      ran = false;
                      setException(ex);
                  }
                  // 执行成功设置结果result,并通知get方法
                  if (ran)
                      set(result);
              }
          } finally {
              // runner must be non-null until state is settled to
              // prevent concurrent calls to run()
              runner = null;
              // state must be re-read after nulling runner to prevent
              // leaked interrupts
              int s = state;
              if (s >= INTERRUPTING)
                  handlePossibleCancellationInterrupt(s);
          }
      }
      
    • set方法的定义如下:将执行结果赋值给FutureTask的成员变量outcome,更新任务执行状态state为NORMAL,最后调用finishCompletion通知所有等待这个任务执行结果的线程。

      protected void set(V v) {
          if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
              outcome = v;
              // 修改状态为NORMAL,NORMAL大于COMPLETING
              UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
              // 唤醒调用get方法等待结果的线程
              finishCompletion();
          }
      }
      
    • finishCompletion的实现如下:遍历任务等待线程链表,使用LockSupport.unpart唤醒对应的线程,然后将该等待线程从链表中移除。

      private void finishCompletion() {
          // assert state > COMPLETING;
          // waiters为调用了get方法的线程集合,通常为主线程
          for (WaitNode q; (q = waiters) != null;) {
              if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                  for (;;) {
                      Thread t = q.thread;
                      if (t != null) {
                          q.thread = null;
                          // 唤醒等待线程,即调用了get方法的线程
                          LockSupport.unpark(t);
                      }
                      WaitNode next = q.next;
                      if (next == null)
                          break;
                      q.next = null; // unlink to help gc
                      q = next;
                  }
                  break;
              }
          }
      
          done();
      
          callable = null;        // to reduce footprint
      }
      
    • 然后回到get方法,应用主线程从awaitDone阻塞返回后,通过report方法来检测执行状态并返回任务执行结果。

      public V get() throws InterruptedException, ExecutionException {
          int s = state;
          if (s <= COMPLETING)
              // 休眠等待执行结果
              s = awaitDone(false, 0L);
          // 休眠返回了,调用report获取结果
          return report(s);
      }
      private V report(int s) throws ExecutionException {
          Object x = outcome;
          if (s == NORMAL)
              return (V)x;
          // 如果该需要获取结果的线程已经被取消掉了,则抛异常
          if (s >= CANCELLED)
              throw new CancellationException();
          throw new ExecutionException((Throwable)x);
      }
      

    应用主线程取消任务

    • 在应用主线程中,可以通过调用FutureTask的cancel方法来取消该任务的执行,cancel方法的定义如下:主要是更新任务的状态state为INTERRUPTING或者CANCELLED,然后根据mayInterruptIfRunning来控制如果该任务已经在执行,是否中断对应的工作线程来中止该任务的执行,最后调用finishCompletion方法来唤醒等待这个任务执行结果的线程,避免该任务被取消后,这些线程还在阻塞等待结果。

      public boolean cancel(boolean mayInterruptIfRunning) {
          // 只能取消还没被执行的task,即state为NEW表示还没执行
          if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
              return false;
          try {    // in case call to interrupt throws exception
              // 如果执行完上面检查,到执行到这里,刚好线程池分配了一个线程来执行这个任务,则检查在运行时,是否可以中断该执行线程
              if (mayInterruptIfRunning) {
                  try {
                      Thread t = runner;
                      // 中断执行线程,这样线程池在之后会补回来一个执行线程
                      if (t != null)
                          t.interrupt();
                  } finally { // final state
                      UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                  }
              }
          } finally {
              // 唤醒调用了get方法等待获取该任务的结果的线程,避免该任务被取消了,这些线程还在等待。
              finishCompletion();
          }
          return true;
      }
      
    • 在Executor的工作线程执行这个任务时,会调用FutureTask的run方法,在run方法中是先检查任务的状态state,如果发现不是NEW,即可能是CANCELLED,INTERRUPTING等,则直接返回,退出该任务的执行。

      public void run() {
          // 赋值runner,指向线程池中当前执行这个task的线程
          if (state != NEW ||
              !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                           null, Thread.currentThread()))
              return;
          }
          
          ...
          
      }
      
    展开全文
  • celery获取任务执行结果

    万次阅读 2019-02-18 16:51:54
    在某些特定情境下,我们会先发起celery任务,过一段时间之后再获取其执行结果,而不用一直等待。 代码如下: from celery.result import AsyncResult res=AsyncResult("62051878-ca77-4895-a61f-6f9525681347&...

    在某些特定情境下,我们会先发起celery任务,过一段时间之后再获取其执行结果,而不用一直等待。
    代码如下:

    from celery.result import AsyncResult
    res=AsyncResult("62051878-ca77-4895-a61f-6f9525681347") # 参数为task id
    res.result
    
    展开全文
  • 26 - Future 获取异步任务结果

    千次阅读 2020-07-05 10:42:25
      在上一篇文章《25 - ThreadPoolExecutor 线程池》中,我们详细介绍了如何创建正确的线程池,也粗略了讲了一下如何启动线程池,execute() submit() 都是用来执行线程池任务的,它们最主要的区别是,submit() ...

      在上一篇文章《25 - ThreadPoolExecutor 线程池》中,我们详细介绍了如何创建正确的线程池,也粗略了讲了一下如何启动线程池,execute() 和 submit() 都是用来执行线程池任务的,它们最主要的区别是,submit() 方法可以接收线程池执行的返回值,而 execute() 不能接收返回值。 submit() 方法可以配合 Futrue 来接收线程执行的返回值,下面我们就来详细看看到底该如何获取任务的执行结果。

      

    1. 如何获取任务的执行结果

    1.1 ThreadPoolExecutor.submit

      Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下:

    // 提交Runnable任务
    Future<?> submit(Runnable task);
    
    // 提交Callable任务
    <T> Future<T> submit(Callable<T> task);
    
    // 提交Runnable任务及结果引用  
    <T> Future<T> submit(Runnable task, T result);
    

      你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone() 以及 2个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    
    // 判断任务是否已取消  
    boolean isCancelled();
    
    // 判断任务是否已结束
    boolean isDone();
    
    // 获得任务执行结果
    get();
    
    // 获得任务执行结果,支持超时
    get(long timeout, TimeUnit unit);
    

    这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下:

    1. 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join();
    2. 提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果;
    3. 提交 Runnable 任务及结果引用 submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
    public class TestFuture {
    
        public static void main(String[] args) throws Exception {
            Result result = new Result();
            result.setId(10086L);
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
                    60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
            Future<Result> resultFuture = threadPoolExecutor.submit(new Task(result), result);
            Result result1 = resultFuture.get();
            System.out.println(result == result1);
            System.out.println(result1.getId());
            System.out.println(result1.getDesc());
        }
    
        static class Task implements Runnable {
            private Result result;
    
            public Task(Result result) {
                this.result = result;
            }
    
            @Override
            public void run() {
                System.out.println("Result id is:" + result.getId());
                result.setDesc("这个一个 Future 的测试...");
            }
        }
    
        static class Result {
            private Long id;
            private String desc;
    
            public Long getId() {
                return id;
            }
    
            public void setId(Long id) {
                this.id = id;
            }
    
            public String getDesc() {
                return desc;
            }
    
            public void setDesc(String desc) {
                this.desc = desc;
            }
        }
    }
    
    # 运行结果如下:
    Result id is:10086
    true
    10086
    这个一个 Future 的测试...
    

      

    1.2 FutureTask 工具类

      前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。

    FutureTask(Callable<V> callable);
    
    FutureTask(Runnable runnable, V result);
    

      那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。

    
    // 创建FutureTask
    FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
    
    // 创建线程池
    ExecutorService es = Executors.newCachedThreadPool();
    
    // 提交FutureTask 
    es.submit(futureTask);
    
    // 获取计算结果
    Integer result = futureTask.get();
    

      FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。

    // 创建FutureTask
    FutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);
    
    // 创建并启动线程
    Thread T1 = new Thread(futureTask);
    T1.start();
    
    // 获取计算结果
    Integer result = futureTask.get();
    

      

    2. 实现最优的“烧水泡茶”程序

      记得以前初中语文课文里有一篇著名数学家华罗庚先生的文章《统筹方法》,这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这样:

    在这里插入图片描述
      下面我们用程序来模拟一下这个最优工序。我们专栏前面曾经提到,并发编程可以总结为三个核心问题:分工、同步和互斥。编写并发程序,首先要做的就是分工,所谓分工指的是如何高效地拆解任务并分配给线程。对于烧水泡茶这个程序,一种最优的分工方案可以是下图所示的这样:用两个线程 T1 和 T2 来完成烧水泡茶程序,T1 负责洗水壶、烧开水、泡茶这三道工序,T2 负责洗茶壶、洗茶杯、拿茶叶三道工序,其中 T1 在执行泡茶这道工序时需要等待 T2 完成拿茶叶的工序。对于 T1 的这个等待动作,你应该可以想出很多种办法,例如 Thread.join()、CountDownLatch,甚至阻塞队列都可以解决,不过今天我们用 Future 特性来实现。

    在这里插入图片描述
      下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

    public class TestFutureTask {
    
        public static void main(String[] args) throws Exception {
            FutureTask<String> ft1 = new FutureTask<>(new Task1());
            FutureTask<String> ft2 = new FutureTask<>(new Task2(ft1));
            Thread t1 = new Thread(ft1);
            t1.start();
            Thread t2 = new Thread(ft2);
            t2.start();
            System.out.println(ft2.get());
        }
    
        static class Task2 implements Callable<String> {
            private FutureTask<String> futureTask;
    
            public Task2(FutureTask futureTask) {
                this.futureTask = futureTask;
            }
    
            @Override
            public String call() throws Exception {
                System.out.println("线程2:吸水壶");
                Thread.sleep(1000);
                System.out.println("线程2:烧开水");
                Thread.sleep(1000);
                String result = futureTask.get();
                System.out.println("线程2:拿到茶叶:" + result);
                System.out.println("线程2:泡茶");
                Thread.sleep(1000);
                return "上茶喽...";
            }
        }
    
        static class Task1 implements Callable<String> {
            @Override
            public String call() throws Exception {
                System.out.println("线程1:洗茶壶");
                Thread.sleep(1000);
                System.out.println("线程1:洗茶杯");
                Thread.sleep(1000);
                System.out.println("线程1:拿茶叶");
                Thread.sleep(1000);
                return "龙井";
            }
        }
    }
    
    # 运行结果如下:
    线程1:洗茶壶
    线程2:吸水壶
    线程1:洗茶杯
    线程2:烧开水
    线程1:拿茶叶
    线程2:拿到茶叶:龙井
    线程2:泡茶
    上茶喽...
    

      

    3. 总结

      利用 Java 并发包提供的 Future 可以很容易获得异步任务的执行结果,无论异步任务是通过线程池 ThreadPoolExecutor 执行的,还是通过手工创建子线程来执行的。Future 可以类比为现实世界里的提货单,比如去蛋糕店订生日蛋糕,蛋糕店都是先给你一张提货单,你拿到提货单之后,没有必要一直在店里等着,可以先去干点其他事,比如看场电影;等看完电影后,基本上蛋糕也做好了,然后你就可以凭提货单领蛋糕了。

      利用多线程可以快速将一些串行的任务并行化,从而提高性能;如果任务之间有依赖关系,比如当前任务依赖前一个任务的执行结果,这种问题基本上都可以用 Future 来解决。在分析这种问题的过程中,建议你用有向图描述一下任务之间的依赖关系,同时将线程的分工也做好,类似于烧水泡茶最优分工方案那幅图。对照图来写代码,好处是更形象,且不易出错。

    展开全文
  • java 异步任务结果

    千次阅读 2016-06-22 22:49:43
    Callable, Future, FutureTaskCallable 与RunnableRunnable 介绍Runnable只是一个接口,它可以被任何类继承,它的实例通过线程执行Callable 与Runnable区别代码public interface ...当做任务被线程执行。 特点 当做

    Callable, Future, FutureTask

    Callable 与Runnable

    Runnable 介绍

    Runnable只是一个接口,它可以被任何类继承,它的实例通过线程执行

    Callable 与Runnable区别
    代码
    public interface Runnable {
    
        public abstract void run();
    }
    作用:
    1. 当做线程使用
    2. 当做任务被线程执行。
    特点
    1. 当做任务没有时run()方法没有返回值;
    2. 不能抛出异常;
    Callable 介绍
    源码
    public interface Callable<V> {
    
        V call() throws Exception;
    }

    Callable是一个任务接口,它定义了一个带有返回值的并且抛出异常的任务。

    作用
    1. 当做任务
    特点
    1. 方法call() 具有返回值
    2. 可抛出异常,支持泛型

    Future

    future 介绍

    future 代表异步操作运算的结果,它也是一个接口;
    常用于获取运行结果,查询是否完成,对任务结果进行取消

    public interface Future<V> {
    
        /**
    
           尝试取消当前任务,如果任务已经被完成,这个操作就失败返回fail,    
           参数 mayInterruptIfRunning 为ture 时,代表任务在执行中,也可以取消任务; 为false时,只能取消未被执行的任务。
    
         */
        boolean cancel(boolean mayInterruptIfRunning);
    
        /**
         * Returns {@code true} if this task was cancelled before it completed
         * normally.
         *
         * @return {@code true} if this task was cancelled before it completed
         */
        boolean isCancelled();
    
        /**
         返回任务是否完成。
         *
         * @return {@code true} if this task completed
         */
        boolean isDone();
    
        /**
         * 获取返回结果,有必要一直等在结果返回;
         */
        V get() throws InterruptedException, ExecutionException;
    
        /**
         * 获取执行结果,有必要就等待timeout , 指定时间内结果没有执行完,返回Null
         *
         * @param timeout the maximum time to wait
         * @param unit the time unit of the timeout argument
         * @return the computed result
         * @throws CancellationException if the computation was cancelled
         * @throws ExecutionException if the computation threw an
         * exception
         * @throws InterruptedException if the current thread was interrupted
         * while waiting
         * @throws TimeoutException if the wait timed out
         */
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    功能
    1. 取消任务
    2. 获取任务执行结果
    3. 判断任务是否完成

    FutureTask

    简述

    FutureTask 是一个类,

    部分源码
    public class FutureTask<V> implements RunnableFuture<V> {
    .....
    
    }
    
    RunnableFuture
    /**
    
     */
    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }

    RunnableFuture 继承了Runnable, 和Future,它既可以当做任务被执行,又可以判断结果是完成,但获取不到具体的值,因为它的父类是Callable 而不是Runnable。

    FutureTask 是RunnableFuture 的子类实现。

    示例
    
    public class AsyncTask {
    
        public static void main(String args[]) throws InterruptedException, ExecutionException {
    
            ExecutorService service = Executors.newCachedThreadPool();
            MyTask1 task1 = new MyTask1();
    
            Future<String> result1 = service.submit(task1);
            System.out.println("...等待完成任务1。。执行结果是:" + result1.get());
    
            MyTask2 task2 = new MyTask2();
            Future<?> result2 = service.submit(task2);
            System.out.println("...等待完成任务2.。执行结果是。" + result2.get());
    
            FutureTask<String> ft = new FutureTask<String>(task1);
    
            Future<String> reuslt2 = (Future<String>) service.submit(ft);
    
            System.out.println("...等待任务3.。执行结果是:" + result2.get());
    
            service.shutdown();
    
        }
    
    
    }
    
    class MyTask2 implements Runnable {
    
        @Override
        public void run() {
    
            System.out.println("..执行Runnable任务。。");
    
        }
    
    }
    
    
    class MyTask1 implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            System.out.println("...线程执行callable任务。");
            return "success";
        }
    
    }
    执行结果
    ...线程执行callable任务。
    ...等待完成任务1。。执行结果是:success
    ..执行Runnable任务。。
    ...等待完成任务2.。执行结果是。null
    ...等待任务3.。执行结果是:null
    ...线程执行callable任务。
    
    展开全文
  • 1、Future Future模式是多线程设计常用的一种设计模式。Future模式可以理解成:我有一个任务,提交给了Future,Future替我完成这...能够获取任务执行的结果 向线程池中提交任务的submit方法不是阻塞方法,而Future...
  • 1.Callable接口 ...Callable的接口Runnable接口的区别是:Callable有一个call方法能够得到任务执行结果,而Runnable的run方法无法得到返回结果。Callable的接口的定义如下: public interface Callable {
  • 这篇文章中对sleeptick的原理做了介绍 总的来说 Sleep是使用睡眠完成定时,结束后继续往下执行循环来实现定时任务。 Tick函数是使用channel阻塞当前协程,完成定时任务的执行 现在来看一下 两种方法实现出来的...
  • setTimeoutPromise区别(宏任务和任务

    千次阅读 多人点赞 2018-10-10 00:14:40
    javascript的宏任务和任务任务有Event Table、Event Queue,微任务有Event Queue 1.宏任务:包括整体代码script,setTimeout,setInterval; 2.微任务:Promise,process.nextTick 注:Promise立即执行,...
  • java多线程提交任务并返回结果

    千次阅读 2018-08-16 21:30:06
    最近工作中有需要短时间内提交大量请求,并获取响应结果,最终选择了CompletionService接口来实现,它整合了ExecutorBlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take方法...
  • javascript的宏任务和任务

    万次阅读 多人点赞 2018-04-11 20:28:19
    结果凉凉了。但是还是学到了很多东西。其中在面试时面试官问了我一道题 setTimeout(function(){ console.log('1') }); new Promise(function(resolve){ console.log('2'); resolve(); }).the...
  • 多线程并发获取任务结果

    千次阅读 2018-06-07 16:03:44
    一般在进行多线程开发时,常用...Runnable的源码如下,表明该任务不支持结果返回,同时不外抛异常。@FunctionalInterface public interface Runnable { public abstract void run(); }Callable的源码如下,表明该任...
  • 首先说一下想要达到的效果,就是网页有一个按钮,用户可以通过按钮提交任务到spark,spark集群运行并得出结果结果能够返回给页面或者服务器。主要就是有两个问题。第一:如何通过服务器提交spark任务,让spark跑...
  • Future:未来对任务结果的获取 CompletionService:及时获取已完成任务的结果 Executor:任务执行 关系:Executor &amp;amp;amp;lt;- ExecutorService =&amp;amp;amp;gt; ...
  • 代码如下 : ... import java.util.HashSet; import java.util.List;...import java.util.Set;...import java.util.concurrent.*;...执行结果如下 : future task: task2 future task: task1 future task: task3
  • Django中celery执行任务结果的保存

    千次阅读 2018-05-23 11:10:52
    pip3 install django-celery-results ...注意异步任务views.py中调用时,想要记录结果必须是“任务函数.delay(*args)”,这样才能写入数据库的表中 具体记录结果可以查看表:django_celery_results_taskresult;
  • 多线程 | CompletionService异步获取并行任务执行结果,主要分两点来讨论CompletionService: 1)使用方式 2)源码分析 1)使用方式 使用FutureCallable可以获取线程执行结果,但获取方式确实阻塞的,...
  • RxJava实现串行任务和并行任务

    千次阅读 2019-09-16 14:53:14
    串行并行,是针对任务这个概念而来的,也就是串行任务和并行任务。 那我们需要了解一下什么是任务。 用一个http网络请求来看,这一个网络请求就是一个任务。它包含了发送请求、后台处理、处理返回数据这几个步骤。...
  • 如题,开发了一个spark任务,通过java web 提交到spark集群,如果获取计算返回的 结果
  • Windows Server 2003下面这样执行任务计划没问题,而就Windows Server 2008 R2下面出现意外。 正常情况下我们的任务计划会有反馈数值,通过它可以判断这个任务计划上次是否运行正常。 ·代码...
  • 写一个程序,在线程池中提交多个任务,每个任务最终都有一个执行结果,需求是对每个任务的执行结果进行汇总(样例中是把结果加在一起)。这里使用线程池的submit方法Future实现。 定义一个任务类 import java....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,335,204
精华内容 534,081
关键字:

任务和结果