-
2019-08-25 14:57:00
日常的批量处理任务中,经常需要使用多线程同时处理大量任务,一次读取一定数量的数据,然后放入线程池中等待线程处理完成,再取一定数量数据进行循环处理。
效率比较低的方式是使用同步的for循环进行处理
其次就是使用多线程处理。一般情况使用多线程都会使用线程池来管理,有些情况下,不能把大量任务一次性丢进线程池中,以为内存有限,一般线程池的阻塞队列也是有界的,超出限制可能OOM或者触发拒绝策略,因此需要分批处理,假设一次性读取5000条数据,则需要先等待线程池处理完这5000条数据再进行下一次处理。这时候我们需要确认开启的多线程中的子任务全部结束,再让主线程去执行下一次处理。
大致总结的几种处理方案代码示例如下,本人水平有限,欢迎各位大佬指点留言,谢谢!private static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool(2 * CPU_COUNT); /**1. 使用CountDownLatch*/ public void testCountDownLatch() { //模拟查询到数据库中待处理数据 List batchList = new ArrayList<>(); for (int i = 0; i < 10; i++) { batchList.add(new java.lang.Object()); } if (CollectionUtils.isEmpty(batchList)) { return; } log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); final CountDownLatch countDownLatch = new CountDownLatch(batchList.size()); batchList.forEach(Object -> FORK_JOIN_POOL.execute(() -> { try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); log.info("当前线程休眠完成"); countDownLatch.countDown(); } catch (Exception e) { log.error("异常", e); } })); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); } /**2. 使用CyclicBarrier*/ public void testCyclicBarrier() { //模拟查询到数据库中待处理数据 List<Object> batchList = new ArrayList<>(); for (int i = 0; i < 10; i++) { batchList.add(new Object()); } if (CollectionUtils.isEmpty(batchList)) { return; } log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); final CyclicBarrier cyclicBarrier = new CyclicBarrier(batchList.size(), () -> { log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); testCyclicBarrier(); }); batchList.forEach(Object -> FORK_JOIN_POOL.execute(() -> { try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); log.info("当前线程休眠完成"); cyclicBarrier.await(); } catch (Throwable e) { log.error("异常", e); } })); } /**3. 使用CompletionService*/ public void testCompletionService() { for (int j = 0; j < 3; j++) { List<Object> batchList = new ArrayList<>(); for (int i = 0; i < 10; i++) { batchList.add(new Object()); } if (CollectionUtils.isEmpty(batchList)) { return; } log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); CompletionService completionService = new ExecutorCompletionService(FORK_JOIN_POOL); batchList.forEach(Object -> { completionService.submit(() -> { try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); log.info("当前线程休眠完成"); } catch (Throwable e) { log.error("异常", e); } return null; }); }); batchList.forEach(imgRecord -> { try { completionService.take().get(); } catch (Exception e) { e.printStackTrace(); } }); log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); } } /**4. 使用CompletableFuture*/ public void testCompletableFuture() { for (int j = 0; j < 3; j++) { List<Object> batchList = new ArrayList<>(); for (int i = 0; i < 10; i++) { batchList.add(new Object()); } if (CollectionUtils.isEmpty(batchList)) { return; } log.info("开始本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); ArrayList<CompletableFuture<?>> futureList = new ArrayList<>(); batchList.forEach(Object -> { final CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(new Random().nextInt(10)); log.info("当前线程休眠完成"); } catch (Throwable e) { log.error("异常", e); } }, FORK_JOIN_POOL); futureList.add(future); }); CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); log.info("完成本次批量处理,数据个数:{},时间:{}", batchList.size(), LocalDateTime.now()); } }
更多相关内容 -
多线程-springboot实现多线程处理任务
2021-01-06 11:16:302.springboot实现多线程 1.在入口类上开启多线程 @SpringBootApplication //@EnableScheduling @EnableAsync public class SampleController { public static void main(String[] args) throws Exception { ...1.概述
2.springboot实现多线程
1.在入口类上开启多线程
@SpringBootApplication //@EnableScheduling @EnableAsync public class SampleController { public static void main(String[] args) throws Exception { SpringApplication.run(SampleController.class, args); } }
2.添加配置类
其实也可以直接在启动类中配置,但通常不会这么做
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class AsyncTaskConfig implements AsyncConfigurer { /** * 核心线程数 */ // @Value("${threadPool.corePoolSize}") private int corePoolSize = 10; /** * 最大线程数 */ private int maxPoolSize = 50; /** * 线程池缓冲队列容量 */ private int queueCapacity = 10; /** * 空闲线程销毁前等待时长 */ private int awaitTerminationSeconds = 10; /** * 线程名前缀 */ private String threadNamePrefix = "Sample-Async-"; /** * ThreadPoolTaskExcutor运行原理 * 当线程池的线程数小于corePoolSize,则新建线程入池处理请求 * 当线程池的线程数等于corePoolSize,则将任务放入Queue中,线程池中的空闲线程会从Queue中获取任务并处理 * 当Queue中的任务数超过queueCapacity,则新建线程入池处理请求,但如果线程池线程数达到maxPoolSize,将会通过RejectedExecutionHandler做拒绝处理 * 当线程池的线程数大于corePoolSize时,空闲线程会等待keepAliveTime长时间,如果无请求可处理就自行销毁 */ @Override @Bean public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(corePoolSize); threadPool.setMaxPoolSize(maxPoolSize); threadPool.setQueueCapacity(queueCapacity); threadPool.setAwaitTerminationSeconds(awaitTerminationSeconds); threadPool.setThreadNamePrefix(threadNamePrefix); //关机时,是否等待任务执行完 threadPool.setWaitForTasksToCompleteOnShutdown(true); //设置拒绝策略 //CALLER_RUNS:由调用者所在的线程执行该任务 threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化线程 threadPool.initialize(); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
3.编写任务类
import org.springframework.context.annotation.Scope; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @Service @Scope("prototype") public class AsyncTaskTestService { @Async public void asyncTaskTest() { for (int i = 0; i < 100; i++) { System.out.println(Thread.currentThread().getName() + "aaaaaaaaaaaa"); } } }
4.测试
我这里集成了swagger,就直接用controller测试了
@Autowired private AsyncTaskTestService asyncTaskTestService; @PostMapping("/asyncTest") @ApiOperation(value = "多线程测试") public void asyncTest() { for (int i = 0; i < 5; i++) { asyncTaskTestService.asyncTaskTest(); } }
5.多线程使用过程中可能遇到的问题
-
多线程处理任务并合并数据
2018-12-29 17:19:28newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待...一、线程池创建四种方式
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定时线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。二、有返回值的多线程
ExecutorService接口继承自Executor,Executor中的execute方法无返回值,ExecutorService接口中的方法有返回值。
三、计数器使用
CountDownLatch也是juc包中的一个类,类似倒计时计数器,创建对象时通过构造方法设置初始值,调用CountDownLatch对象的await()方法则处于等待状态,调用countDown()方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。
有了计数器就可以暂时将主线程阻塞,等异步的多线程全部执行完毕并返回结果后,再继续执行主线程。
四、线程安全问题
有了上面线程池跟计数器的基础,现在可以动手写一个多线程处理任务并合并数据的demo了。
大致思路就是:创建一个定长的线程池,长度为10,计数器初始值也设置为10。每执行一次,将计数器减一,并且将执行结果添加到list集合中,最终多线程全部执行完毕后,计数器停止等待 主线程继续往下执行,返回list。
public static List<String> getExecutorService() throws InterruptedException{ System.out.println("开始执行多线程..."); long startTime = System.currentTimeMillis(); List<String> list = new ArrayList<>();//存放返回结果 CountDownLatch countDownLatch = new CountDownLatch(10); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { Runnable runnable = new Runnable(){ @Override public void run() { try { Thread.sleep(3000); list.add(UUID.randomUUID().toString()); System.out.println("当前线程name : "+Thread.currentThread().getName()); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; executorService.execute(runnable); } countDownLatch.await(); System.out.println("submit总共cost 时间:" + (System.currentTimeMillis()-startTime)/1000 + "秒"); executorService.shutdown(); return list; }
执行结果如下:
十个线程全部工作,但是返回值中却有null值,跟想要的结果有点出入,为啥呢?
原因在于:ArrayList是非线程安全的。ArrayList的add方法中有size++,不是一个原子操作,所以线程不安全。
五、CopyOnWriteArrayList的用法
part4中提到的问题 解决方案很简单,将ArrayList换成CopyOnWriteArrayList即可。
public static List<String> getExecutorService() throws InterruptedException{ System.out.println("开始执行多线程..."); long startTime = System.currentTimeMillis(); List<String> list = new CopyOnWriteArrayList<>();//存放返回结果 CountDownLatch countDownLatch = new CountDownLatch(10); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { Runnable runnable = new Runnable(){ @Override public void run() { try { Thread.sleep(3000); list.add(UUID.randomUUID().toString()); System.out.println("当前线程name : "+Thread.currentThread().getName()); countDownLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }; executorService.execute(runnable); } countDownLatch.await(); System.out.println("submit总共cost 时间:" + (System.currentTimeMillis()-startTime)/1000 + "秒"); executorService.shutdown(); return list; }
CopyOnWriteArrayList是Java并发包中提供的一个并发容器,它是个线程安全且读操作无锁的ArrayList,写操作则通过创建底层数组的新副本来实现,是一种读写分离的并发策略,我们也可以称这种容器为"写时复制器"。
优点:读操作时性能很高,因为不需要任何同步措施,适用于读多写少的并发场景。
缺点:①.每次写操作都要copy原容器,频繁的GC,内存压力大。②.由于读写分离的策略,读到的数据很可能是旧数据。
-
java多线程执行任务(工具)
2022-03-09 14:54:09在项目开发的过程中经常会碰到多线程执行任务,每次用线程池实现时,由于每次的需求都有所差别有时是所有任务同时执行有时是分批次执行有时还需要知道所有任务什么时候执行完。今天闲着写了一个通用的多线程执行工具...java多线程执行任务
在项目开发的过程中经常会碰到多线程执行任务,每次用线程池实现时,由于每次的需求都有所差别有时是所有任务同时执行有时是分批次执行有时还需要知道所有任务什么时候执行完。今天闲着写了一个通用的多线程执行工具。
实现原理如图:
直接上代码
Scheder
import cn.hutool.core.util.RandomUtil; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; /** * @Author: dinghao * @Date: 2022/3/7 15:29 */ @Slf4j public class Scheder { /** * 任务信息数组 */ private TaskInfo[] taskInfos; /** * 执行计划队列 */ private LinkedBlockingQueue<Plan> planQueue; /** * 允许并行执行的线程数 */ private ExecutorService loopExecutor; /** * 执行模式: * 1:所有任务信息都执行 * 2:先执行部分任务,执行完后再执行其他任务 */ private int model; /** * 先执行的数量 */ private int modelSize; /** * model = 2 时有效 */ private ArrayList<Integer> indexList = new ArrayList<>(); /** * 协助判断,是否线程池的任务全部结束 */ private AtomicInteger count; /** * 调度器的执行状态 */ private volatile boolean status = false; /** 构造器 * @param taskInfos 任务数组 * @param nThrends 同时执行任务中的计划线程数 * @param queueSize 计划执行队列 * @param model 执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务 * @param modelSize 每批执行任务的数量 */ public Scheder(TaskInfo[] taskInfos, int nThrends, int queueSize, int model, Integer modelSize) { this.taskInfos = taskInfos; this.planQueue = new LinkedBlockingQueue<>(queueSize); this.loopExecutor = Executors.newFixedThreadPool(nThrends); this.model = model; if (this.model == 2 && modelSize != null) { this.modelSize = modelSize > taskInfos.length ? taskInfos.length : modelSize; } else { this.modelSize = taskInfos.length; } count = countPlan(); } /** * 计算一共有多少执行计划 * @return / */ private AtomicInteger countPlan() { int sum = 0; for (int i = 0; i < this.taskInfos.length; i++) { sum += this.taskInfos[i].getPlanQueue().size(); } return new AtomicInteger(sum); } public Scheder(TaskInfo[] taskInfos, int nThrends, int model, Integer modelSize) { this(taskInfos, nThrends, 100, model, modelSize); } public Scheder(TaskInfo[] taskInfos, int nThrends) { this(taskInfos, nThrends, 100, 1, null); } public Scheder(TaskInfo[] taskInfos) { this(taskInfos, 10, 100, 1, null); } public void setModel(int model) { this.model = model; } public int getModel() { return model; } public void setModelSize(int modelSize) { this.modelSize = modelSize; } public int getModelSize() { return modelSize; } public ArrayList<Integer> getIndexList() { return indexList; } public AtomicInteger getCount() { return count; } public boolean isStatus() { return status; } /** * 执行方法 */ public void run() { if (this.status) { log.warn("任务处于启动状态"); return; } this.status = true; // 开启向队列中添加执行计划线程 init(); // 循环执行执行计划 while (this.status) { // 所有执行计划执行完后,退出 if (this.taskInfos.length <= 0 && this.planQueue.size() == 0) { this.status = false; break; } try { // 获取一个执行计划 Plan plan = this.planQueue.take(); // 执行计划 this.loopExecutor.execute(() -> plan.run0(this.count)); } catch (InterruptedException e) { log.error("任务执行中发生异常", e); } } int size; // 所有线程执行完毕出循环 for (; ; ) { size = this.count.get(); if (size == 0) { break; } } //停止线程池 this.loopExecutor.shutdown(); for (; ; ) { //只有当线程池中所有线程完成任务时才会返回true,并且需要先调用线程池的shutdown方法或者shutdownNow方法。 if (this.loopExecutor.isTerminated()) { System.out.println("执行结束!"); break; } } } /** * 开启一个线程,持续向执行计划队列添加执行计划,直到所有的计划任务添加完 */ private void init() { new Thread(() -> { while (this.status) { // 任务信息数组数量 int length = this.taskInfos.length; // 执行完结束线程 if (length <= 0) { break; } // 获取添加执行计划的的任务索引值 int index = getIndexOfModel(this.model, length); TaskInfo taskInfo = null; try { taskInfo = this.taskInfos[index]; } catch (Exception e) { e.printStackTrace(); } LinkedList<Plan> plans = taskInfo.getPlanQueue(); if (plans.size() > 0) { Plan plan = plans.removeFirst(); try { this.planQueue.put(plan); } catch (InterruptedException e) { log.error("向执行计划队列放入计划异常", e); } } else { this.taskInfos = reBuildTaskInfos(this.taskInfos, index); } } }).start(); } /** * 根据执行模式获取添加执行计划的的任务信息索引值 * * @param model 执行模式 * @param length 任务信息数组数量 * @return 任务信息索引值 */ private int getIndexOfModel(int model, int length) { if (model == 1) { return RandomUtil.randomInt(0, length * 100) % length; } else { this.indexList.removeIf(item -> item >= length); if (this.indexList.size() < this.modelSize) { int index = RandomUtil.randomInt(0, length * 100) % length; this.indexList.add(index); return index; } else { return this.indexList.get(RandomUtil.randomInt(0, length * 100) % this.indexList.size()); } } } /** * 重新构建任务信息数组 * * @param taskInfos 原来任务信息数组 * @param index 需要移除的任务信息 * @return 新的任务信息数组 */ private TaskInfo[] reBuildTaskInfos(TaskInfo[] taskInfos, int index) { TaskInfo[] newTaskINfo = new TaskInfo[taskInfos.length - 1]; for (int j = 0, i = 0; i < taskInfos.length; i++) { if (i != index) { newTaskINfo[j] = taskInfos[i]; j++; } } return newTaskINfo; } }
构造器解释:
- @param taskInfos 任务数组
- @param nThrends 同时执行任务中的计划线程数
- @param queueSize 计划执行队列
- @param model 执行模式 1:所有任务信息都执行 2:先执行部分任务,执行完后再执行其他任务
- @param modelSize 每批执行任务的数量
TaskInfo
import lombok.Data; import java.util.LinkedList; /** * @Author: dinghao * @Date: 2022/3/7 15:31 */ @Data public class TaskInfo { /** * 任务名称 */ private String name; /** * 执行计划队列 */ private LinkedList<Plan> planQueue; public TaskInfo(String name, LinkedList<Plan> planQueue) { this.name = name; this.planQueue = planQueue; } }
Plan
import java.util.concurrent.atomic.AtomicInteger; /** * @Author: dinghao * @Date: 2022/3/7 15:37 */ public interface Plan { /** * 线程池执行前 */ default void before(){ } void run(); /** * 线程池执行后 */ default void after(){ } default void run0(AtomicInteger atomicInteger) { try{ before(); run(); }finally { after(); atomicInteger.decrementAndGet(); } } }
上面就是封装的工具
实现自己的计划
MyPlan
import lombok.Data; /** * @Author: dinghao * @Date: 2022/3/9 10:33 */ @Data public class MyPlan implements Plan { private String name; @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + name); } }
Test
public class Test { public static void main(String[] args) { int userSize = 1000; int jobSize = 100; TaskInfo[] taskInfos = new TaskInfo[userSize]; IntStream.range(0, userSize).parallel().forEach(i -> { LinkedList<Plan> plans = new LinkedList<>(); for (int j = 0; j < jobSize; j++) { MyPlan myPlan = new MyPlan(); myPlan.setName("用户" + i + ",执行计划" + j); plans.add(myPlan); } taskInfos[i] = new TaskInfo("用户" + i, plans); }); Scheder scheder = new Scheder(taskInfos, 10, 2, 10); scheder.run(); } }
测试结果:
-
SpringBoot--多线程处理
2021-07-31 00:38:02原文网址:SpringBoot--多线程处理_IT利刃出鞘的博客-CSDN博客 简介 为什么需要多线程 项目里经常会遇到这样的场景: 读一次数据库的某个表 遍历这些数据,对每一个数据,都以它为条件再次查其他表 将第二步查到的... -
Java多线程并行处理任务的实现
2019-04-20 21:08:02Java多线程并行处理任务的实现 在实际项目开发的过程中,遇到过需要处理一个由多个子任务组成的任务的问题.顺序处理起来会造成响应时间超长,用户体验不好的问题.我想到一个解决方案,即使用多线程并行处理子任务.思路... -
Spring-Boot中如何使用多线程处理任务
2016-08-25 19:22:37看到这个标题,相信不少人会感到疑惑,回忆你们自己的场景会发现,在Spring的项目中很少有使用多线程处理任务的,没错,大多数时候我们都是使用Spring MVC开发的web项目,默认的Controller,Service,Dao组件的作用... -
Java多线程之运行多个任务并处理所有结果
2018-08-23 23:13:47执行器框架给我们提供了一个方法,让我们可以发送给执行器一个任务列表,并等待任务列表中的所有任务执行完毕。然后它将返回一个与任务列表对应的Future列表。 package com.primer.demo.util; import lombok.... -
SpringBoot多线程任务处理
2019-03-01 08:56:47Spring-Boot中如何使用多线程处理任务 https://www.cnblogs.com/qindongliang/p/5808145.html 看到这个标题,相信不少人会感到疑惑,回忆你们自己的场景会发现,在Spring的项目中很少有使用多线程处理任务的,没错... -
多线程异步任务处理
2018-11-07 21:29:31文章目录多线程异步任务处理线程池线程池的优缺点常用的线程池技术@Async注解源码 我们常用ThreadPoolExecutor提供的线程池服务,springboot框架提供了@Async注解,那我们怎么去使用它呢?我们先来了解下什么是... -
Springboot定时任务【多线程处理】
2022-03-28 11:00:59当一个定时任务出现问题,另一个定时任务也无法执行,整个定时任务服务可能会卡死,不再运行,下面所以写,通过多线程方式调用定时任务,创建线程池,姐可能的避免任务冲突的情况; 第一步,创建线程池 package ... -
springboot2.0 多线程并发执行任务
2020-01-14 11:22:29springboot2.0版本 执行多线程方式,个人学习了两种,一种直接是继承父类Thread或实现Runnable 接口类,,重写run方法;第二种是通过springboot的支持注解@Async的方式。 第一种:自定义类继承Thread类或继承... -
java多线程处理任务【原】
2011-06-23 12:00:00多线程处理任务 很多时候,我们需要对一个庞大的队列或者二维数组进行处理。这些处理可能是循环的,比如网络爬出,也可能是有结尾的,比如给一个 excel 多个 sheet 的联系人列表发邮件。很幼稚的方法就是用一个... -
多线程处理大量数据 java
2020-09-27 17:12:11例如:获取大量数据并处理,生成execl文件导出 问题描述: 5W条数据处理后生成execl文件需要6个小时,效率慢 APP 中接收数据代码: @Override public void run() { bytes = mmInStream.read(buffer); mHandler.... -
定时任务使用多线程注意事项
2021-03-22 18:08:17在定时任务中为了加快处理速度,一般都会使用多线程处理业务。需要注意一下事项:1. 定时任务是否允许上一个定时任务未结束,下一个定时任务可以启动,通过Scheduled中的配置在决定。2. 主线程已经关闭,线程池中的... -
ScheduledExecutorService:多线程任务调度
2018-08-09 19:45:44多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。 建议多线程-任务调度,使用如下方式: 首先... -
多线程处理rabbitmq消息
2018-08-24 11:23:17问题描述:项目中接收到rabbitmq消息后,先进行一系列的处理,等所有处理完成后,将消息推送到前台,但是在处理消息的过程中,每个方法中都有与数据库交互的代码,直接导致消息推送不及时。 单线程代码模型: ... -
OpenCV视频流的C++多线程处理方式
2021-01-14 11:20:08目录为什么需要多线程处理视频流C++的多线程处理方式函数封装的实现方式类封装的实现方式可能遇到的问题 为什么需要多线程处理视频流 在之前有写过一篇文章Python环境下OpenCV视频流的多线程处理方式,上面简单记录... -
线程池使用ExecutorService 多线程处理队列任务
2017-09-03 22:48:45接下来说一种非常实用的多线程操作模式,此方式能够应对大部分的多线程操作,稍微改一下往里面套就可以满足大部分的业务需求。 基本方式是: 使用ExecutorService 管理多线程,使用Executors创建newFixedThrea -
php的pthreads扩展实现执行多线程任务
2022-04-07 11:10:31一、PHP安装pthreads的多线程扩展 下载pthreads源码:http://pecl.php.net/package/pthreads 首先确定安装的php版本是线程安全的,如果不是的话重新编译加上 --enable-maintainer-zts \ 1.解压tar -zxvf pthreads... -
java多线程提交任务并返回结果
2018-08-16 21:30:06java多线程提交任务并返回结果 最近工作中有需要短时间内提交大量请求,并获取响应结果,最终选择了CompletionService接口来实现,它整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,... -
Python环境下OpenCV视频流的多线程处理方式
2020-11-24 19:52:44目录前言Python多线程编程OpenCV视屏流的多线程处理 前言 最近在功能性测试的过程中,需要在Python环境下用OpenCV读取网络摄像头的视频流,接着用目标检测器进行视屏帧的后续处理。在测试过程中发现如果是单线程的... -
ffmpeg 转码及多线程处理
2022-03-14 17:19:58但是你会发现执行的很慢,不要慌,ffmpeg自带了多线程转码方法 -threads 5 -preset ultrafast 即: ffmpeg -report -i /data/aaa.mp4 -threads 5 -preset ultrafast -f hls /data/aaa/aaa.m3u8 以上转完之后,m3u8... -
定时任务下的多线程任务
2018-03-21 17:45:55多线程的报警需要在一次定时任务内执行完毕后,再执行下一次定时任务 。 6.1 创建多线程队列: ConcurrentLinkedQueue < Runnable > tasksQueue = new ConcurrentLinkedQueue (); 6.2 创建计数器(构造方法中... -
多线程处理List数据
2018-11-04 17:26:35CountDownLatch进行多线程处理list 由于公司的一个辅算系统进行计算的时间比较长3万的数据需要5分钟才能算完这个完全超出了预算,我跟负责项目的同事交流之后发现代码的if 语句特别多导致臃肿等等。针对这些先了几点... -
多线程处理十万百万级List(大list处理)
2021-04-24 21:45:12普通单线程处理,处理时间长,还经常报gc问题。针对此问题,查阅了网上很多资料,好多都使用多线程来处理。跟着好多的博客进行处理,要么是线程安全问题,要么根本速度就提高不了。我针对我项目中的使用场景,结合... -
大漠多线程循环任务自动切换模板后台绑定启动线程
2020-06-03 09:11:09子程序 _按钮_开始_被单击, , , 绑定,启动线程 .局部变量 n, 整数型 .局部变量 索引, 整数型 .局部变量 句柄, 整数型 .判断开始 (按钮_开始.标题 = “全部开始”) 按钮_开始.标题 = “全部停止” 超级延时 ... -
一个任务分解成多个子任务每个子任务分配一个线程处理(多线程处理一批大数据)
2019-07-17 10:52:251、首先需要理解CountDownLatch: ...CountDownLatch的作用也是如此,在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程... -
java多线程分批执行任务demo
2020-04-28 15:01:0710个任务完成后,不管任务成功或者失败,线程池回收10个线程继续完成剩下10个任务。 实际应用:当我们批量的需求比如启动1000个节点,启动一个节点的时间大概是3s,我们肯定不会去并行执行1000次启动,肯定是利用... -
多线程之Task(任务)
2018-11-12 15:44:30任务和线程的区别: 1、任务是架构在线程之上的,也就是说任务最终还是要抛给线程去执行。 2、任务跟线程不是一对一的关系,比如开10个任务并不是说会开10个线程,这一点任务有点类似线程池,但是任务相比线程池...