-
多线程并发执行任务
2019-08-11 01:09:38NULL 博文链接:https://mammahao.iteye.com/blog/2226890 -
springboot2.0 多线程并发执行任务
2020-01-14 11:22:29springboot2.0版本 执行多线程方式,个人学习了两种,一种直接是继承父类Thread或实现Runnable 接口类,,重写run方法;第二种是通过springboot的支持注解@Async的方式。 第一种:自定义类继承Thread类或继承...springboot2.0版本 执行多线程方式,个人学习了两种,一种直接是继承父类Thread或实现Runnable 接口类,,重写run方法;第二种是通过springboot的支持注解@Async的方式。
第一种:自定义类继承Thread类或继承Runnnable接口,重写run方法
import com.xxx.xx.taskphone.model.PhoneCallin; import com.xxx.xxx.taskphone.service.PhoneCallinService; public class PhoneCallInThread implements Runnable{ private PhoneCallinService phoneCallinService; public PhoneCallInThread(PhoneCallinService phoneCallinService) { super(); this.phoneCallinService = phoneCallinService; } @Override public void run() { System.out.println("-------------------"); //phoneCallinService.test();自定义逻辑业务 } }
开启线程
@Component public class TaskScheduling { @Autowired private PhoneCallinService phoneCallinService; @Scheduled(fixedRate = 2000) public void savePhoneCallin(){ long size = 5; if(size>0){ new Thread(new PhoneCallInThread(phoneCallinService)).start(); } } }
注:因为线程处理是执行逻辑的,所以spring无法通过注解的形式来说完成代理的类对象(如此处的phoneCallInService);但是可以通过获取ApplicationSpringContext上下文来获取代理对象。
我这里用的定时任务@scheduled(fixedRate = 2000) 每隔2秒钟执行一次。
第二种:springboot2.0使用@Async进行异步调用多线程的配置类
package com.xxx.perform.config; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration public class TaskPoolConfig { @Bean("taskExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10);//核心线程数目 executor.setMaxPoolSize(20);//指定最大线程数 executor.setQueueCapacity(200);//队列中最大的数目 executor.setKeepAliveSeconds(60);//线程空闲后的最大存活时间 executor.setThreadNamePrefix("taskExecutor-");//线程名称前缀 executor.setWaitForTasksToCompleteOnShutdown(true);//设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,确保 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。 executor.setAwaitTerminationSeconds(60);//设置线程池中 任务的等待时间 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
启动类上开启线程支持
@EnableAsync public class PerformAdminApplication { public static void main(String[] args) { SpringApplication.run(AdminApplication.class, args); } }
使用@Async注解表明当前类或当前方是一个线程类,并支持多线程处理
@Scheduled(fixedRate = 2000) @Async public void test(){ System.out.println("---------"); }
需要注意的事,如果多次调用被@Async注解的方法,会每次都会生成一个线程来处理方法逻辑。这样就需要及时处理逻辑后的结果。需要提供方法的返回处理逻辑。
多线程的异步回调
在被注明是线程处理的方法提供回调信息类
@Async public Future<String> doTaskCallback() throws Exception { super.doTaskTwo(); return new AsyncResult<>("任务二完成"); }
然后直接正常获取返回值就可以了
List<String> result = new Test().doTaskCallback();
最后个人建议在执行下一次的调用之前先判断当前线程是否已经完成,result.isDone()。如果线程未结束,可以Thead.sleep(1000),休眠一定时间后重新执行。
-
springboot2 多线程写入数据_springboot2.0 多线程并发执行任务
2020-12-20 21:35:31springboot2.0版本 执行多线程方式,个人学习了两种,一种直接是继承父类Thread或实现Runnable 接口类,,重写run方法;[emailprotected]第一种:自定义类继承Thread类或继承Runnnable接口,重写run方法import ...springboot2.0版本 执行多线程方式,个人学习了两种,一种直接是继承父类Thread或实现Runnable 接口类,,重写run方法;[email protected]
第一种:自定义类继承Thread类或继承Runnnable接口,重写run方法
import com.xxx.xx.taskphone.model.PhoneCallin;
import com.xxx.xxx.taskphone.service.PhoneCallinService;
public class PhoneCallInThread implements Runnable{
private PhoneCallinService phoneCallinService;
public PhoneCallInThread(PhoneCallinService phoneCallinService) {
super();
this.phoneCallinService = phoneCallinService;
}
@Override
public void run() {
System.out.println("-------------------");
//phoneCallinService.test();自定义逻辑业务
}
}
开启线程
@Component
public class TaskScheduling {
@Autowired
private PhoneCallinService phoneCallinService;
@Scheduled(fixedRate = 2000)
public void savePhoneCallin(){
long size = 5;
if(size>0){
new Thread(new PhoneCallInThread(phoneCallinService)).start();
}
}
}
注:因为线程处理是执行逻辑的,所以spring无法通过注解的形式来说完成代理的类对象(如此处的phoneCallInService);但是可以通过获取ApplicationSpringContext上下文来获取代理对象。
多线程的配置类
package com.xxx.perform.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class TaskPoolConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);//核心线程数目
executor.setMaxPoolSize(20);//指定最大线程数
executor.setQueueCapacity(200);//队列中最大的数目
executor.setKeepAliveSeconds(60);//线程空闲后的最大存活时间
executor.setThreadNamePrefix("taskExecutor-");//线程名称前缀
executor.setWaitForTasksToCompleteOnShutdown(true);//设置 线程池关闭 的时候 等待 所有任务都完成后,再继续 销毁 其他的 Bean,确保 异步任务 的 销毁 就会先于 数据库连接池对象 的销毁。
executor.setAwaitTerminationSeconds(60);//设置线程池中 任务的等待时间
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
启动类上开启线程支持
@EnableAsync
public class PerformAdminApplication {
public static void main(String[] args) {
SpringApplication.run(AdminApplication.class, args);
}
}
@Scheduled(fixedRate = 2000)
@Async
public void test(){
System.out.println("---------");
}
多线程的异步回调
在被注明是线程处理的方法提供回调信息类
@Async
public FuturedoTaskCallback() throws Exception {
super.doTaskTwo();
return new AsyncResult<>("任务二完成");
}
然后直接正常获取返回值就可以了
Listresult = new Test().doTaskCallback();
最后个人建议在执行下一次的调用之前先判断当前线程是否已经完成,result.isDone()。如果线程未结束,可以Thead.sleep(1000),休眠一定时间后重新执行。
-
【转】多线程并发执行任务,取结果归集。终极总结:Future、FutureTask、CompletionService、...
2019-02-21 19:16:00【转】多线程并发执行任务,取结果归集。终极总结:Future、FutureTask、CompletionService、CompletableFuture 见 https://www.cnblogs.com/dennyzhangdd/p/7010972.html ... -
多线程并发执行任务,取结果归集
2019-07-16 13:03:31作者:只会一点java ... 建议:此种方法可实现基本目标,任务并行且按照完成顺序获取结果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用! 2.FutureTask 原理: demo: 建议:demo1在特...转载自: https://www.cnblogs.com/dennyzhangdd/p/7010972.html
作者:只会一点java
目录
- 1.Futrue
- 2.FutureTask
- 原理:
- demo:
- 建议:demo1在特定场合例如有十分耗时的业务但有依赖于其他业务不一定非要执行的,可以尝试使用。如统计总公司+分公司利润是否达标100万,达标则不再继续统计 demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使用。
- 3.CompletionService:
- 4.CompletableFuture
- 5.总结:
正文
开启线程执行任务,不管是使用Runnable(无返回值不支持上报异常)还是Callable(有返回值支持上报异常)接口,都可以轻松实现。那么如果是开启线程池并需要获取结果归集的情况下,如何实现,以及优劣,老司机直接看总结即可。
任务执行完,结果归集时,几种方式:
1.Futrue
原理:
如下图,Future接口封装了取消,获取线程结果,以及状态判断是否取消,是否完成这几个方法,都很有用。
demo:
使用线程池提交Callable接口任务,返回Future接口,添加进list,最后遍历FutureList且内部使用while轮询,并发获取结果
package com.hezm.thread.day1.pool; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author denny.zhang * @ClassName: FutureDemo * @Description: Future多线程并发任务结果归集 * @date 2016年11月4日 下午1:50:32 */ public class FutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启多线程 ExecutorService exs = Executors.newFixedThreadPool(10); try { //结果集 List<Integer> list = new ArrayList<Integer>(); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //1.高速提交10个任务,每个任务返回一个Future入list for (int i = 0; i < 10; i++) { futureList.add(exs.submit(new CallableTask(i + 1))); } Long getResultStart = System.currentTimeMillis(); System.out.println("结果归集开始时间=" + new Date()); //顺序获取结果集【实际操作中执行效率较高】 Iterator<Future<Integer>> iterable = futureList.iterator(); while(iterable.hasNext()){ Future<Integer> future = iterable.next(); Integer i = future.get(); System.out.println("任务i=" + i + "获取完成!" + new Date()); list.add(i); } // //2.按照任务执行完成先后顺序结果归集, // //用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除 // //【实际操作中造成长时间等待任务执行】 // while(futureList.size()>0){ // Iterator<Future<Integer>> iterable = futureList.iterator(); // //遍历一遍 // while(iterable.hasNext()){ // Future<Integer> future = iterable.next(); // //如果任务完成取结果,否则判断下一个任务是否完成 // if (future.isDone() && !future.isCancelled()){ // //获取结果 // Integer i = future.get(); // System.out.println("任务i=" + i + "获取完成,移出任务队列!" + new Date()); // list.add(i); // //任务完成移除任务 // iterable.remove(); // }else{ // Thread.sleep(1);//避免CPU高速运转,这里休息1毫秒,CPU纳秒级别 // } // } // } System.out.println("list=" + list); System.out.println("总耗时=" + (System.currentTimeMillis() - start) + ",取结果归集耗时=" + (System.currentTimeMillis() - getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } static class CallableTask implements Callable<Integer> { Integer i; public CallableTask(Integer i) { super(); this.i = i; } @Override public Integer call() throws Exception { if (i == 1) { Thread.sleep(10000);//任务1耗时3秒 } else if (i == 5) { Thread.sleep(5000);//任务5耗时5秒 } else { Thread.sleep(1000);//其它任务耗时1秒 } System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!"+ new Date()); return i; } } }
如上图,开启定长为10的线程池:ExecutorService exs = Executors.newFixedThreadPool(10);+任务1耗时3秒,任务5耗时5秒,其他1秒。控制台打印如下:
结果归集开始时间=Fri Jun 01 09:59:33 CST 2018----起始33秒 task线程:pool-1-thread-3任务i=3,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-2任务i=2,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-4任务i=4,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-6任务i=6,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-7任务i=7,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-8任务i=8,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-9任务i=9,完成!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-10任务i=10,完成!Fri Jun 01 09:59:34 CST 2018 任务i=2获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018---一般任务耗时1秒,33+1=34,验证通过! 任务i=3获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=4获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=6获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=7获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=8获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=9获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 任务i=10获取完成,移出任务队列!Fri Jun 01 09:59:34 CST 2018 task线程:pool-1-thread-1任务i=1,完成!Fri Jun 01 09:59:36 CST 2018 任务i=1获取完成,移出任务队列!Fri Jun 01 09:59:36 CST 2018---任务1 耗时3秒 33+3=36,验证通过! task线程:pool-1-thread-5任务i=5,完成!Fri Jun 01 09:59:38 CST 2018 任务i=5获取完成,移出任务队列!Fri Jun 01 09:59:38 CST 2018---任务5 耗时5秒 33+5=38,验证通过! list=[2, 3, 4, 6, 7, 8, 9, 10, 1, 5]--》多执行几遍,最后2个总是1,5最后加进去的,可实现按照任务完成先手顺序获取结果! 总耗时=5012,取结果归集耗时=5002---》符合逻辑,10个任务,定长10线程池,其中一个任务耗时3秒,一个任务耗时5秒,由于并发高速轮训,耗时取最长5秒
建议:此种方法可实现基本目标,任务并行且按照完成顺序获取结果。使用很普遍,老少皆宜,就是CPU有消耗,可以使用!
2.FutureTask
原理:
是接口RunnableFuture的唯一实现类。类图如下(网上截取来的。。。我的eclipse类图插件还没装好):
如上图,可见RunnableFuture接口继承自Future<V>+Runnable:
1.Runnable接口,可开启单个线程执行。
2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。
FutureTask的构造方法有两种,其实最终都是赋值callable。如下图:
demo:
demo1:两个步骤:1.开启单个线程执行任务,2.阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑。
/** * * @ClassName:FutureTaskDemo * @Description:FutureTask弥补了Future必须用线程池提交返回Future的缺陷,实现功能如下: * 1.Runnable接口,可开启线程执行。 * 2.Future<v>接口,可接受Callable接口的返回值,futureTask.get()阻塞获取结果。 * 这两个步骤:一个开启线程执行任务,一个阻塞等待执行结果,分离这两步骤,可在这两步中间穿插别的相关业务逻辑。 * @author diandian.zhang * @date 2017年6月16日上午10:36:05 */ public class FutureTaskContorlDemo { public static void main(String[] args) { try { System.out.println("=====例如一个统计公司总部和分部的总利润是否达标100万=========="); //利润 Integer count = 0; //1.定义一个futureTask,假设去远程http获取各个分公司业绩. FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask()); Thread futureTaskThread = new Thread(futureTask); futureTaskThread.start(); System.out.println("futureTaskThread start!"+new Date()); //2.主线程先做点别的事 System.out.println("主线程查询总部公司利润开始时间:"+new Date()); Thread.sleep(5000); count+=10;//北京集团总部利润。 System.out.println("主线程查询总部公司利润结果时间:"+new Date()); //总部已达标100万利润,就不再继续执行获取分公司业绩任务了 if(count>=100){ System.out.println("总部公司利润达标,取消futureTask!"+new Date()); futureTask.cancel(true);//不需要再去获取结果,那么直接取消即可 }else{ System.out.println("总部公司利润未达标,进入阻塞查询分公司利润!"+new Date()); //3总部未达标.阻塞获取,各个分公司结果 Integer i = futureTask.get();//真正执行CallableTask System.out.println("i="+i+"获取到结果!"+new Date()+new Date()); } } catch (Exception e) { e.printStackTrace(); } } /** * * @ClassName:CallableTask * @Description:一个十分耗时的任务 * @author diandian.zhang * @date 2017年6月16日上午10:39:04 */ static class CallableTask implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("CallableTask-call,查询分公司利润,执行开始!"+new Date()); Thread.sleep(10000); System.out.println("CallableTask-call,查询分公司利润,执行完毕!"+new Date()); return 10; } }
执行结果如下:
=====例如一个统计公司总部和分部的总利润是否达标100万========== futureTaskThread start!Fri Jun 16 11:14:54 CST 2017----》futureTaskThread 已开始运行 CallableTask-call,查询分公司利润,执行开始!Fri Jun 16 11:14:54 CST 2017 主线程查询总部公司利润开始时间:Fri Jun 16 11:14:54 CST 2017------》主线程耗时5秒 主线程查询总部公司利润结果时间:Fri Jun 16 11:14:59 CST 2017 总部公司利润未达标,进入阻塞查询分公司利润!Fri Jun 16 11:14:59 CST 2017 CallableTask-call,查询分公司利润,执行完毕!Fri Jun 16 11:15:04 CST 2017----》futureTaskThread 执行完毕,耗时10秒 i=10获取到结果!Fri Jun 16 11:15:04 CST 2017Fri Jun 16 11:15:04 CST 2017
如上图,分离之后,futureTaskThread耗时10秒期间,主线程还穿插的执行了耗时5秒的操作,大大减小总耗时。且可根据业务逻辑实时判断是否需要继续执行futureTask。
demo2:当然FutureTask一样可以并发执行任务并获取结果,如下:
package thread; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; /** * * @ClassName:FutureTaskDemo * @Description:FutureTask实现多线程并发执行任务并取结果归集 * @author diandian.zhang * @date 2017年6月16日上午10:36:05 */ public class FutureTaskDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启多线程 ExecutorService exs = Executors.newFixedThreadPool(10); try { //结果集 List<Integer> list = new ArrayList<Integer>(); List<FutureTask<Integer>> futureList = new ArrayList<FutureTask<Integer>>(); //启动线程池,10个任务固定线程数为5 for(int i=0;i<10;i++){ FutureTask<Integer> futureTask = new FutureTask<Integer>(new CallableTask(i+1)); //提交任务,添加返回,Runnable特性 exs.submit(futureTask); //Future特性 futureList.add(futureTask); } Long getResultStart = System.currentTimeMillis(); System.out.println("结果归集开始时间="+new Date()); //结果归集 while(futureList.size()>0){ Iterator<FutureTask<Integer>> iterable = futureList.iterator(); //遍历一遍 while(iterable.hasNext()){ Future<Integer> future = iterable.next(); if (future.isDone()&& !future.isCancelled()) { //Future特性 Integer i = future.get(); System.out.println("任务i=" + i + "获取完成,移出任务队列!" + new Date()); list.add(i); //任务完成移除任务 iterable.remove(); }else { //避免CPU高速轮循,可以休息一下。 Thread.sleep(1); } } } System.out.println("list="+list); System.out.println("总耗时="+(System.currentTimeMillis()-start)+",取结果归集耗时="+(System.currentTimeMillis()-getResultStart)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } }
CallableTask:
1 package thread; 2 3 import java.util.Date; 4 import java.util.concurrent.Callable; 5 6 /** 7 * @Description 回调方法 8 * @author denny 9 * @date 2018/8/17 下午3:16 10 */ 11 public class CallableTask implements Callable<Integer> { 12 Integer i; 13 14 public CallableTask(Integer i) { 15 super(); 16 this.i = i; 17 } 18 19 @Override 20 public Integer call() throws Exception { 21 if (i == 1) { 22 Thread.sleep(3000);//任务1耗时3秒 23 } else if (i == 5) { 24 Thread.sleep(5000);//任务5耗时5秒 25 } else { 26 Thread.sleep(1000);//其它任务耗时1秒 27 } 28 System.out.println("task线程:" + Thread.currentThread().getName() + "任务i=" + i + ",完成!"+ new Date()); 29 return i; 30 } 31 }
结果:
结果归集开始时间=Fri Aug 17 15:51:54 CST 2018 task线程:pool-1-thread-2任务i=2,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-3任务i=3,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-4任务i=4,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-6任务i=6,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-7任务i=7,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-8任务i=8,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-10任务i=10,完成!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-9任务i=9,完成!Fri Aug 17 15:51:55 CST 2018 任务i=8获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=9获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=10获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=2获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=3获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=4获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=6获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 任务i=7获取完成,移出任务队列!Fri Aug 17 15:51:55 CST 2018 task线程:pool-1-thread-1任务i=1,完成!Fri Aug 17 15:51:57 CST 2018 任务i=1获取完成,移出任务队列!Fri Aug 17 15:51:57 CST 2018 task线程:pool-1-thread-5任务i=5,完成!Fri Aug 17 15:51:59 CST 2018 任务i=5获取完成,移出任务队列!Fri Aug 17 15:51:59 CST 2018 list=[8, 9, 10, 2, 3, 4, 6, 7, 1, 5] 总耗时=5014,取结果归集耗时=5007
建议:demo1在特定场合例如有十分耗时的业务但有依赖于其他业务不一定非要执行的,可以尝试使用。demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使用。
3.CompletionService:
原理:内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。
demo:
package thread.future; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * * @ClassName: CompletionServiceDemo * @Description: CompletionService多线程并发任务结果归集 * @author denny.zhang * @date 2016年11月4日 下午1:50:32 * */ public class CompletionServiceDemo{ public static void main(String[] args) { Long start = System.currentTimeMillis(); //开启3个线程 ExecutorService exs = Executors.newFixedThreadPool(5); try { int taskCount = 10; //结果集 List<Integer> list = new ArrayList<Integer>(); //1.定义CompletionService CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(exs); List<Future<Integer>> futureList = new ArrayList<Future<Integer>>(); //2.添加任务 for(int i=0;i<taskCount;i++){ futureList.add(completionService.submit(new Task(i+1))); } //==================结果归集=================== //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果 // for (Future<Integer> future : futureList) { // System.out.println("===================="); // Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照 // System.out.println("任务result="+result+"获取到结果!"+new Date()); // list.add(result); // } // //方法2.使用内部阻塞队列的take() for(int i=0;i<taskCount;i++){ Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到 System.out.println("任务i=="+result+"完成!"+new Date()); list.add(result); } System.out.println("list="+list); System.out.println("总耗时="+(System.currentTimeMillis()-start)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown();//关闭线程池 } } static class Task implements Callable<Integer>{ Integer i; public Task(Integer i) { super(); this.i=i; } @Override public Integer call() throws Exception { if(i==5){ Thread.sleep(5000); }else{ Thread.sleep(1000); } System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!"); return i; } } }
打印结果如下:
线程:pool-1-thread-3任务i=3,执行完成! 线程:pool-1-thread-1任务i=1,执行完成! 线程:pool-1-thread-4任务i=4,执行完成! 线程:pool-1-thread-2任务i=2,执行完成! 任务i==3完成!Fri Jun 16 11:39:17 CST 2017 任务i==1完成!Fri Jun 16 11:39:17 CST 2017 任务i==4完成!Fri Jun 16 11:39:17 CST 2017 任务i==2完成!Fri Jun 16 11:39:17 CST 2017 线程:pool-1-thread-4任务i=8,执行完成! 线程:pool-1-thread-3任务i=7,执行完成! 线程:pool-1-thread-1任务i=6,执行完成! 线程:pool-1-thread-2任务i=9,执行完成! 任务i==8完成!Fri Jun 16 11:39:18 CST 2017 任务i==7完成!Fri Jun 16 11:39:18 CST 2017 任务i==6完成!Fri Jun 16 11:39:18 CST 2017 任务i==9完成!Fri Jun 16 11:39:18 CST 2017 线程:pool-1-thread-3任务i=10,执行完成! 任务i==10完成!Fri Jun 16 11:39:19 CST 2017 线程:pool-1-thread-5任务i=5,执行完成! 任务i==5完成!Fri Jun 16 11:39:21 CST 2017 list=[3, 1, 4, 2, 8, 7, 6, 9, 10, 5]---》这里证实了确实按照执行完成顺序排序 总耗时=5004---》符合逻辑,10个任务,定长5线程池执行,取最长时间。
建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。4.CompletableFuture
原理:
4.1.从注释看:
JDK1.8才新加入的一个实现类,实现了Future<T>, CompletionStage<T>2个接口,JDK注释如下图:
译文(没兴趣的可以跳过):
当一个Future可能需要显示地完成时,使用CompletionStage接口去支持完成时触发的函数和操作。当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。
CompletableFuture实现了CompletionStage接口的如下策略:
1.为了完成当前的CompletableFuture接口或者其他完成方法的回调函数的线程,提供了非异步的完成操作。
2.没有显式入参Executor的所有async方法都使用
ForkJoinPool.commonPool()
为了简化监视、调试和跟踪,所有生成的异步任务都是标记接口AsynchronousCompletionTask
的实例。3.所有的CompletionStage方法都是独立于其他共有方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖。
CompletableFuture实现了Futurre接口的如下策略:
1.CompletableFuture无法直接控制完成,所以cancel操作被视为是另一种异常完成形式。方法
isCompletedExceptionally
可以用来确定一个CompletableFuture是否以任何异常的方式完成。2.以一个CompletionException为例,方法get()和get(long,TimeUnit)抛出一个ExecutionException,对应CompletionException。为了在大多数上下文中简化用法,这个类还定义了方法join()和getNow,而不是直接在这些情况中直接抛出CompletionException。
4.2.CompletionStage接口实现流式编程:
JDK8新增接口,此接口包含38个方法...是的,你没看错,就是38个方法。这些方法主要是为了支持函数式编程中流式处理。
4.3.CompletableFuture中4个异步执行任务静态方法:
如上图,其中supplyAsync用于有返回值的任务,runAsync则用于没有返回值的任务。Executor参数可以手动指定线程池,否则默认
ForkJoinPool.commonPool()系统级公共线程池,注意:这些线程都是Daemon线程,主线程结束Daemon线程不结束,只有JVM关闭时,生命周期终止
。4.4.组合CompletableFuture:
thenCombine(): 先完成当前CompletionStage和other 2个CompletionStage任务,然后把结果传参给BiFunction进行结果合并操作。
thenCompose():第一个CompletableFuture执行完毕后,传递给下一个CompletionStage作为入参进行操作。
demo:
JDK CompletableFuture 自带多任务组合方法allOf和anyOf
allOf是等待所有任务完成,构造后CompletableFuture完成
anyOf是只要有一个任务完成,构造后CompletableFuture就完成
方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
方式二:全流式处理转换成CompletableFuture[]+allOf组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取。---》推荐
package thread; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.Stream; import com.google.common.collect.Lists; /** * * @ClassName:CompletableFutureDemo * @Description:多线程并发任务,取结果归集 * @author diandian.zhang * @date 2017年6月14日下午12:44:01 */ public class CompletableFutureDemo { public static void main(String[] args) { Long start = System.currentTimeMillis(); //结果集 List<String> list = new ArrayList<String>(); List<String> list2 = new ArrayList<String>(); //定长10线程池 ExecutorService exs = Executors.newFixedThreadPool(10); List<CompletableFuture<String>> futureList = new ArrayList<>(); final List<Integer> taskList = Lists.newArrayList(2,1,3,4,5,6,7,8,9,10); try { 方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取 //for(int i=0;i<taskList.size();i++){ // final int j=i; // //异步执行 // CompletableFuture<String> future = CompletableFuture.supplyAsync(()->calc(taskList.get(j)), exs) // //Integer转换字符串 thenAccept只接受不返回不影响结果 // .thenApply(e->Integer.toString(e)) // //如需获取任务完成先后顺序,此处代码即可 // .whenComplete((v, e) -> { // System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date()); // list2.add(v); // }) // ; // futureList.add(future); //} 流式获取结果:此处是根据任务添加顺序获取的结果 //list = sequence(futureList).get(); //方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取 CompletableFuture[] cfs = taskList.stream().map(object-> CompletableFuture.supplyAsync(()->calc(object), exs) .thenApply(h->Integer.toString(h)) //如需获取任务完成先后顺序,此处代码即可 .whenComplete((v, e) -> { System.out.println("任务"+v+"完成!result="+v+",异常 e="+e+","+new Date()); list2.add(v); })).toArray(CompletableFuture[]::new); //等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取 CompletableFuture.allOf(cfs).join(); System.out.println("任务完成先后顺序,结果list2="+list2+";任务提交顺序,结果list="+list+",耗时="+(System.currentTimeMillis()-start)); } catch (Exception e) { e.printStackTrace(); }finally { exs.shutdown(); } } public static Integer calc(Integer i){ try { if(i==1){ //任务1耗时3秒 Thread.sleep(3000); }else if(i==5){ //任务5耗时5秒 Thread.sleep(5000); }else{ //其它任务耗时1秒 Thread.sleep(1000); } System.out.println("task线程:"+Thread.currentThread().getName()+"任务i="+i+",完成!+"+new Date()); } catch (InterruptedException e) { e.printStackTrace(); } return i; } /** * * @Description 组合多个CompletableFuture为一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get. * @param futures List * @return * @author diandian.zhang * @date 2017年6月19日下午3:01:09 * @since JDK1.8 */ public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { //1.构造一个空CompletableFuture,子任务数为入参任务list size CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); //2.流式(总任务完成后,每个子任务join取结果,后转换为list) return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); } /** * * @Description Stream流式类型futures转换成一个CompletableFuture,所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get. * @param futures Stream * @return * @author diandian.zhang * @date 2017年6月19日下午6:23:40 * @since JDK1.8 */ public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) { List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); return sequence(futureList); } }
方式二返回结果:
task线程:pool-1-thread-1任务i=2,完成!+Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-9任务i=9,完成!+Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-6任务i=6,完成!+Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-8任务i=8,完成!+Mon Jun 19 18:26:17 CST 2017 任务6完成!result=6,异常 e=null,Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-4任务i=4,完成!+Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-7任务i=7,完成!+Mon Jun 19 18:26:17 CST 2017 任务4完成!result=4,异常 e=null,Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-3任务i=3,完成!+Mon Jun 19 18:26:17 CST 2017 任务3完成!result=3,异常 e=null,Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-10任务i=10,完成!+Mon Jun 19 18:26:17 CST 2017 任务10完成!result=10,异常 e=null,Mon Jun 19 18:26:17 CST 2017 任务7完成!result=7,异常 e=null,Mon Jun 19 18:26:17 CST 2017 任务8完成!result=8,异常 e=null,Mon Jun 19 18:26:17 CST 2017 任务2完成!result=2,异常 e=null,Mon Jun 19 18:26:17 CST 2017 任务9完成!result=9,异常 e=null,Mon Jun 19 18:26:17 CST 2017 task线程:pool-1-thread-2任务i=1,完成!+Mon Jun 19 18:26:19 CST 2017---》任务1耗时3秒 任务1完成!result=1,异常 e=null,Mon Jun 19 18:26:19 CST 2017 task线程:pool-1-thread-5任务i=5,完成!+Mon Jun 19 18:26:21 CST 2017---》任务5耗时5秒 任务5完成!result=5,异常 e=null,Mon Jun 19 18:26:21 CST 2017 list2=[6, 4, 3, 10, 7, 8, 2, 9, 1, 5]list=[],耗时=5076---》符合逻辑,10个任务,10个线程并发执行,其中任务1耗时3秒,任务5耗时5秒,耗时取最大值。
建议:CompletableFuture满足并发执行,顺序完成先手顺序获取的目标。而且支持每个任务的异常返回,配合流式编程,用起来速度飞起。JDK源生支持,API丰富,推荐使用。
5.总结:
本文从原理、demo、建议三个方向分析了常用多线程并发,取结果归集的几种实现方案,希望对大家有所启发,整理表格如下:
Futrue
FutureTask
CompletionService
CompletableFuture
原理 Futrue接口
接口RunnableFuture的唯一实现类,RunnableFuture接口继承自Future<V>+Runnable: 内部通过阻塞队列+FutureTask接口 JDK8实现了Future<T>, CompletionStage<T>2个接口 多任务并发执行 支持 支持 支持 支持 获取任务结果的顺序 支持任务完成先后顺序
未知 支持任务完成的先后顺序 支持任务完成的先后顺序 异常捕捉 自己捕捉 自己捕捉 自己捕捉 源生API支持,返回每个任务的异常 建议 CPU高速轮询,耗资源,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用。 推荐使用,没有JDK8CompletableFuture之前最好的方案,没有质疑 API极端丰富,配合流式编程,速度飞起,推荐使用!
-
Python:使用多线程并发执行任务,并接收有序的返回值
2019-05-05 09:59:04在使用多线程时,简单的IO操作有时满足不了我们的...右侧:通过多线程并发执行,共进行了4次调用,整个执行时间大约为用时最长的一次的时间 先看一下要进行TTS的数据: ["我的公众号是Python疯子", "内容没有花架... -
多线程并发执行任务,取结果归集。终极总结:Future、FutureTask、CompletionService、CompletableFuture
2018-03-26 16:46:22目录1.Futrue原理:demo:建议:此种方法可实现基本目标,任务并行且按照提交顺序获取结果。...demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使... -
多线程并发执行任务,取结果归集:Future、FutureTask、CompletionService、CompletableFuture
2018-04-03 10:01:35使用线程池提交Callable接口任务,返回Future接口,添加进李斯特,最后遍历FutureList且内部使用while轮询,并发获取结果 package thread.future; import java.util.ArrayList; import java.util.D... -
多线程并发执行任务,取结果归集。终极总结:Future、FutureTask、CompletionService、CompletableFuture.....
2018-04-10 19:04:22目录1.Futrue原理:demo:建议:此种方法可实现基本目标,任务并行且按照提交顺序获取结果。...demo2多线程并发执行并结果归集,这里多套一层FutureTask比较鸡肋(直接返回Future简单明了)不建议使... -
spring boot 多线程并发执行定时任务
2019-03-01 15:26:18Spring Boot默认情况下,所有定时任务会在一个线程中去执行,下面看测试代码,定义了三个测试Job: @Component @Slf4j public class TestJob { @Scheduled(cron = "0/3 * * * * ? ") public void job1() { ... -
Spring Boot使用多线程并发执行定时任务
2018-10-23 16:06:15工程代码示例 : Spring Boot集成持久化Quartz定时任务管理和界面展示 工程地址 :https://github.com/tjfy1992/SpringBootQuartz 运行方法 Spring Boot工程已经集成了服务器。右键点击DemoApplication.java -&... -
SpringBoot实现多线程并发动态执行计划任务
2020-07-07 14:32:49原来写了一篇关于springboot实现计划任务的文章,但是有较多的人都在问为什么数据库变更后计划任务没刷新,怎么去动态获取,怎么实现多线程并发执行,所以现在新开一篇文章,重新实现计划任务的方法,抽象出刷新功能... -
Spring Boot定时任务配置线程池多线程并发执行
2020-08-18 16:09:12问题:多个任务默认单线程执行,会存在阻塞问题。 解决方案: 线程池配置: import java.util.concurrent.Executors; import org.springframework.context.annotation.Configuration; import org.spring... -
多线程执行任务
2018-01-09 10:20:25多线程并发执行任务 public class ThreadReadDataHub extends Thread { protected static final Logger logger = LoggerFactory.getLogger(ThreadReadDataHub.class); private Thread t; private String threa -
基于python多线程实现Linux任务并发执行
2016-06-24 15:24:00基于python多线程实现Linux任务并发执行 -
多线程: Java使用ThreadPoolExecutor类执行多线程并发任务
2018-11-01 21:35:26import java.util.concurrent.*; /** * 线程池测试类 */ ...public class ThreadPoolTest { ... * 多线程的任务类 */ static class Task implements Runnable{ //线程id private int id; ... -
Java Script 多线程并发执行与异步执行浅析
2017-10-11 20:02:45没有特殊需求情况下JS中较大部分仍然是单线程运行...值得一提的是,单线程运行并非是代码的单线程运行而是任务的单线程运行,当然多线程的并发执行 还有其他的实现这里不在阐述。此博文仅帮助理解异步与并发两个概念。 -
Quartz 2 定时任务(二):多线程并发执行与数据共享
2017-04-20 05:42:491. 禁止同一个 JobDetail 中的多个实例并发执行 Quartz 定时任务默认都是并发执行的,不会等待上一次任务执行完毕,只要间隔时间到就会执行,如果定时任执行太长,会长时间占用资源,导致其它任务... -
springboot动态配置定时任务2种方式,整合Quartz多线程并发执行多个定时任务配置
2017-10-04 09:42:20我的项目是采用的idea+gradle+springboot项目构建,下面是springboot实现定时任务所需要的jar包 //定时任务使用 compile group: 'org.quartz-scheduler', name: 'quartz', version: '2.3.0' compile group: 'org.... -
oracle 多线程并发任务
2020-07-11 16:03:26oracle 创建多个定时任务job达到异步多线程的方式执行多个操作任务 Procedure Trimspaceall Is Begin For Rec In (Select * From Tabs Where Regexp_Like(Table_Name, '[0-9]')) Loop --通过任务后台 模拟多线程... -
多线程并发执行并获取结果
2020-03-12 15:26:12//任务接口 public interface ParallelTask { String getTaskKey();...// 任务执行结果 import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class ParallelTaskResult { p... -
【技能库】--批量任务多线程并发执行(324)
2017-09-16 14:13:04//指定线程池执行任务 200ms内没有执行完毕的下次不再提取 //循环执行 private static Stack<TaskWithId> stack = new Stack() ; private static ExecutorService pool = Executors. ... -
多线程并发执行---------------管理线程Boss做任务分发,n个Worker线程连续执行获得的任务
2010-12-11 11:45:00事情是这样的: Boss有很多个Task要做,请了n个Worker来为他工作,Boss取出一个Task给一个Worker,取出第二个Task给下一个Worker......直到每一个Worker都有Task。取得到Task的Worker就开始工作了,Boss... -
对线程并发执行的疑问
2020-04-05 22:33:16我的理解是在单核CPU上,通过多线程并发在某些情况下可以加快执行速度。 这在并发的线程类型不同时或者线程同时处理多种类型的任务时很容易理解,因为这样提高了系统的资源利用率。 但是对于某些任务应该是不可以的...