-
2019-02-15 16:59:34
引言
本篇博客介绍通过“执行任务”的机制来设计应用程序时需要掌握的一些知识。所有的内容均提炼自《Java并发编程实战》中第六章的内容。
大多数并发应用程序都是围绕“任务执行”来构造的:任务通常是一些抽象的且离散的工作单元。
当围绕“任务执行”来设计应用程序结构时,第一步,就是要找出清晰的任务边界。在理想情况下,各个任务之间是相互独立的:任务并不依赖于其他任务的状态、结果或边界效应。
大多数服务器应用程序提供了一种自然的任务边界选择方式:以独立的客户请求为边界。
不好的例子:串行与“为每个任务创建线程”
在书中6.1节,介绍了由最简单的串行执行任务到为每个任务创建一个线程这两种执行任务的方式。应该说这两种方式都是不可取的。
这一节主要是为了引出下一节介绍的“任务执行框架”。
其中“串行执行任务”的缺点是在一般的服务器应用程序中,无法提高吞吐率或快速响应性。
而“为每个任务创建线程”的方式的问题在于可能导致:高性能开销、高资源消耗、影响稳定性。
【重点】在工作或面试中也会遇到这个极富针对性的问题,即大量创建线程会存在哪些问题?
1、高性能开销:创建和销毁都需要一定的代价,创建过程需要时间,延迟处理请求,也需要jvm和操作系统提供一些辅助操作。
2、高资源消耗:活跃的线程会消耗系统资源,尤其是内存。当可运行的线程数量多余可用处理器的数量,那么会有大量空闲的线程占用内存,不仅给垃圾回收带来压力,在竞争CPU的时候还将产生额外的性能开销。
3、影响稳定性:大量线程占用内存,内存不足,导致可能抛出OutOfMemoryError,系统崩溃。
线程数量的限制
书中在这里简单引出一个概念:稳定性。
根据前后文的联系,这里具体指的是:应用程序不会因为线程过多而抛出OutOfMemoryError异常。
为了达到这种稳定性,在可创建线程数量上存在一个限制。这个限制受平台以及多个因素影响,包括JVM启动参数、Thread构造函数中请求的栈大小、底层操作系统对线程的限制等。例如,在32位机器上,其中一个主要的限制因素是线程栈的地址空间。每个线程都维护两个执行栈,一个用于Java代码,另一个用于原生代码。
通常,JVM在默认情况下会生成一个复合栈,大小约0.5M~1M(这个值可以通过JVM标志 -Xss或通过Thread的构造函数来修改),那么:线程数量 ≈ 2^32(bit) / 0.5(MB) ≈几千或几万。
因此,在一定范围内,增加线程可以提高系统的吞吐率,但如果超出这个范围,再创建更多的线程只会降低程序的执行速度。
Executor接口
public interface Executor { void execute(Runnable command); }
Executor是一个非常简单的接口,只有一个execute(Runnable) 方法,它是其他的灵活且强大的异步任务框架的基础。通过这种方式,用Runnable来表示任务,可以将任务的提交过程与执行过程解耦。
Executor本身就是基于生产者消费者,提交任务相当于生产者,执行任务相当于消费者,因此,如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor。
什么是执行策略?
执行策略,定义了任务执行的“what、where、when、how”等方面,主要是描述根据不同的资源而选择不同的执行方式,一个最优执行策略应当是与硬件资源最匹配的。
线程池
先来看一下四种常用线程池的创建:
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10); ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(10); ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
其中:ExecutorService extends Executor,ScheduledExecutorService extends ExecutorService 。
1、newFixedThreadPool(int) :创建一个定额线程池,每提交一个任务创建一个线程,达到数量限制后不再增加,这时线程池的规模将不再变化(如果某个线程由于发生了未预期的异常而结束,那么线程池会补充一个新的线程)
2、NewCachedThreadPool() : 创建一个可缓存的线程池,线程池的规模不存在任何限制,当线程多余任务时,回收空闲线程;当任务增加时,创建新线程。
3、NewSingleThreadExecutor:单线程的Executor,如果这个线程异常结束,会创建另一个线程来替代。NewSingleThreadExecutor能确保依照任务在队列中的顺序串行执行(例如FIFO、LIFO、优先级)。
4、NewScheduleThreadPool:创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。
Executor的生命周期
JVM只有在所有(非守护)线程全部终止后才会退出,无法正确地关闭Executor,JVM将无法结束。
Executor以异步的方式来执行任务,导致了提交任务的状态不是立即可见的,即有些任务可能已经完成,有些可能正在执行,还有些可能正在队列中等待执行。
ExecutorSevice接口就是为了解决执行服务的生命周期问题,扩展了Executor接口。它添加了一些用于声明周期管理的方法(同时还有一些用于任务提交的便利方法):
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ......其他用于任务提交的便利方法 }
这五个方法是声明周期管理的方法,其余的都是与任务提交相关的方法,比如,可以提交比较大的集合Callable对象的方法:
invokeAll(Collection<? extends Callable<T>> tasks)
【重点】ExecutorService的三种状态:运行、关闭、已终止 。
ExecutorService在初始创建时处于运行状态。shutdown()方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow()方法将执行粗暴的关闭方式:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。
延迟任务与周期任务
Timer类负责管理延迟任务以及周期任务,但它本身存在缺陷,因此通常要用ScheduleThreadPoolExecutor的构造函数或newScheduleThreadPool工厂方法来创建该类对象。
Timer的缺陷在于,Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时精确性。
Timer还有一个问题就是,Timer线程不会捕获异常,当TimerTask抛出未检查异常时将终止定时线程。Timer也不会恢复线程的执行,而是会错误地任务整个Timer都被取消了。这就造成:已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不会被调度。称之为“线程泄漏”。
【重点】生命周期小结
Runnable和Callable等任务的生命周期:创建、提交、开始、完成、取消。
Future表示的就是一个任务的生命周期。
Thread的生命周期:创建、就绪、运行、阻塞、死亡(或结束)。
ExecutorService的生命周期(因为它继承自Executor,因此也是Executor的生命周期):创建、运行、关闭、已终止。
Callable与Future
callable
Runnable有一个局限性是没有返回值,也没办法抛出受检异常。对于某些异步获得结果的任务无法胜任,Callable应运而生。
它是Runnable的升级版,既可以使用Callable<Void>来达到Runnable一样的效果,同时也可以使用Callable<T> 来指定返回结果。
创建Callable的方式有两种:构造函数、静态的封装方法。
Callable<String> callableTask = new Callable<String>() { @Override public String call() throws Exception { return "this is a callable task...."; } };
Java 8 style:
Callable<String> callableTask = () -> { return "this is a callable task...."; };
静态方法:Executors.callable(Runnable task, T result):
Callable<String> call = Executors.callable(() -> { System.out.println("this is a runnable task..."); }, "done!");
Future
future表示一个任务的生命周期。主要提供了一些方法用于判断任务处于哪个阶段,还可以获取任务的结果甚至是取消任务。它本身还有一层隐含意义是,任务的生命周期只能前进,不能后退,当一个任务处于“完成”状态,就永远停留在“完成”状态上。这一点和ExecutorService的生命周期一样。
Future接口:
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
创建Future的方式通常是使用ExecutorService的submit()方法获取返回值。如果想通过构造器的方式显式地创建一个任务的生命周期管理对象,可以使用FutureTask。
FutureTask<String> runnFutureTask = new FutureTask<String>(runnable, "done!"); FutureTask<String> callFutureTask = new FutureTask<>(callable);
FutureTask类实现了Runnable和Future两个接口。
(说明:FutureTask是Java 5加入的类,Java 6又为它补充了一个新的RunnableFuture接口,Runnable接口和Future接口被提升到了RunnableFuture接口上,这更像是一种重构手段,我个人认为在实际开发中用途可能不及直接使用FutureTask)
由于FutureTask实现了Runnable接口,因此可以将它提交给Executor来执行,或者直接调用它的run方法。
是的,FutureTask的run()方法可以直接执行任务,而不需要什么start。
Future.get
get()方法的行为取决于任务的状态(尚未开始、正在运行、已完成)如果任务已经完成,那么get会立即返回或抛出一个Exception;如果任务没有完成,那么get将阻塞直到任务完成。如果任务抛出异常,那么get将该异常封装成ExecutionException并重新抛出,可以通过getCause来进一步获得被封装的初始异常。如果任务被取消,那么get将抛出CancellationException。
异构任务并行化存在的局限
A与B两个完全不同的任务通过并行方式可以实现小幅度的性能提升,但是如果想大幅度的提升存在一定的困难。因此,得出一个结论是,只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出真正的性能提升。
CompletionService与它的子类ExecutorCompletionService
CompletionService是Executor与BlockingQueue的融合。
回顾一下BlockingQueue的一些特性:
BlockingQueue接口是Queue的子接口,有两个最主要的实现,LinkedBlockingQueue(无界队列)和ArrayBlockingQueue(有界队列)。take()或poll()方法都是BlockingQueue的取头元素的方法,唯一不同的是当没有可用的头元素时,take会无限期等待(阻塞),poll可以设置一个超时时间,一旦超时,将返回null。
CompletionService是在任务执行的功能上加入了队列的特性,很明显是用于处理一批允许有返回值的任务。
用法:创建一个CompletionService(ExecutorCompletionService对象)。【ExecutorCompletionService的构造器允许我们传入一个ExecutorService(用于采取不同的执行策略)和一个BlockingQueue(该参数可选,默认LinkedBlockingQueue)】然后可以将一组Callable任务提交给CompletionService来执行,然后使用类似队列操作的take或poll方法来获取已完成的结果,这些结果会在完成时被封装为Future。
【扩展】ExecutorCompletionService的实现很简单。首先通过构造函数创建一个BlockingQueue来保存计算结果,然后当计算完成时,调用FutureTask的done方法,放入队列。展开:当提交某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask【回顾:FutureTask实现了Future、Runnable】的一个子类,QueueingFuture改写了FutureTask的done方法——将结果放入BlockingQueue中。take和poll方法委托给BlockingQueue方法,这些方法会在得到结果之前阻塞。
为任务设置时限
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么抛出TimeoutException。
在使用时限任务时需要注意,当这些任务超市后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。
【使用Future.get为单个任务设置时限,如果希望对一组任务设置计算时限,比如前面介绍的CompletionService,那么可以使用poll方法来设置执行时间】
invokeAll方法
ExecutorServie接口中有两个重载的invokeAll方法:
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
invokeAll方法支持将多个任务提交到一个ExecutorService并获得结果。invokeAll方法的参数为一组任务,并返回一组Future。invokeAll按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而使调用者能够将各个Future与其表示的Callable关联起来。
当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll都会返回。当超过指定时限,任何还未完成的任务都会取消。当invokeAll返回后,每个任务要么正常完成,要么被取消,而客户端代码可以调用get或isCancelled来判断究竟是何种情况。
第六章小结
通过围绕任务执行来设计应用程序,可以简化开发过程,并有助于实现并发。
Executor框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。当需要创建线程来执行任务时,可以考虑使用Executor。
要想在将应用程序分解为不同的任务时获得最大的好处,必须定义清晰的任务边界。某些应用程序中存在着比较明显的任务边界,而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。
更多相关内容 -
java中多线程执行器Executor之submit中Runnable任务和Callable任务的区别
2020-08-23 13:15:371.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可。 2.使用线程池来提高程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的...线程执行器和不使用线程执行器的对比(优缺点)
1.线程执行器分离了任务的创建和执行,通过使用执行器,只需要实现Runnable接口的对象,然后把这些对象发送给执行器即可。
2.使用线程池来提高程序的性能。当发送一个任务给执行器时,执行器会尝试使用线程池中的线程来执行这个任务。避免了不断创建和销毁线程导致的性能开销。
3.执行器可以处理实现了Callable接口的任务。Callable接口类似于Runnable接口,却提供了两方面的增强:
a.Callable主方法名称为call(),可以返回结果
b.当发送一个Callable对象给执行器时,将获得一个实现了Future接口的对象。可以使用这个对象来控制Callable对象的状态和结果。
4.提供了一些操作线程任务的功能
使用线程执行器的例子
**A)执行继承了Runnable接口的任务类
**
package com.springboot.hello.thread; public class TaskRunnable implements Runnable{ private String name ; public String getName() { return name; } public void setName(String name) { this.name = name; } public TaskRunnable(String name ){ this.name =name ; } @Override public void run(){ try { System.out.println(name+ " 任务执行中。。。"); Thread.sleep(15000); System.out.println(name+ " 任务真的执行完了。。。"); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { ExecutorServer executorServer = new ExecutorServer(); for (int i = 0; i < 20; i++) { TaskRunnable task = new TaskRunnable(" name-"+i); executorServer.executorTask(task);//异步处理 } System.out.println("main 方法走完了。。。"); executorServer.endExecutor();//直到所有进行中的任务执行完毕后才真的关闭主线程了 System.out.println("main 方法走完了。。endExecutor"); } } 使用执行器调用任务类: ```bash package com.springboot.hello.thread; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; /** * 线程执行器调用任务类 */ public class ExecutorServer { private ThreadPoolExecutor executor ; public ExecutorServer(){ executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); } public void executorTask(TaskRunnable task){ System.out.println("===开始执行线程:"+ task.getName() ); executor.submit(task); System.out.println("===线程"+task.getName()+"执行完成:"+executor.getActiveCount()); } public void endExecutor (){ executor.shutdown(); } }
需要注意的地方:
1、ThreadPoolExecutor提供了好几种构造函数,由于这些构造函数的参数比较多,难于记忆,所以这里使用Executors类对其构造函数进行了封装,封装后的静 态函数可以通过函数名称更加直观的表述其含义。
2、执行实现Runnable接口的任务类使用的方式是:executor.execute(task);后面可以看到它和调用实现Callable接口的任务类还是有区别的。
3、使用执行器时要显示结束执行器。如果不关闭,那么执行器会一直执行而程序不会结束。如果执行器没有任务执行了,它将继续等待新任务的到来,而不会 结束执行。结束执行器这里使用的方式是shutdown();**
B) 执行实现了Callable接口的任务
**
package com.springboot.hello.thread; import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; public class TaskCallable implements Callable<String> { private String name ; public TaskCallable(String name){ this.name = name ; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String call() throws Exception { return dealData(name); } public String dealData(String name){ System.out.println(name+" dealData处理开始"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(name+" dealData处理完成"); return name +" success" ; } public static void main(String[] args) { Long startTime = System.currentTimeMillis() ; ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2); for (int i = 0; i < 20; i++) { System.out.println("开始循环处理name"+i); TaskCallable taskCallable = new TaskCallable("name"+i); Future<String > futureResult = executor.submit(taskCallable) ;//(同步处理)会等待当前线程的执行结果,获取结果后才往下走 try { if (i ==9 ){ futureResult.cancel(true); //System.out.println("task name"+i+" 取消成功结果:"+futureResult.get()); }else { System.out.println("task name"+i+" 执行结果:"+futureResult.get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } System.out.println("耗时==="+( System.currentTimeMillis()-startTime )); } public static void main2(String[] args) { { Long startTime = System.currentTimeMillis() ; ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10); Map<Integer,Future<String>> map = new HashMap<>(); for (int i = 0; i < 20; i++) { System.out.println("开始循环处理 name"+i); TaskCallable taskCallable = new TaskCallable("name"+i); map.put(i, executor.submit(taskCallable) );// put进去后就走下一个循环了,不用等待submit结果了,相当于异步 } for (int i = 0; i < map.size(); i++) { try { // 添加了循环获取Future<String>结果集合,就会在当前for循环里面等待每一个task的submit执行结果了 System.out.println("name"+i+" 执行结果 :"+map.get(i).get()); //将主线程变为等待所有线程任务执行完成了后的同步线程了 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //扩大newFixedThreadPool的线程数, 就会加速执行 System.out.println("耗时==="+( System.currentTimeMillis()-startTime )); } } }
执行器调用submit()方法执行任务之后,返回一个Future类型对象。Future是一个异步任务的结果。意思就是任务交给执行器后,执行器就会立刻返回一个Future对象,而此时任务正在执行中。Future对象声明了一些方法来获取由Callable对象产生的结果,并管理他们的状态
完成 ## ***补充:*** 线程执行器的四种实例方式 前面提到由于ThreadPoolExecutor类的构造函数比较难记忆(参数多,形式也差不多),Java提供了一个工厂类Executors来实现执行器对象的创建。具体函数如下:  这些函数以new开头。 1、newCachedThreadPool():缓存线程池 1 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 需要注意的地方: 如果需要执行新任务,缓存线程池就会创建新线程;如果线程所运行的任务执行完成后并且这个线程可用,那么缓存线程池将会重用这些线程。 优点:减少了创建新线程所花费的时间 缺点:如果任务过多,系统的负荷会过载 使用条件:线程数量合理(不太多)或者线程运行只会运行很短的时间 2、newFixedThreadPool():固定线程池,(fixed:固定) public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 需要注定的地方: 创建了具有线程最大数量值(即线程数量 <= nThreads)的执行器。如果发送超过数量的任务给执行器,剩余的任务将被阻塞知道线程池中有可空闲的线程来处理它们。 3、newSingleThreadExecutor():单线程执行器 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 4、newScheduledThreadPool(int corePoolSize):定时执行器 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } 需要注意的地方: 使用方式如下: ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1); executor.schedule(task,i+1 , TimeUnit.SECONDS); 其中:task是实现了Callable接口的任务。schedule的参数含义: public <V> ScheduledFuture<V> schedule(Callable<V> callable,//即将执行的任务 long delay,//任务执行前需要等待的时间 TimeUnit unit)//时间单位
-
Java使用Executor执行Callable任务时的几种方法
2021-04-04 20:43:32多线程在需要返回值时,我们知道需要用到Callable和Future。...下面演示三种使用Executor执行Callable任务的方法。 /** * 测试任务,返回任务的序号 */ public static class TestTask implements Callable<Int多线程在需要返回值时,我们知道需要用到Callable和Future。Callable的cell方法可以返回一个值并且可抛出异常,是对Runnable的很好的补充;Future表示了一个任务的周期,它提供了判断任务状态、获取任务结果和取消任务等方法 。
下面演示三种使用Executor执行Callable任务的方法。/** * 测试任务,返回任务的序号 */ public static class TestTask implements Callable<Integer>{ int index; public TestTask(int index) { this.index = index; } @Override public Integer call() throws Exception { return index; } }
/** * 方法一:手动的保存任务的返回,这样的好处是每个任务对应的结果我们很清楚 */ @Test public void ordinaryTest() throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); List<Future<Integer>> futures = new ArrayList<>(); for(int i = 0; i < 10; i++) { TestTask testTask = new TestTask(i); Future<Integer> future = es.submit(testTask); futures.add(future); } es.shutdown(); for(int i = 0; i < 10; i++) { System.out.println("index:" + i + ",future:"+ futures.get(i).get()); } }
/** * 方法二:使用ExecutorCompletionService * ExecutorCompletionService中使用阻塞队列保存各任务的返回结果,返回是无序的,即谁先执行完成(异常、中断),谁先入队。 * 当我们不关心结果的顺序,或者需要一个任务完成时就取消其他任务的情况下,它是非常的方便的 */ @Test public void completionServiceTest() throws ExecutionException, InterruptedException { ExecutorService es = Executors.newFixedThreadPool(10); CompletionService<Integer> completionService = new ExecutorCompletionService<>(es); for(int i = 0; i < 10; i++) { TestTask testTask = new TestTask(i); completionService.submit(testTask); } es.shutdown(); for(int i = 0; i < 10; i++) { Future<Integer> future = completionService.take(); System.out.println("index:" + i + ",future:"+ future.get()); } }
/** * 方法三:ExecutorService的invokeAll方法 * invokeAll方法入参为一组任务,返回一组Future,这两个集合是有相同结构的, * 即它是按照入参的任务集合中迭代器的顺序将所有的Future添加到返回的集合中,从而任务和Future在它们各自的集合中有着同样的顺序。 * 当我们需要任务和结果的对应关系时,使用invokeAll方法来代替第一种方法 */ @Test public void invokeAllTest() throws InterruptedException, ExecutionException { ExecutorService es = Executors.newFixedThreadPool(10); List<TestTask> tasks = new ArrayList<>(); for (int i = 0; i < 10; i++){ tasks.add(new TestTask(i)); } List<Future<Integer>> futures = es.invokeAll(tasks); es.shutdown(); for (int i = 0; i < futures.size(); i++){ System.out.println("index:" + i + ",future:"+ futures.get(i).get()); } }
下面看一下ExecutorCompletionService的原理:
ExecutorCompletionService是将Executor和BlockingQueue的功能融合在一起,可将Callbale任务提交给它来执行,然后我们就可以像队列一样使用take或poll来得到已经完成的任务结果。下面是源码分析:/** *ExecutorCompletionService包含三个成员变量,最主要的是completionQueue,它的类型阻塞队列 */ private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; /** * 构造方法,需要我们传入一个Executor */ public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } /** * 提交任务的方法,其中的RunnableFuture为一个内部类,继承自FutureTask */ public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } /** * 这是QueueingFuture存在的主要原因,当任务执行完成后,将任务结果装入队列中 */ protected void done() { completionQueue.add(task); } private final Future<V> task; } /** * 从队列中获取返回值 */ public Future<V> take() throws InterruptedException { return completionQueue.take(); }
转载于:https://my.oschina.net/u/2424727/blog/2032393
-
戏(细)说Executor框架线程池任务执行全过程(上)
2021-04-16 14:30:181.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。具体点讲,提交一个...http://www.infoq.com/cn/articles/executor-framework-thread-pool-task-execution-part-01
一、前言
1.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。具体点讲,提交一个Callable对象给ExecutorService(如最常用的线程池ThreadPoolExecutor),将得到一个Future对象,调用Future对象的get方法等待执行结果就好了。经过这样的封装,对于使用者来说,提交任务获取结果的过程大大简化,调用者直接从提交的地方就可以等待获取执行结果。而封装最大的效果是使得真正执行任务的线程们变得不为人知。有没有觉得这个场景似曾相识?我们工作中当老大的老大(且称作LD^2)把一个任务交给我们老大(LD)的时候,到底是LD自己干,还是转过身来拉来一帮苦逼的兄弟加班加点干,那LD^2是不管的。LD^2只用把人描述清楚提及给LD,然后喝着咖啡等着收LD的report即可。等LD一封邮件非常优雅地报告LD^2report结果时,实际操作中是码农A和码农B干了一个月,还是码农ABCDE加班干了一个礼拜,大多是不用体现的。这套机制的优点就是LD^2找个合适的LD出来提交任务即可,接口友好有效,不用为具体怎么干费神费力。
二、 一个最简单的例子
看上去这个执行过程是这个样子。调用这段代码的是老大的老大了,他所需要干的所有事情就是找到一个合适的老大(如下面例子中laodaA就荣幸地被选中了),提交任务就好了。// 一个有7个作业线程的线程池,老大的老大找到一个管7个人的小团队的老大 ExecutorService laodaA = Executors.newFixedThreadPool(7); //提交作业给老大,作业内容封装在Callable中,约定好了输出的类型是String。 String outputs = laoda.submit( new Callable<String>() { public String call() throws Exception { return "I am a task, which submited by the so called laoda, and run by those anonymous workers"; } //提交后就等着结果吧,到底是手下7个作业中谁领到任务了,老大是不关心的。 }).get(); System.out.println(outputs);
使用上非常简单,其实只有两行语句来完成所有功能:创建一个线程池,提交任务并等待获取执行结果。
例子中生成线程池采用了工具类Executors的静态方法。除了newFixedThreadPool可以生成固定大小的线程池,newCachedThreadPool可以生成一个无界、可以自动回收的线程池,newSingleThreadScheduledExecutor可以生成一个单个线程的线程池。newScheduledThreadPool还可以生成支持周期任务的线程池。一般用户场景下各种不同设置要求的线程池都可以这样生成,不用自己new一个线程池出来。
三、代码剖析
这套机制怎么用,上面两句语句就做到了,非常方便和友好。但是submit的task是怎么被执行的?是谁执行的?如何做到在调用的时候只有等待执行结束才能get到结果。这些都是1.5之后Executor接口下的线程池、Future接口下的可获得执行结果的的任务,配合AQS和原有的Runnable来做到的。在下文中我们尝试通过剖析每部分的代码来了解Task提交,Task执行,获取Task执行结果等几个主要步骤。为了控制篇幅,突出主要逻辑,文章中引用的代码片段去掉了异常捕获、非主要条件判断、非主要操作。文中只是以最常用的ThreadPoolExecutor线程池举例,其实ExecutorService接口下定义了很多功能丰富的其他类型,有各自的特点,但风格类似。本文重点是介绍任务提交的过程,过程中涉及的ExecutorService、ThreadPoolExecutor、AQS、Future、FutureTask等只会介绍该过程中用到的内容,不会对每个类都详细展开。1、 任务提交
从类图上可以看到,接口ExecutorService继承自Executor。不像Executor中只定义了一个方法来执行任务,在ExecutorService中,正如其名字暗示的一样,定义了一个服务,定义了完整的线程池的行为,可以接受提交任务、执行任务、关闭服务。抽象类AbstractExecutorService类实现了ExecutorService接口,也实现了接口定义的默认行为。
AbstractExecutorService任务提交的submit方法有三个实现。第一个接收一个Runnable的Task,没有执行结果;第二个是两个参数:一个任务,一个执行结果;第三个一个Callable,本身就包含执任务内容和执行结果。 submit方法的返回结果是Future类型,调用该接口定义的get方法即可获得执行结果。 V get() 方法的返回值类型V是在提交任务时就约定好了的。
除了submit任务的方法外,作为对服务的管理,在ExecutorService接口中还定义了服务的关闭方法shutdown和shutdownNow方法,可以平缓或者立即关闭执行服务,实现该方法的子类根据自身特征支持该定义。在ThreadPoolExecutor中,维护了RUNNING、SHUTDOWN、STOP、TERMINATED四种状态来实现对线程池的管理。线程池的完整运行机制不是本文的重点,重点还是关注submit过程中的逻辑。
1) 看AbstractExecutorService中代码提交部分,构造好一个FutureTask对象后,调用execute()方法执行任务。我们知道这个方法是顶级接口Executor中定义的最重要的方法。。FutureTask类型实现了Runnable接口,因此满足Executor中execute()方法的约定。同时比较有意思的是,该对象在execute执行后,就又作为submit方法的返回值返回,因为FutureTask同时又实现了Future接口,满足Future接口的 public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}Submit传入的参数都被封装成了FutureTask类型来execute的,对应前面三个不同的参数类型都会封装成FutureTask。
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}Executor接口中定义的execute方法的作用就是执行提交的任务,该方法在抽象类AbstractExecutorService中没有实现,留到子类中实现。我们观察下子类ThreadPoolExecutor,使用最广泛的线程池如何来execute那些submit的任务的。这个方法看着比较简单,但是线程池什么时候创建新的作业线程来处理任务,什么时候只接收任务不创建作业线程,另外什么时候拒绝任务。线程池的接收任务、维护工作线程的策略都要在其中体现。作为必要的预备知识,先补充下ThreadPoolExecutor有两个最重要的集合属性,分别是存储接收任务的任务队列和用来干活的作业集合。
//任务队列
private final BlockingQueue workQueue;
//作业线程集合
private final HashSet workers = new HashSet();
其中阻塞队列workQueue是来存储待执行的任务的,在构造线程池时可以选择满足该BlockingQueue 接口定义的SynchronousQueue、LinkedBlockingQueue或者DelayedWorkQueue等不同阻塞队列来实现不同特征的线程池。关注下execute(Runnable command)方法中调用到的addIfUnderCorePoolSize,workQueue.offer(command) , ensureQueuedTaskHandled(command),addIfUnderMaximumPoolSize(command)这几个操作。尤其几个名字较长的private方法,把方法名的驼峰式的单词分开,加上对方法上下文的了解就能理解其功能。
因为前面说到的几个方法在里面即是操作,又返回一个布尔值,影响后面的逻辑,所以不大方便在方法体中为每条语句加注释来说明,需要大致关联起来看。所以首先需要把execute方法的主要逻辑说明下,再看其中各自方法的作用。
如果线程池的状态是RUNNING,线程池的大小小于配置的核心线程数,说明还可以创建新线程,则启动新的线程执行这个任务。
如果线程池的状态是RUNNING ,线程池的大小小于配置的最大线程数,并且任务队列已经满了,说明现有线程已经不能支持当前的任务了,并且线程池还有继续扩充的空间,就可以创建一个新的线程来处理提交的任务。
如果线程池的状态是RUNNING,当前线程池的大小大于等于配置的核心线程数,说明根据配置当前的线程数已经够用,不用创建新线程,只需把任务加入任务队列即可。如果任务队列不满,则提交的任务在任务队列中等待处理;如果任务队列满了则需要考虑是否要扩展线程池的容量。
当线程池已经关闭或者上面的条件都不能满足时,则进行拒绝策略,拒绝策略在RejectedExecutionHandler接口中定义,可以有多种不同的实现。
上面其实也是对最主要思路的解析,详细展开可能还会更复杂。简单梳理下思路:构建线程池时定义了一个额定大小,当线程池内工作线程数小于额定大小,有新任务进来就创建新工作线程,如果超过该阈值,则一般就不创建了,只是把接收任务加到任务队列里面。但是如果任务队列里的任务实在太多了,那还是要申请额外的工作线程来帮忙。如果还是不够用就拒绝服务。这个场景其实也是每天我们工作中会碰到的场景。我们管人的老大,手里都有一定HC(Head Count),当上面老大有活分下来,手里人不够,但是不超过HC,我们就自己招人;如果超过了还是忙不过来,那就向上门老大申请借调人手来帮忙;如果还是干不完,那就没办法了,新任务咱就不public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
4) addIfUnderCorePoolSize方法检查如果当前线程池的大小小于配置的核心线程数,说明还可以创建新线程,则启动新的线程执行这个任务。private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
//如果当前线程池的大小小于配置的核心线程数,说明还可以创建新线程
if (poolSize < corePoolSize && runState == RUNNING)
// 则启动新的线程执行这个任务
t = addThread(firstTask);
return t != null;
} 和上一个方法类似,addIfUnderMaximumPoolSize检查如果线程池的大小小于配置的最大线程数,并且任务队列已经满了(就是execute方法试图把当前线程加入任务队列时不成功),说明现有线程已经不能支持当前的任务了,但线程池还有继续扩充的空间,就可以创建一个新的线程来处理提交的任务。private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
// 如果线程池的大小小于配置的最大线程数,并且任务队列已经满了(就
是execute方法中试图把当前线程加入任务队列workQueue.offer(command)时候不成功
),说明现有线程已经不能支持当前的任务了,但线程池还有继续扩充的空间
if (poolSize < maximumPoolSize && runState == RUNNING)
//就可以创建一个新的线程来处理提交的任务
t = addThread(firstTask);
return t != null;
} 在ensureQueuedTaskHandled方法中,判断如果当前状态不是RUNING,则当前任务不加入到任务队列中,判断如果状态是停止,线程数小于允许的最大数,且任务队列还不空,则加入一个新的工作线程到线程池来帮助处理还未处理完的任务。private void ensureQueuedTaskHandled(Runnable command) { // 如果当前状态不是RUNING,则当前任务不加入到任务队列中,判断如 果状态是停止,线程数小于允许的最大数,且任务队列还不空 if (state < STOP && poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) //则加入一个新的工作线程到线程池来帮助处理还未处理完的任务 t = addThread(null); if (reject) reject(command); }
7) 在前面方法中都会调用adThread方法创建一个工作线程,差别是创建的有些工作线程上面关联接收到的任务firstTask,有些没有。该方法为当前接收到的任务firstTask创建Worker,并将Worker添加到作业集合HashSet workers中,并启动作业。
private Thread addThread(Runnable firstTask) { //为当前接收到的任务firstTask创建Worker Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); w.thread = t; //将Worker添加到作业集合HashSet<Worker> workers中,并启动作业 workers.add(w); t.start(); return t; }
至此,任务提交过程简单描述完毕,并介绍了任务提交后ExecutorService框架下线程池的主要应对逻辑,其实就是接收任务,根据需要创建或者维护管理线程。
维护这些工作线程干什么用?先不用看后面的代码,想想我们老大每月辛苦地把老板丰厚的薪水递到我们手里,定期还要领着大家出去happy下,又是定期的关心下个人生活,所有做的这些都是为什么呢?木讷的代码工不往这边使劲动脑子,但是猜还是能猜的到的,就让干活呗。本文想着重表达细节,诸如线程池里的Worker是怎么工作的,Task到底是不是在这些工作线程中执行的,如何保证执行完成后,外面等待任务的老大拿到想要结果,我们将在下篇文章中详细介绍
戏(细)说Executor框架线程池任务执行全过程(下)
http://www.infoq.com/cn/articles/executor-framework-thread-pool-task-execution-part-02?utm_source=infoq&utm_campaign=user_page&utm_medium=link -
异步任务执行器Executor简介
2020-03-12 00:27:24基本接口1)Executor接口2)Future接口2.用法示例3.基本原理1)ExecutorService实现2)Future实现 以前线程Thread既表示执行的任务,又表示执行的机制。在JDK1.5中,java并发框架提供了一种“执行服务”的相关API,它将... -
Executor 线程池框架详解
2021-10-06 09:31:57线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,他是一个用于统一创建与运行的接口。Executor框架实现的就是线程池的功能。 ... -
spark提交命令 spark-submit 的参数 executor-memory、executor-cores、num-executors、spark.default....
2019-07-19 15:39:28Impala 操作/读写 Kudu,使用druid连接池 Kudu 原理、API使用、代码 Kudu Java API 条件查询 spark读取kudu表导出数据为parquet文件(spark kudu parquet) kudu 导入/导出 数据 Kudu 分页查询的两种方式 map... -
Executor框架及线程池总结
2022-04-07 19:23:10概述Executor作为一个灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程进行了解耦开发,基于生产者和消费者模型,还提供了对生命周期的支持,以及统计... -
spark源码分析之Executor启动与任务提交篇
2017-04-30 18:55:15接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit通过启动脚本的方式启动它的主类,这里以WordCount为例子spark-... -
关于ThreadPoolExecutor调用了submit之后发生了什么
2022-02-06 18:03:08关于ThreadPoolExecutor调用了submit之后发生了什么 文章目录关于...同时您需要了解一点可重入锁ReentrantLock的使用方法,了解线程池的生命周期和线程的生命周期 前置的杂乱知识点 Executors的基本特点 -
如何使用Java 5 Executor框架创建线程池
2020-05-12 09:55:39Java 5以Executor框架的形式在Java中引入了线程池,它允许Java程序员将任务提交与任务执行分离。 如果要使用Java进行服务器端编程,则线程池是维护系统可伸缩性,鲁棒性和稳定性的重要概念。 对于那些不熟悉Java中的... -
Spark应用运行架构(Driver、Executor)和提交应用(spark-submit)
2019-05-15 16:42:10这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的Java 进程运行。驱动器节点和所有的执行器节点一起... -
Java线程池_Executor与ExecutorService原理分析
2021-03-06 19:02:30在java中我们想在新线程中执行一个任务很简单,有以下两种方式:// 1. 将任务放在Runnable的run方法中Thread thread = new Thread(new Runnable() {@Overridepublic void run() {// 执行任务}});thread.start();// 2.... -
Executor任务执行框架
2017-02-24 10:56:43官方文档中Executor是一个将任务Runnable的提交与执行进行分离的对象。Executor接口中唯一入口是execute方法,类似于Runnable的run方法。在实际的应用系统中,Runnable作为Thread的传入参数(或者运行对象),由... -
Java Executor并发框架(六)Executor框架线程池任务执行全过程(上)
2016-10-08 20:14:53基于Executor接口中将任务提交和任务执行解耦的设计,ExecutorService和其各种功能强大的实现类提供了非常简便方式来提交任务并获取任务执行结果,封装了任务执行的全部过程。本文尝试通过对该部分源码的解析以... -
[spark] 从spark-submit开始解析整个任务调度流程
2017-11-01 21:59:35spark应用程序可以以Client模式和Cluster启动,区别在于Client模式下的Driver是在执行spark-submit命令节点上启动的,而Cluster模式下是Master随机选择的一台Worker通过DriverWrapper来启动Driver的。 -
说Executor框架线程池任务执行全过程(1)
2015-06-28 18:53:001.5后引入的Executor框架的最大优点是把任务的提交和执行解耦。要执行任务的人只需把Task描述清楚,然后提交即可。这个Task是怎么被执行的,被谁执行的,什么时候执行的,提交的人就不用关心了。具体点讲,提交一个... -
Executor、ExecutorService、Executors、ThreadPoolExecutor、Future、Runnable、Callable
2021-08-24 10:26:09通常将为任务显示地创建线程替换为使用Executor来执行任务。比如,当有多个线程任务时,为每个任务创建一个线程并启动new Thread(new Runnable(){}).start()的代码,可以替换为: Executor executor = anExecutor; ... -
Java高并发编程中Executor、ExecutorService的使用及详细介绍-刘宇
2020-08-12 23:59:21一、什么是Executor框架二、Executor框架示意图三、ExecutorService的方法详解1、submit方法2、invokeAny方法3、invokeAll方法4、awaitTermination方法5、isShutdown方法6、shutdown方法7、shutdownNow方法8、is... -
JAVA并发(一)任务执行框架Executor
2019-12-13 15:44:25Executor框架是JAVA并发包中的重要框架,负责线程池的创建与任务的执行,本文将从Executor框架出发,逐步介绍任务的分类,以及线程池的创建,包含了BlockingQueue、任务拒绝策略、线程池大小、线程池选择等,最后,... -
Spark 查看某个正在执行的或已结束的任务中executor与driver日志
2019-07-26 15:28:44任务正在运行 如果运行在 YARN 模式,可以在 ResourceManager 节点的 WEB UI 页面根据 任务状态、用户名 或者 applicationId Search 到应用。 点击表格中 Tracking UI 列的History 链接; 点击相关的 ApplicationId... -
executor线程池框架_如何使用Java 5 Executor框架创建线程池
2020-06-10 01:55:52Java 5以Executor框架的形式在Java中引入了线程池,它允许Java程序员将任务提交与任务执行分离。 如果要使用Java进行服务器端编程,则线程池是维护系统可伸缩性,鲁棒性和稳定性的重要概念。 对于那些不熟悉Java中...