精华内容
下载资源
问答
  • 1.CountDownLatch 现在做的这个华为云TaurusDB比赛中,参考的之前参加过阿里的PolarDB大赛的两个大佬的代码,发现都有用到CountDownLatch这个类,之前看代码的时候也看过,但是没有搞得很明白,自己写也写不出来,...

    1.CountDownLatch

    现在做的这个华为云TaurusDB比赛中,参考的之前参加过阿里的PolarDB大赛的两个大佬的代码,发现都有用到CountDownLatch这个类,之前看代码的时候也看过,但是没有搞得很明白,自己写也写不出来,在此自己先学习一下。

    字面理解:CountDownLatch:数量减少的门栓。

    创建这样一个门栓

    CountDownLatch countDownLatch = new CountDownLatch(count);

    参数:count,门栓的计数次数。在所有线程执行完成之前,调用countDownLatch.await()阻塞主线程。每当一个线程执行完一个指定动作之后,count就会减少1,当count等于0时,主线程不再阻塞,开始继续执行下面的代码,当count大于0时,主线程一直阻塞,等待count变为0。每个线程动作执行结束后,执行countDownLatch.countDown(),这个门栓的count减一。

    int ThreadNum = 16;
    CountDownLatch countDownLatch = new CountDownLatch(ThreadNum);
    for(int i = 0; i < ThreadNum ; i++){
       final int finalI = i;
       new Thread(() -> {
          int n = 0;
          System.out.println("线程应该做的事情");
          while(n < 10){
             n++;
          }
          countDownLatch.countDown();
       }).start();
    }
    try{
       countDownLatch.await();
    }catch(InterruptedException e){
       logger.infor("InterruptedException!!");
    }

    2.线程池

    其实线程池之前的ipv6的项目里用过,但是也忘记得差不多了,复习一下。

     线程在创建和关闭时都需要花费时间,如果为每一个小的任务都创建一个线程,可能创建和销毁线程所用的时间会多于该线程真实工作所消耗的时间,就会得不偿失。除了时间,空间也需要考虑,线程本身也是要占用内存空间的,大量的线程会食用过多的内存资源,可能会造成OOM。另外在回收时,大量的线程会延长GC的停顿时间。

    因此在生产环境中使用线程必须对其加以控制和管理

    使用线程池之后,创建线程变成了从线程池中获得空闲的线程,关闭线程变成了归还线程给线程池。

    通过ThreadPoolExecutor可以创建一个线程池,ThreadPoolExecutor实现了Executors接口。

    举个栗子:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    public class ThreadPoolTest {
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(10,20,60, 
                    TimeUnit.SECOUNDS,new ArrayBlockingQueue<Runnable>(15000),new ThreadFactory(){
                private AtomicInteger threadId = new AtomicInteger(0);
                @Override
                public Thread newThread(Runnable r){
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    String prefix = "thread-";
                    thread.setName(prefix+threadId.incrementAndGet());
                    return thread;
                }  
            });
        }
    }

    这样就创建了一个线程池。参数依次解释:

    corePoolSize:指定了线程池中线程的数量,线程池中可以有10个存活的线程

    maximumPoolSize:指定了线程池中最大的线程数,线程池中最多能有20个存活的线程

    keepAliveTime:当线程池中的数量超过corePoolSize时,这些线程在多长时间会被销毁,60s

    unit:keepAliveTime的单位

    workQueue:任务队列,被提交但是没有被执行的任务存在的地方。他是一个BlockingQueue<Runnable>接口的对象。

    threadFactory:线程工厂,你想创建什么样子的线程

    重点说一下workQueue:

    根据队列的功能分类,可以使用以下几种BlockingQueue接口

    展开全文
  • CountDownLatch是什么 &nbsp;&nbsp;&nbsp;&nbsp;CountDownLatch是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。 &nbsp;&nbsp;&nbsp;&nbsp;...

    一. CountDownLatch是什么
        CountDownLatch是一个同步工具类,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
        CountDownLatch是通过一个计数器实现的,计数器的初始值为线程的数量,这是一个一次性现象,计数不能重置。如果需要重置计数,可以考虑使用cyclicbarrier。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

    二. CountDownLatch中的主要方法
        1. public void CountDownLatch(int count) {…} 构造器中的计数值(count)实际上就是需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重置这个计数值。与CountDownLatch第一次交互式主线程等待其他线程,主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。
        2. await() :导致当前线程等待,直到计数器下降到零,除非线程被中断。如果当前计数为零,则此方法立即返回。如果当前计数大于零,则当前线程将禁用线程调度,并且处于休眠状态,直到出现两种情况之一:计数达到零的countdown()方法调用;或其他线程中断当前线程。
        3. countDown():每调用一次这个方法,在构造函数中初始化的count值就减1。直到count的值为0,主线程就能通过await()方法,恢复执行自己的任务。

    三. 线程池
        Java通过Executors提供四种线程池,分别是:
        1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
        2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
        3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
        4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    四. 使用场景
        利用CountDownLatch和newFixedThreadPool实现excel批量导入需求。关键代码:

    /**
     * Description:
     * 线程池工具类
     * 保证全局使用一个相同的线程池,方便控制线程的创建和销毁
     */
    public class ThreadPoolUtil {
    
        private static final ExecutorService pool = Executors.newFixedThreadPool(1000);
    
        public static void submit(Runnable runnable){
            pool.submit(runnable);
        }
    }
    /**
    * 后台挂起执行列传商品以及插入数据库
    */
    Map<String, Object> runAdd(final Map<Integer,String> errorRowMap,final HSSFSheet hssfSheet, final WmsCompany company){
       final Map<String, Object> result = new HashMap<>();
       result.put("repeatCount", 0);
    
       //保存商品数据的集合
       final Map<String, WmsGoods> goodsMap = new ConcurrentHashMap<>();
    
       //通过检查的excel行数,即导入的商品数
       final int rowCount = **;
    
       //使添加线程等待转换线程执行后执行的工具
       final CountDownLatch doWork = new CountDownLatch(rowCount);
    
       try {
           for (int i = 2; i <= rowCount; i++) {
               final HSSFRow hssfRow = hssfSheet.getRow(i);
    
               if (hssfRow == null){
                   doWork.countDown();
                   continue;
               }
    
               if( errorRowMap.containsKey(i)){
                   doWork.countDown();
                   continue;
               }
    
    ThreadPoolUtil.submit(new Runnable() {
         @Override
         public void run() {
            WmsGoods wmsGoods = cellToWmsGoods(hssfRow);
    
              if (wmsGoods != null) {
    
                try{
                   wmsGoods.setCompid(company.getCompid());
                   wmsGoods.setStoreid(company.getStoreId().intValue());
                   wmsGoods.setStorename(company.getStoreName());
                   wmsGoods.setCreaemp(company.getMobile());
                   wmsGoods.setCrearq(DateUtil.getStrTime());
                   wmsGoods.setGoodType("0");//默认普通商品
                ......
     logger.info("公司信息:"+new Gson().toJson(company).toString());
    
                   }catch(Exception e){
                      e.printStackTrace();
                   }
             }
                 //递减锁存器的计数
                 doWork.countDown();
           }
        });
    
     }
    
          //添加线程等待转换线执行
          doWork.await();
           //执行添加操作
           List<WmsGoods> goodsList = new CopyOnWriteArrayList<>();
           goodsList.addAll(goodsMap.values());
    
           result.putAll(wmsGoodsService.addGoodsTimer(goodsList));
       } catch (Exception e) {
           e.printStackTrace();
    
           logger.error("添加商品失败!");
           result.put("state", 1);
           result.put("msg", e.getMessage());
       }finally {
           logger.info("添加商品完成---------------!");
       }
       return result;
    }

    五. 遇到的问题

        1. 在使用线程池批量导入,几次遇到了服务器多次挂了的情况,排查原因,主要是程序的线程池未及时回收,导致资源不足,所以采用了一个全局的线程池,固定开启1000个线程。

        2. 底层采用的是mybatis,最开始是用的批量更新方法,将每次导入的数据一次入库,更新,后来遇到时间过长的问题,所以改成了单条执行插入,更新操作。

    展开全文
  • 本文写一个线程池CountDownLatch结合例子,主任务先行开始,打印出开始执行时间,等待n(demo设置的为5)个子任务执行完毕,进行后续打印操作。附上源码,仅供学习了解所用。 具体处理事务的子任务类: package...

    本文写一个线程池与CountDownLatch结合例子,主任务先行开始,打印出开始执行时间,等待n(demo设置的为5)个子任务执行完毕,进行后续打印操作。附上源码,仅供学习了解所用。

    具体处理事务的子任务类:

    package com.gll.basic.multi;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 
     * @author LynnGe
     *
     */
    public class CounterTask implements Runnable {
    	private CountDownLatch latch;
    	private List<Integer> countList;
    
    	public CounterTask(CountDownLatch latch, List<Integer> countList) {
    		this.latch = latch;
    		this.countList = countList;
    	}
    
    	@Override
    	public void run() {
    		try {
    			//doSomeThing....
    			countList.add(new Integer(1));
    		} finally {
    			if (latch != null) {
    				latch.countDown();
    			}
    		}
    	}
    
    }

     

    测试主类

    package com.gll.basic.multi;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 
     * @author LynnGe
     *
     */
    public class ThreadPoolDemo {
    	static ExecutorService taskPool = Executors.newFixedThreadPool(6);
    
    	public static void main(String[] args) throws Exception {
    		int waitTimes = 5;
    		CountDownLatch latch = new CountDownLatch(waitTimes);
    		List<Integer> rlt = new ArrayList<Integer>();
    		new WaitingThread(latch).start();
    		//创建任务的次数和等待的次数相等
    		for (int i = 0; i < waitTimes; i++) {
    			taskPool.execute(new CounterTask(latch, rlt));
    		}
    		Integer sum = 0;
    		for (Integer i : rlt) {
    			sum += i;
    		}
    		System.out.println("sum:: " + sum);
    		// 任务全部执行完毕,所以获取的为0
    		System.out.println("latch:: " + latch.getCount());
    		System.out.println("taskPool shutdown flag:: " + taskPool.isShutdown());
    		//立即关闭线程池
    		taskPool.shutdownNow();//中断线程池中所有线程,实际使用中慎用
    		System.out.println("taskPool shutdown flag:: " + taskPool.isShutdown());
    
    	}
    
    	static class WaitingThread extends Thread {
    		CountDownLatch countDownLatch;
    
    		public WaitingThread(CountDownLatch countDownLatch) {
    			this.countDownLatch = countDownLatch;
    		}
    
    		@Override
    		public void run() {
    			System.out.println(System.nanoTime() + " 等待  " + countDownLatch.getCount() + " 个子任务执行开始");
    			try {
    				countDownLatch.await();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    			System.out.println(System.nanoTime() + " 等待结束");
    		}
    	}
    
    }
    

    运行结果 :

    程序运行结果

     

    展开全文
  • springboot线程池结合CountDownLatch使用

    千次阅读 2019-11-05 16:34:32
    spring boot 处理多线程等待 ...使用 CountDownLatch 统计线程数,实现等待多个子线程执行完毕 @RestController public class TestController { private static Logger logger = LoggerFactory.get...

    springboot线程池结合CountDownLatch使用

    使用场景
    等待多个子线程并行执行完毕,通知主线程继续执行任务

    1 配置线程池

    配置spring boot 线程池

    2 多线程调用

    2.1 调用代码

    使用 CountDownLatch 统计线程数,实现等待多个子线程执行完毕

    @RestController
    public class TestController {
    
        private static Logger logger = LoggerFactory.getLogger(TestController.class);
    
        @Autowired
        private TestService testService;
        @Autowired
        private ThreadPoolTaskExecutor ThreadPoolA;
    
    
        @RequestMapping("/test")
        public void test()throws Exception{
            logger.info("main->start");
            //初始化子线程个数 2
            final CountDownLatch latch =new CountDownLatch(2);
            ThreadPoolA.execute(() -> {
                try {
                    logger.info("thread1->doWork");
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    logger.info("thread1->end");
                    //线程数-1
                    latch.countDown();
                }
            });
            ThreadPoolA.execute(()->{
                try {
                    logger.info("thread2->doWork");
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    logger.info("thread2->end");
                    //线程数-1
                    latch.countDown();
                }
            });
            logger.info("main->doWork");
            //等待线程执行完毕
            latch.await();
            logger.info("main->end");
        }
    }
    

    需获取线程返回值时 ThreadPoolA.execute 改为 ThreadPoolA.submit,例如:

            //省略
            Future<Long> future = ThreadPoolA.submit(() -> {
                //省略
                return 1L;
            });
            //省略
            latch.await();
            //调用future.get()将会在该线程执行完毕后才会往下执行
            logger.info(future.get());
    

    为避免线程数统计不正确导致死锁,建议使用latch.await(long timeout, TimeUnit unit)

    //十分钟内线程计数未归零,将自动释放
    latch.await(10, TimeUnit.MINUTES);
    

    2.2 执行结果

    在这里插入图片描述

    2 CountDownLatch 源码展示

    public class CountDownLatch {
        
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * 初始化线程个数
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * 当前线程等待直到线程数为0
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * 当前线程等待直到线程数为0或超过等待时间
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * 统计线程个数,如果计数达到零,则释放所有等待线程
         */
        public void countDown() {
            sync.releaseShared(1);
        }
        public long getCount() {
            return sync.getCount();
        }
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
    

    上一篇:spring boot 配置多线程池

    展开全文
  • 1. 首先在项目配置异步线程池,如下: @EnableAsync // 开启异步任务 @Configuration public class TaskPoolConfig { @Bean("taskExecutor") // 线程池名称 public Executor taskExecutor() { // 使用Spring封装...
  • 今天和大家分享的是:在开发服务端API时候,如何合理的运用线程池+CountDownLatch来保证API的高并发访问...本文不详细赘述线程池的原理和数据结构,只是先普及下入门知识,然后再看如何正确的把这两种技术结合起来使用
  • Java线程池CountDownLatch的使用

    千次阅读 2016-07-25 12:43:23
    Java多线程除了可以直接使用Thread和Runnable来实现...主要介绍线程池在实际工作的统计使用,并且介绍java并发包中同步锁的实现:CountDownLatch。 1、什么是CountDownLatch 直译过来就是倒计数(CountDown)门闩(Latch
  • CountDownLatch线程池结合,返回执行结果优雅关闭线程池,参考https://www.cnblogs.com/goodA…import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.CountDownLatch;import java.util....
  • 本代码使用缓存线程池cacheThreadPool初始化多个线程,使用countdownlatch做子线程的计数控制,当子线程执行完,释放执行权,交给主线程,开始下一轮的执行。 话不多说:上代码 package com.test.thread; import...
  • countDownLatch的使用

    2021-01-30 11:09:07
    countDownLatch是在java1.5被引入,跟它一起被引入的工具类还有CyclicBarrier、Semaphore、concurrentHashMap和BlockingQueue。 存在于java.util.cucurrent包下。 2.概念 countDownLatch这个类使一个线程等待其他...
  • 本次文章记录在日常开发中线程池的使用 Executors介绍:Executors的使用 本文主要介绍 newFixedThreadPool 创建固定的线程池的使用 newFixedThreadPool 创建一个固定的线程池,如果所有线程都处于活动状态,并且有新...
  • 线程池的创建和使用 threadlocal的使用 countDownLatch的使用 高并发场景的使用 下面是一个简单的例子,但是包含了很多内容,大家细细品尝 import io.netty.util.concurrent.DefaultThreadFactory; import java....
  • 多线程按序打印之使用CountDownLatch遇到的问题及解决 多线程的交替打印问题(如两个线程循环打印1到100、三个线程循环打印a到z等都可以使用CountDownLatch解决),这里记录我使用CountDownLatch遇到的一个bug。 先...
  • ThreadPoolExecutor/ExecutorService:线程池,使用线程池可以复用线程,降低频繁创建线程造成的性能消耗,同时对线程的创建、启动、停止、销毁等操作更简便。 CountDownLatch是在java1.5被引入的,跟它一起被引入的...
  • &amp;nbsp;&amp;...使用线程池CountDownLatch完成多线程阻塞 这篇文章主要是将主线程阻塞,让子线程先跑。 1、添加Spring配置文件 &amp;lt;?xml version=&quot;1.0&q
  • 主方法入参threadSize线程池数量。 通过FutureTask 来获取线程执行返回的数据。 public static Map<String, String> demo(int threadSize) { Map<String, String> map = new HashMap<>(); ...
  • 1 场景 多线程并行处理多个任务,所有任务执行结束,获取执行结果,提高任务执行效率,使用多线程处理任务耗时取决于单个线程耗时最长的任务,而不是时间的叠加。单线程执行多个任务,耗时则是所有任务的耗时总和。...
  • 此次主要整理线程执行顺序的相关实现方法,包括join(),CountDownLatch,CyclicBarrier和线程池。一. join()join() 是 Thread 类的一个方法,join() 方法的作用是等待当前线程结束,也即让“主线程”等待“子线程”...
  • 写在前面: 希望大家从一开始就跟着敲, 收获一定是满满...//提供线程池,TransmittableThreadLocal两个工具类 <dependency> <groupId>com.alibaba</groupId> <artifactId>transmittable-threa.
  • 工作中遇到一个需求,需要通过多线程调用其他接口,再将得到的结果进行...本文选用CountDownLatch工具类,并结合线程池进行实现实现该功能。 public Resp getDatasourceListByOu(String ouId) { ExecutorService execu
  • CountDownLatch countDownLatch = new CountDownLatch(6); System.out.println("开始执行多线程任务1111111111:::"+System.currentTimeMillis()); for (int i =0;i;i++){ test.doTaskThree(countDownLatch,i); ...
  • CountDownLatch latch = new CountDownLatch(list.size()); for(Entity entity:list){ asyncTask.queryTask(entity,latch); } latch.await(); return list; } } AsyncTask: @Component public class ...
  • 在知乎上看到一道Java多线程的笔试题,心想我多线程只会Thread和Runable,在写WebFluxTest用过一次CountDownLatch,除此之外还没怎么去看cocurrent包下的类,所以就想试试。 题目 知乎传送门:某大型电商Java...
  • // 锁存器 CountDownLatch latch = new CountDownLatch(list.size()); ThreadPool threadPool = new ThreadPool(list, latch); int a = threadPool.work(); return "success:" + a + "\terror:" + (list.size() - a...
  • 在实际的解决方案中我们可以通过闭锁CountDownLatch来解决并行执行的过程,但是CountDownLatch闭锁无法解决部分串行化执行的问题,在java8中,有一种更加合理的多任务处理方案,就是我们的异步编排,可以根据实际...
  • 线程池优化和CountDownLatch实现线程等待   CountDownLatch主要解决一个线程(主线程)等待多个线程的场景,计算器不能循环利用,下次用的时候要再次new出来。   // 创建 2 个线程的线程池 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,980
精华内容 2,392
关键字:

countdownlatch结合线程池