精华内容
下载资源
问答
  • CountDownLatch实现原理

    万次阅读 2017-10-12 17:31:17
    CountDownLatch用过很多次了,突然有点好奇,它是如何实现阻塞线程的,一开始还以为跟LockSupport...实现原理:让需要的暂时阻塞的线程,进入一个死循环里面,得到某个条件后再退出循环,以此实现阻塞当前线程的效果。

    CountDownLatch用过很多次了,突然有点好奇,它是如何实现阻塞线程的,猜想是否跟LockSupport有关。今天浏览了一下它的源码,发现其实现是十分简单的,只是简单的继承了AbstractQueuedSynchronizer,便实现了其功能。


    实现原理:让需要的暂时阻塞的线程,进入一个死循环里面,得到某个条件后再退出循环,以此实现阻塞当前线程的效果。


    下面从构造方法开始,一步步解释实现的原理:

    一、构造方法

    下面是实现的源码,非常简短,主要是创建了一个Sync对象。

    public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    二、Sync对象

     private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }

    假设我们是这样创建的:new CountDownLatch(5)。其实也就相当于new Sync(5),相当于setState(5)。setState我们可以暂时理解为设置一个计数器,当前计数器初始值为5。


    tryAcquireShared方法其实就是判断一下当前计数器的值,是否为0了,如果为0的话返回1(返回1的时候,就表明当前线程可以继续往下走了,不再停留在调用countDownLatch.await()这个方法的地方)。


    tryReleaseShared方法就是利用CAS的方式,对计数器进行减一的操作,而我们实际上每次调用countDownLatch.countDown()方法的时候,最终都会调到这个方法,对计数器进行减一操作,一直减到0为止。


    稍微跑偏了一点,我们看看调用countDownLatch.await()的时候,做了些什么。

    三、countDownLatch.await()

     public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    代码很简单,就一句话(注意acquireSharedInterruptibly()方法是抽象类:AbstractQueuedSynchronizer的一个方法,我们上面提到的Sync继承了它),我们跟踪源码,继续往下看:

    四、acquireSharedInterruptibly(int arg)

     public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }

    源码也是非常简单的,首先判断了一下,当前线程是否有被中断,如果没有的话,就调用tryAcquireShared(int acquires)方法,判断一下当前线程是否还需要“阻塞”。其实这里调用的tryAcquireShared方法,就是我们上面提到的java.util.concurrent.CountDownLatch.Sync.tryAcquireShared(int)这个方法。

    当然,在一开始我们没有调用过countDownLatch.countDown()方法时,这里tryAcquireShared方法肯定是会返回1的,因为会进入到doAcquireSharedInterruptibly方法。

    五、doAcquireSharedInterruptibly(int arg)


    这个时候,我们应该对于countDownLatch.await()方法是怎么“阻塞”当前线程的,已经非常明白了。其实说白了,就是当你调用了countDownLatch.await()方法后,你当前线程就会进入了一个死循环当中,在这个死循环里面,会不断的进行判断,通过调用tryAcquireShared方法,不断判断我们上面说的那个计数器,看看它的值是否为0了(为0的时候,其实就是我们调用了足够多次数的countDownLatch.countDown()方法的时候),如果是为0的话,tryAcquireShared就会返回1,代码也会进入到图中的红框部分,然后跳出了循环,也就不再“阻塞”当前线程了。需要注意的是,说是在不停的循环,其实也并非在不停的执行for循环里面的内容,因为在后面调用parkAndCheckInterrupt()方法时,在这个方法里面是会调用 LockSupport.park(this);,来禁用当前线程的。

    五、关于AbstractQueuedSynchronizer

    看到这里,如果各位对AbstractQueuedSynchronizer没有了解过的话,可能代码还是看得有点迷糊,这是一篇我觉得解释得非常好的文章,大家可以看一下:http://ifeve.com/introduce-abstractqueuedsynchronizer/

    展开全文
  • CountDownLatch实现原理及使用

    万次阅读 多人点赞 2019-03-26 11:46:10
    1.CountDownLatch工作原理 CountDownLatch在多线程并发编程中充当一个计时器的功能,并且维护一个count的变量,并且其操作都是原子操作,该类主要通过countDown()和await()两个方法实现功能的,首先通过建立...

    1.CountDownLatch工作原理

            CountDownLatch在多线程并发编程中充当一个计时器的功能,并且维护一个count的变量,并且其操作都是原子操作,该类主要通过countDown()和await()两个方法实现功能的,首先通过建立CountDownLatch对象,并且传入参数即为count初始值。如果一个线程调用了await()方法,那么这个线程便进入阻塞状态,并进入阻塞队列。如果一个线程调用了countDown()方法,则会使count-1;当count的值为0时,这时候阻塞队列中调用await()方法的线程便会逐个被唤醒,从而进入后续的操作。比如下面的例子就是有两个操作,一个是读操作一个是写操作,现在规定必须进行完写操作才能进行读操作。所以当最开始调用读操作时,需要用await()方法使其阻塞,当写操作结束时,则需要使count等于0。因此count的初始值可以定为写操作的记录数,这样便可以使得进行完写操作,然后进行读操作。

    1. 首先是创建实例 CountDownLatch countDown = new CountDownLatch(2)
    2. 需要同步的线程执行完之后,计数-1; countDown.countDown()
    3. 需要等待其他线程执行完毕之后,再运行的线程,调用 countDown.await()实现阻塞同步

    2. 应用场景

    前面给了一个demo演示如何用,那这个东西在实际的业务场景中是否会用到呢?

    因为确实在一个业务场景中使用到了,不然也就不会单独捞出这一节...

    电商的详情页,由众多的数据拼装组成,如可以分成一下几个模块

    • 交易的收发货地址,销量
    • 商品的基本信息(标题,图文详情之类的)
    • 推荐的商品列表
    • 评价的内容
    • ....

    上面的几个模块信息,都是从不同的服务获取信息,且彼此没啥关联;所以为了提高响应,完全可以做成并发获取数据,如

    • 线程1获取交易相关数据
    • 线程2获取商品基本信息
    • 线程3获取推荐的信息
    • 线程4获取评价信息
    • ....

    但是最终拼装数据并返回给前端,需要等到上面的所有信息都获取完毕之后,才能返回,这个场景就非常的适合 CountDownLatch来做了

    1. 在拼装完整数据的线程中调用 CountDownLatch#await(long, TimeUnit) 等待所有的模块信息返回
    2. 每个模块信息的获取,由一个独立的线程执行;执行完毕之后调用 CountDownLatch#countDown() 进行计数-1
       

    3.代码演示

    package cn.day13;
    
    import java.util.concurrent.CountDownLatch;
    
    public class Test {
    
    	public static void main(String[] args) {
    		// TODO Auto-generated method stub
    		final CountDownLatch latch = new CountDownLatch(2);
    
    		new Thread() {
    			public void run() {
    				try {
    					System.out.println("子线程" + Thread.currentThread().getName()
    							+ "正在执行");
    					Thread.sleep(3000);
    					System.out.println("子线程" + Thread.currentThread().getName()
    							+ "执行完毕");
    					latch.countDown();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			};
    		}.start();
    
    		new Thread() {
    			public void run() {
    				try {
    					System.out.println("子线程" + Thread.currentThread().getName()
    							+ "正在执行");
    					Thread.sleep(3000);
    					System.out.println("子线程" + Thread.currentThread().getName()
    							+ "执行完毕");
    					latch.countDown();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			};
    		}.start();
    
    		try {
    			System.out.println("等待2个子线程执行完毕...");
    			latch.await();
    			System.out.println("2个子线程已经执行完毕");
    			System.out.println("继续执行主线程");
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	}
    
    }
    

    运行结果:

    子线程Thread-0正在执行
    等待2个子线程执行完毕...
    子线程Thread-1正在执行
    子线程Thread-0执行完毕
    子线程Thread-1执行完毕
    2个子线程已经执行完毕
    继续执行主线程
    

    代码二

    public class CountDownLatchDemo {
        private CountDownLatch countDownLatch;
    
        private int start = 10;
        private int mid = 100;
        private int end = 200;
    
        private volatile int tmpRes1, tmpRes2;
    
        private int add(int start, int end) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    
    
        private int sum(int a, int b) {
            return a + b;
        }
    
        public void calculate() {
            countDownLatch = new CountDownLatch(2);
    
            Thread thread1 = new Thread(() -> {
                try {
                    // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    tmpRes1 = add(start, mid);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + tmpRes1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "线程1");
    
            Thread thread2 = new Thread(() -> {
                try {
                    // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    tmpRes2 = add(mid + 1, end);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + tmpRes2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "线程2");
    
    
            Thread thread3 = new Thread(()-> {
                try {
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    countDownLatch.await();
                    int ans = sum(tmpRes1, tmpRes2);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + ans);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程3");
    
            thread3.start();
            thread1.start();
            thread2.start();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatchDemo demo = new CountDownLatchDemo();
            demo.calculate();
    
            Thread.sleep(1000);
        }
    }

    运行结果

    线程3 : 开始执行
    线程1 : 开始执行
    线程2 : 开始执行
    线程1 : calculate ans: 5005
    线程2 : calculate ans: 15050
    线程3 : calculate ans: 20055

     

    展开全文
  • CountDownLatch用于主线程等待工作线程完成工作,注意,这里与pthread_join不一样: pthread_join是只要线程active就会阻塞,线程结束就会返回.一般用于主线程回收工作线程. CountDownLatch可以保证工作线程的任务...

    本文转载https://blog.csdn.net/zhangxiao93/article/details/72677207

    CountDownLatch用于主线程等待工作线程完成工作,注意,这里与pthread_join不一样:

    pthread_join是只要线程active就会阻塞,线程结束就会返回.一般用于主线程回收工作线程. 
    CountDownLatch可以保证工作线程的任务执行完毕,主线程再对工作线程进行回收

    1. 原理
    CountDownLatch,本质上来说,是一个thread safe的计数器,用于主线程和工作线程的同步. 
    我所知道的用法有两种:

    第一种:在初始化时,需要指定主线程需要等待的任务的个数(count),当工作线程完成 Task Callback后对计数器减1,而主线程通过wait()调用阻塞等待技术器减到0为止.

    第二种:初始化计数器值为1,在程序结尾将创建一个线程执行countDown操作并wait()当程序执行到最后会阻塞直到计数器减为0,这可以保证线程池中的线程都start了线程池对象才完成析够,这是一个坑,我在实现ThreadPool的过程中遇到过

    2. 实现
    CountDownLatch是一个Thread Safe的Couter,它支持的方法主要是两个countDown()和wait() 
    countDown就是对counter原子的执行减1操作 
    wait就使用条件变量等待counter减到0然后notify.

    3.代码
    3.1 CountDownLatch.h
     

    #ifndef __COUNTDOWNLATCH_H__
    #define __COUNTDOWNLATCH_H__
    #include "MutexLock.h"
    #include "Condition.h"
    class CountDownLatch : cl::NonCopy
    {
    public:
        explicit CountDownLatch(int count)
            : count_(count)
            , mutex_()
            , condition_(mutex_)
        {
        }
    
        void wait()
        {
            LockGuard<Mutex> lock(mutex_);
            while(count_ > 0)
            {
                condition_.wait();
            }
        }
    
        void countDown()
        {
            LockGuard<Mutex> lock(mutex_);
            --count_;
            if(count_ == 0)
            {
                condition_.notify_all();
            }
        }
    
        int getCount() const
        {
            LockGuard<Mutex> lock(mutex_);
            return count_;
        }
    
    private:
        int              count_;
        mutable Mutex    mutex_;
        Condition        condition_;
    };
    
    #endif
    ————————————————
    版权声明:本文为CSDN博主「NearXDU」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/zhangxiao93/article/details/72677207

    使用相类似回环栅栏处理

    explicit CyclicBarrier(int parties, CyclicBarrier::CyclicBarrierCallBack cb = NULL)
            : parties_(parties)
            , count_(parties)
            , mutex_()
            , condition_(mutex_)
            , cb_(cb)
        {
        }
    
        void reset()
        {
            LockGuard<Mutex> lock(mutex_);
            nextGeneration();
        }
        
        int wait()
        {
            return dowait(false, 0);
        }
        
        /// Returns the number of parties required to trip this barrier.
        int getParties() const
        {
            return parties_;
        }
        
        /// Returns the number of parties currently waiting at the barrier.
        /// This method is primarily useful for debugging and assertions.
        int getNumberWaiting()
        {
            LockGuard<Mutex> lock(mutex_);
            return parties_ - count_;
        }
        
    private:  
        void nextGeneration()
        {
            // signal completion of last generation
            condition_.notify_all();
            // set up next generation
            count_ = parties_;
        }
        
        int dowait(bool timed, int millisecond)
        {
            LockGuard<Mutex> lock(mutex_);
            int index = --count_;
            if (index == 0) // tripped
            {
                if(cb_)
                {
                    cb_();
                }
                nextGeneration();
                return 0;         
            }
            
            // loop until tripped, broken, interrupted, or timed out
            for (;;)
            {
                if (!timed)
                    condition_.wait();
                else if (millisecond > 0)
                    condition_.timed_wait(millisecond);
                    
                return index;
            }
        } 

     

    展开全文
  • CountDownLatch实现原理及使用姿势 在并发编程的场景中,最常见的一个case是某个任务的执行,需要等到多个线程都执行完毕之后才可以进行,CountDownLatch可以很好解决这个问题 下面将主要从使用和实现原理两...

    CountDownLatch实现原理及使用姿势

    在并发编程的场景中,最常见的一个case是某个任务的执行,需要等到多个线程都执行完毕之后才可以进行,CountDownLatch可以很好解决这个问题

    下面将主要从使用和实现原理两个方面进行说明,围绕点如下

    1. CountDownLatch 是个什么鬼
    2. 怎么用(结合case说明)
    3. 底层实现原理(及如何保障功能的正常性)

    I. 使用说明

    同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待

    比较有意思的是,CountDownLatch并未继承自其他的类or接口,在jdk中这样的类并不多见(多半是我孤陋寡闻)

    0. 接口定义

    在使用之前,得先了解下其定义的几个方法

    // 构造器,必须指定一个大于零的计数
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    
    // 线程阻塞,直到计数为0的时候唤醒;可以响应线程中断退出阻塞
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    
    // 线程阻塞一段时间,如果计数依然不是0,则返回false;否则返回true
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }
    
    // 计数-1
    public void countDown() {
        sync.releaseShared(1);
    }
    
    // 获取计数
    public long getCount() {
        return sync.getCount();
    }
    

    也就几个接口,基本上都是比较常见的了,需要注意的是不要把 await()Object#wait()方法弄混了,否则就gg思密达了...

    1. Demo演示

    依然以讲解 ReentrantLock中的例子来说明,多线程实现累加

    线程1实现 10加到100
    线程2实现 100加到200
    线程3实现 线程1和线程2计算结果的和
    

    实现如下

    public class CountDownLatchDemo {
        private CountDownLatch countDownLatch;
    
        private int start = 10;
        private int mid = 100;
        private int end = 200;
    
        private volatile int tmpRes1, tmpRes2;
    
        private int add(int start, int end) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    
    
        private int sum(int a, int b) {
            return a + b;
        }
    
        public void calculate() {
            countDownLatch = new CountDownLatch(2);
    
            Thread thread1 = new Thread(() -> {
                try {
                    // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    tmpRes1 = add(start, mid);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + tmpRes1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "线程1");
    
            Thread thread2 = new Thread(() -> {
                try {
                    // 确保线程3先与1,2执行,由于countDownLatch计数不为0而阻塞
                    Thread.sleep(100);
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    tmpRes2 = add(mid + 1, end);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + tmpRes2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            }, "线程2");
    
    
            Thread thread3 = new Thread(()-> {
                try {
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    countDownLatch.await();
                    int ans = sum(tmpRes1, tmpRes2);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + ans);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程3");
    
            thread3.start();
            thread1.start();
            thread2.start();
        }
    
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatchDemo demo = new CountDownLatchDemo();
            demo.calculate();
    
            Thread.sleep(1000);
        }
    }
    

    输出

    线程3 : 开始执行
    线程1 : 开始执行
    线程2 : 开始执行
    线程1 : calculate ans: 5005
    线程2 : calculate ans: 15050
    线程3 : calculate ans: 20055
    

    看了上面的定义和Demo之后,使用就会简单一点了,一般流程如

    1. 首先是创建实例 CountDownLatch countDown = new CountDownLatch(2)
    2. 需要同步的线程执行完之后,计数-1; countDown.countDown()
    3. 需要等待其他线程执行完毕之后,再运行的线程,调用 countDown.await()实现阻塞同步

    注意

    • 在创建实例是,必须指定初始的计数值,且应大于0
    • 必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了 await()方法(没有这个就没有必要使用CountDownLatch了)
    • 由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了
    • 鉴于上面一点,更多的推荐 await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况

    有兴趣的小伙伴可以对比下这个实现与 《Java并发学习之ReentrantLock的工作原理及使用姿势》中的demo,明显感觉使用CountDownLatch优雅得多(后面有机会介绍用更有意思的Fork/Join来实现累加)

    2. 应用场景

    前面给了一个demo演示如何用,那这个东西在实际的业务场景中是否会用到呢?

    因为确实在一个业务场景中使用到了,不然也就不会单独捞出这一节...

    电商的详情页,由众多的数据拼装组成,如可以分成一下几个模块

    • 交易的收发货地址,销量
    • 商品的基本信息(标题,图文详情之类的)
    • 推荐的商品列表
    • 评价的内容
    • ....

    上面的几个模块信息,都是从不同的服务获取信息,且彼此没啥关联;所以为了提高响应,完全可以做成并发获取数据,如

    • 线程1获取交易相关数据
    • 线程2获取商品基本信息
    • 线程3获取推荐的信息
    • 线程4获取评价信息
    • ....

    但是最终拼装数据并返回给前端,需要等到上面的所有信息都获取完毕之后,才能返回,这个场景就非常的适合 CountDownLatch来做了

    1. 在拼装完整数据的线程中调用 CountDownLatch#await(long, TimeUnit) 等待所有的模块信息返回
    2. 每个模块信息的获取,由一个独立的线程执行;执行完毕之后调用 CountDownLatch#countDown() 进行计数-1

    II. 实现原理

    同ReentrantLock一样,依然是借助AQS的双端队列,来实现原子的计数-1,线程阻塞和唤醒

    前面《Java并发学习之ReentrantLock的工作原理及使用姿势》 介绍了AQS的结构,方便查看,下面直接贴出

    0. AbstractQueuedSynchronizer (简称AQS)

    AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题

    AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus

    private transient volatile Node head;
    
    private transient volatile Node tail;
    
    private volatile int state;
    
    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
    
        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
    
        //取值为 CANCELLED, SIGNAL, CONDITION, PROPAGATE 之一
        volatile int waitStatus;
    
        volatile Node prev;
    
        volatile Node next;
    
        // Link to next node waiting on condition, 
        // or the special value SHARED
        volatile Thread thread;
    
        Node nextWaiter;
    }
    

    1. 计数器的初始化

    CountDownLatch内部实现了AQS,并覆盖了tryAcquireShared()tryReleaseShared()两个方法,下面说明干嘛用的

    通过前面的使用,清楚了计数器的构造必须指定计数值,这个直接初始化了 AQS内部的state变量

    Sync(int count) {
        setState(count);
    }
    

    后续的计数-1/判断是否可用都是基于sate进行的

    2. countDown() 计数-1的实现

    // 计数-1
    public void countDown() {
        sync.releaseShared(1);
    }
    
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 首先尝试释放锁
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0) //如果计数已经为0,则返回失败
                return false;
            int nextc = c-1;
            // 原子操作实现计数-1
            if (compareAndSetState(c, nextc)) 
                return nextc == 0;
        }
    }
    
    // 唤醒被阻塞的线程
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) { // 队列非空,表示有线程被阻塞
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { 
                // 头结点如果为SIGNAL,则唤醒头结点下个节点上关联的线程,并出队
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head) // 没有线程被阻塞,直接跳出
                break;
        }
    }
    

    上面截出计数减1的完整调用链

    1. 尝试释放锁tryReleaseShared,实现计数-1
    • 若计数已经小于0,则直接返回false
    • 否则执行计数(AQS的state)减一
    • 若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
    1. 释放并唤醒阻塞线程 doReleaseShared
    • 如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
    • 头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队

    疑问一: 看到这个实现,是不是只要countDownLatch的计数为0了,所有被阻塞的线程都会被执行?

    改下上面的demo,新增线程4,实现线程2的结果-线程1的结果

    public class CountDownLatchDemo {
        
        // ...省略重复
        
        private int sub(int a, int b) {
            return a - b;
        }
    
        public void calculate() {
            countDownLatch = new CountDownLatch(2);
    
            Thread thread1 = // ... ;
            Thread thread2 = // ...;
            
            Thread thread3 = new Thread(()-> {
                try {
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " : 唤醒");
                    Thread.sleep(100); // 确保线程4先执行完相减
                    int ans = sum(tmpRes1, tmpRes2);
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + ans);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程3");
    
            Thread thread4 = new Thread(()-> {
                try {
                    System.out.println(Thread.currentThread().getName() + " : 开始执行");
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " : 唤醒");
                    int ans = sub(tmpRes2, tmpRes1);
                    Thread.sleep(200); // 保证线程3先输出执行结果,以验证线程3和线程4是否并发执行
                    System.out.println(Thread.currentThread().getName() +
                            " : calculate ans: " + ans);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程4");
            
            thread3.start();
            thread4.start();
            thread1.start();
            thread2.start();
        }
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatchDemo demo = new CountDownLatchDemo();
            demo.calculate();
    
            Thread.sleep(1000);
        }
    }
    

    输出如下

    线程4 : 开始执行
    线程3 : 开始执行
    线程2 : 开始执行
    线程2 : calculate ans: 15050
    线程1 : 开始执行
    线程1 : calculate ans: 5005
    线程3 : 唤醒
    线程4 : 唤醒
    线程3 : calculate ans: 20055
    线程4 : calculate ans: 10045
    

    上面的实现中,线程3中sleep一段时间,确保线程4的计算会优先执行;线程4计算完成之后的sleep时间,以保证线程3计算完成并输出结果,然后线程4才输出结果;结合输出,这个期望是准确的,也就是说,线程3和线程4被唤醒后是并发执行的,没有先后阻塞顺序

    即CountDownLatch计数为0之后,所有被阻塞的线程都会被唤醒,且彼此相对独立,不会出现独占锁阻塞的问题

    3. await() 阻塞等待计数为0

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
        
    
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) // 若线程中端,直接抛异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    
    // 计数为0时,表示获取锁成功
    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    // 阻塞,并入队
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 入队
        boolean failed = true;
        try {
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 获取锁成功,设置队列头为node节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) // 线程挂起
                  && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    阻塞的逻辑相对简单

    1. 判断state计数是否为0,不是,则直接放过执行后面的代码
    2. 大于0,则表示需要阻塞等待计数为0
    3. 当前线程封装Node对象,进入阻塞队列
    4. 然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码

    III. 小结

    1. 使用注意

    • 在创建实例时,必须指定初始的计数值,且应大于0
    • 必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了await()方法(没有这个就没有必要使用CountDownLatch了)
    • 由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了;
    • 鉴于上面一点,更多的推荐 await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况
    • 允许多个线程调用await方法,当计数为0后,所有被阻塞的线程都会被唤醒

    2. 实现原理

    await内部实现流程:

    1. 判断state计数是否为0,不是,则直接放过执行后面的代码
    2. 大于0,则表示需要阻塞等待计数为0
    3. 当前线程封装Node对象,进入阻塞队列
    4. 然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码

    countDown内部实现流程:

    1. 尝试释放锁tryReleaseShared,实现计数-1
    • 若计数已经小于0,则直接返回false
    • 否则执行计数(AQS的state)减一
    • 若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
    1. 释放并唤醒阻塞线程 doReleaseShared
    • 如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
    • 头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队

    扫描关注,java分享

    https://static.oschina.net/uploads/img/201710/13203703_6IVg.jpg

    转载于:https://my.oschina.net/u/566591/blog/1560140

    展开全文
  • 【多线程】CountDownLatch实现原理

    千次阅读 2020-05-15 14:52:56
    CountDownLatch是多线程中一个比较重要的概念,它可以使得一个或多个线程等待其他线程执行完毕之后再执行。它内部有一个计数器和一个阻塞队列,每当一个线程调用countDown()方法后,计数器的值减少1。当计数器的值不...
  • CountDownLatch实现原理

    千次阅读 2019-06-16 16:40:57
    在日常开发中经常会遇到需要在主线任务中开启多个线程,并等到所有子线程完成工作之后再进行汇总的情形,这种情况下可以考虑使用CountDownLatch这个工具类实现功能,下面是使用CountDownLatch的一个示例: ...
  • CountDownLatch实现原理及使用姿势在并发编程的场景中,最常见的一个case是某个任务的执行,需要等到多个线程都执行完毕之后才可以进行,CountDownLatch可以很好解决这个问题下面将主要从使用和实现原理两个方面进行...
  • java并发学习-CountDownLatch实现原理

    千次阅读 2020-04-08 22:42:09
    CountDownLatch在多线程并发编程中充当一个...该类主要通过countDown()和await()两个方法实现功能的,首先通过建立CountDownLatch对象,并且传入参数即为count初始值。如果一个线程调用了await()方法,那么这个线...
  • CountDownLatch是基于AQS实现的一种同步器,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。CountDownLatch使用给定的计数(同步状态)进行初始化,线程调用await()方法,如果同步状态不为0,调用...
  • CountDownLatch实现原理 1.CountDownLatch在构造时传递的参数int值用来初始化 AQS的state值(其实是AQS的实现类Sync) 2.一旦有一个线程调用await()就要去获取锁tryAcquireSharedNanos(), state不为0就获取不到. 所以...
  • 该类主要通过countDown()和await()两个方法实现功能的,首先通过建立CountDownLatch对象,并且传入参数即为count初始值。 如果一个线程调用了await()方法,那么这个线程便进入阻塞状态,并进入阻塞队列。 如果一个...
  • CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。 CountDownLatch使用示例: ...
  • CountDownLatch是一个用来控制并发的很常见的工具,它允许一个或者多个线程等待其他的线程执行完其操作。比如我需要统计多篇文章中出现不同单词的数量,我们会为每篇文章分配一个线程进行统计,统计完成之后,会保存...
  • 实现原理使用CountDownLatch(int count)构建CountDownLatch实例,将count参数赋值给内部计数器state,调用await()方法阻塞当前线程,并将当前线程封装加入到等待队列中,直到state等于零或当前线程被中断;...
  • 这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。 与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线...
  • 上一篇通过研究ReentrantLock分析了AQS的独占功能,本文将...CountDownLatch是同步工具类之一,可以指定一个计数值,在并发环境下由线程进行减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒,实现线程间...
  • CountDownLatch的使用 CountDownLatch是同步工具类之一,可以指定一个计数值,在并发环境下由线程进行减1操作,当计数值变为0之后,被await方法阻塞的线程将会唤醒...
  • 在说CountDownLatch前,必须要先提一下AQS。AQS全称抽象队列同步器(AbstractQuenedSynchronizer),它是一个可以用来实现线程同步的基础框架。当然,它不是我们理解的Spring这种框架,它是一个类,类名就是...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 20,873
精华内容 8,349
关键字:

countdownlatch实现原理