精华内容
下载资源
问答
  • 使用示例 下面这个例子非常实用: // 这是一个关于缓存操作的故事 class CachedData { Object data;... // 读写实例 final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void pr...

    使用示例

    下面这个例子非常实用:

    // 这是一个关于缓存操作的故事
    class CachedData {
        Object data;
        volatile boolean cacheValid;
        // 读写锁实例
        final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
        void processCachedData() {
            // 获取读锁
            rwl.readLock().lock();
            if (!cacheValid) { // 如果缓存过期了,或者为 null
                // 释放掉读锁,然后获取写锁 (后面会看到,没释放掉读锁就获取写锁,会发生死锁情况)
                rwl.readLock().unlock();
                rwl.writeLock().lock();
    
                try {
                    if (!cacheValid) { // 重新判断,因为在等待写锁的过程中,可能前面有其他写线程执行过了
                        data = ...
                        cacheValid = true;
                    }
                    // 获取读锁 (持有写锁的情况下,是允许获取读锁的,称为 “锁降级”,反之不行。)
                    rwl.readLock().lock();
                } finally {
                    // 释放写锁,此时还剩一个读锁
                    rwl.writeLock().unlock(); // Unlock write, still hold read
                }
            }
    
            try {
                use(data);
            } finally {
                // 释放读锁
                rwl.readLock().unlock();
            }
        }
    }

    ReentrantReadWriteLock 分为读锁和写锁两个实例,读锁是共享锁,可被多个线程同时使用,写锁是独占锁。持有写锁的线程可以继续获取读锁,反之不行。

     

    ReentrantReadWriteLock 总览

    这一节比较重要,我们要先看清楚 ReentrantReadWriteLock 的大框架,然后再到源码细节。

    首先,我们来看下 ReentrantReadWriteLock 的结构,它有好些嵌套类:

     大家先仔细看看这张图中的信息。然后我们把 ReadLock 和 WriteLock 的代码提出来一起看,清晰一些:

    很清楚了,ReadLock 和 WriteLock 中的方法都是通过 Sync 这个类来实现的。Sync 是 AQS 的子类,然后再派生了公平模式和不公平模式。

    从它们调用的 Sync 方法,我们可以看到: ReadLock 使用了共享模式,WriteLock 使用了独占模式

    等等,同一个 AQS 实例怎么可以同时使用共享模式和独占模式???

    这里给大家回顾下 AQS,我们横向对比下 AQS 的共享模式和独占模式:

    AQS 的精髓在于内部的属性 state

    1. 对于独占模式来说,通常就是 0 代表可获取锁,1 代表锁被别人获取了,重入例外
    2. 而共享模式下,每个线程都可以对 state 进行加减操作

    也就是说,独占模式和共享模式对于 state 的操作完全不一样,那读写锁 ReentrantReadWriteLock 中是怎么使用 state 的呢?答案是将 state 这个 32 位的 int 值分为高 16 位和低 16位,分别用于共享模式和独占模式

     

    源码分析

    有了前面的概念,大家心里应该都有数了吧,下面就不再那么啰嗦了,直接代码分析。

    源代码加注释 1500 行,并不算难,我们要看的代码量不大。如果你前面一节都理解了,那么直接从头开始一行一行往下看就是了,还是比较简单的。

    ReentrantReadWriteLock 的前面几行很简单,我们往下滑到 Sync 类,先来看下它的所有的属性:

    abstract static class Sync extends AbstractQueuedSynchronizer {
        // 下面这块说的就是将 state 一分为二,高 16 位用于共享模式,低16位用于独占模式
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
        // 取 c 的高 16 位值,代表读锁的获取次数(包括重入)
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        // 取 c 的低 16 位值,代表写锁的重入次数,因为写锁是独占模式
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
        // 这个嵌套类的实例用来记录每个线程持有的读锁数量(读锁重入)
        static final class HoldCounter {
            // 持有的读锁数
            int count = 0;
            // 线程 id
            final long tid = getThreadId(Thread.currentThread());
        }
    
        // ThreadLocal 的子类
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
        /**
          * 组合使用上面两个类,用一个 ThreadLocal 来记录当前线程持有的读锁数量
          */ 
        private transient ThreadLocalHoldCounter readHolds;
    
        // 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数,
        // 所以不管哪个线程获取到读锁后,就把这个值占为已用,这样就不用到 ThreadLocal 中查询 map 了
        // 算不上理论的依据:通常读锁的获取很快就会伴随着释放,
        //   显然,在 获取->释放 读锁这段时间,如果没有其他线程获取读锁的话,此缓存就能帮助提高性能
        private transient HoldCounter cachedHoldCounter;
    
        // 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
    
        Sync() {
            // 初始化 readHolds 这个 ThreadLocal 属性
            readHolds = new ThreadLocalHoldCounter();
            // 为了保证 readHolds 的内存可见性
            setState(getState()); // ensures visibility of readHolds
        }
        ...
    }
    1. state 的高 16 位代表读锁的获取次数,包括重入次数,获取到读锁一次加 1,释放掉读锁一次减 1
    2. state 的低 16 位代表写锁的获取次数,因为写锁是独占锁,同时只能被一个线程获得,所以它代表重入次数
    3. 每个线程都需要维护自己的 HoldCounter,记录该线程获取的读锁次数,这样才能知道到底是不是读锁重入,用 ThreadLocal 属性 readHolds 维护
    4. cachedHoldCounter 有什么用?其实没什么用,但能提示性能。将最后一次获取读锁的线程的 HoldCounter 缓存到这里,这样比使用 ThreadLocal 性能要好一些,因为 ThreadLocal 内部是基于 map 来查询的。但是 cachedHoldCounter 这一个属性毕竟只能缓存一个线程,所以它要起提升性能作用的依据就是:通常读锁的获取紧随着就是该读锁的释放。我这里可能表达不太好,但是大家应该是懂的吧。
    5. firstReader 和 firstReaderHoldCount 有什么用?其实也没什么用,但是它也能提示性能。将"第一个"获取读锁的线程记录在 firstReader 属性中,这里的第一个不是全局的概念,等这个 firstReader 当前代表的线程释放掉读锁以后,会有后来的线程占用这个属性的。firstReader 和 firstReaderHoldCount 使得在读锁不产生竞争的情况下,记录读锁重入次数非常方便快速
    6. 如果一个线程使用了 firstReader,那么它就不需要占用 cachedHoldCounter
    7. 个人认为,读写锁源码中最让初学者头疼的就是这几个用于提升性能的属性了,使得大家看得云里雾里的。主要是因为 ThreadLocal 内部是通过一个 ThreadLocalMap 来操作的,会增加检索时间。而很多场景下,执行 unlock 的线程往往就是刚刚最后一次执行 lock 的线程,中间可能没有其他线程进行 lock。还有就是很多不怎么会发生读锁竞争的场景。

    上面说了这么多,是希望能帮大家降低后面阅读源码的压力,大家也可以先看看后面的,然后再慢慢体会。

    前面我们好像都只说读锁,完全没提到写锁,主要是因为写锁真的是简单很多,我也特地将写锁的源码放到了后面,我们先啃下最难的读锁先。

     

    读锁获取

    下面我就不一行一行按源码顺序说了,我们按照使用来说。

    我们来看下读锁 ReadLock 的 lock 流程:

    // ReadLock
    public void lock() {
        sync.acquireShared(1);
    }
    
    
    // AQS
    /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquireShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         */
        //该方法用来获取读锁,忽略中断。实现原理:首先至少调用一次tryAcquireShared(),如果有一次
        //返回成功表示成功获取到锁,流程结束;返回失败则将当前线程阻塞,在此过程中可能会重复阻塞
        //和解除阻塞,不考虑中断的情况下会一直调用tryAcquireShared()直到成功获取锁。
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)    //@1
                doAcquireShared(arg);           //@2
        }

    然后我们就会进到 Sync 类的 tryAcquireShared 方法:

    在 AQS 中,如果 tryAcquireShared(arg) 方法返回值小于 0 代表没有获取到共享锁(读锁),大于 0 代表获取到

    回顾 AQS 共享模式:tryAcquireShared 方法不仅仅在 acquireShared 的最开始被使用,这里是 try,也就可能会失败,如果失败的话,执行后面的 doAcquireShared,进入到阻塞队列,然后等待前驱节点唤醒。唤醒以后,还是会调用 tryAcquireShared 进行获取共享锁的。当然,唤醒以后再 try 是很容易获得锁的,因为这个节点已经排了很久的队了,组织是会照顾它的。

    所以,你在看下面这段代码的时候,要想象到两种获取读锁的场景,一种是新来的,一种是排队排到它的。

    protected final int tryAcquireShared(int unused) {
        /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                //主要步骤:
                //1. 如果写锁被其他线程持有,直接返回失败。
                //2. 否则先判断本次获取读锁的线程是否会因为不符合排队策略被阻塞。如果符合,继续
                //判断是否达到了读锁最大持有数量,都没问题的话尝试用CAS操作更新state,成功则
                //代表获取锁成功,接下来就是更新前文介绍的那4个变量。注意这一步没有判断重入,
                //重入判断放到fullTryAcquireShared()中了,这也是一个优化策略,因为大部分情况
                //下都是非重入的,是否重入需要根据readHolds中数据来判断,效率较差。
                //3. 如果步骤2失败,不管是因为排队策略阻塞,还是CAS操作失败,还是达到最大持有
                //数量,继续进入fullTryAcquireShared()中尝试获取锁。    
    
        Thread current = Thread.currentThread();
        int c = getState();
    
        // exclusiveCount(c) 不等于 0,说明有线程持有写锁,
        //    而且不是当前线程持有写锁,那么当前线程获取读锁失败
        //         (另,如果持有写锁的是当前线程,是可以继续获取读锁的)
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
    
        // 读锁的获取次数
        int r = sharedCount(c);
    
        // 读锁获取是否需要被阻塞,稍后细说。为了进去下面的分支,假设这里不阻塞就好了
        if (!readerShouldBlock() &&
            // 判断是否会溢出 (2^16-1,没那么容易溢出的)
            r < MAX_COUNT &&
            // 下面这行 CAS 是将 state 属性的高 16 位加 1,低 16 位不变,如果成功就代表获取到了读锁
            compareAndSetState(c, c + SHARED_UNIT)) {
    
            // =======================
            //   进到这里就是获取到了读锁
            // =======================
    
            if (r == 0) {
                // r == 0 说明此线程是第一个获取读锁的,或者说在它前面获取读锁的都走光光了,它也算是第一个吧
                //  记录 firstReader 为当前线程,及其持有的读锁数量:1
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                // 进来这里,说明是 firstReader 重入获取读锁(这非常简单,count 加 1 结束)
                firstReaderHoldCount++;
            } else {
                // 前面我们说了 cachedHoldCounter 用于缓存最后一个获取读锁的线程
                // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0) 
                    // 到这里,那么就是 cachedHoldCounter 缓存的是当前线程,但是 count 为 0,
                    // 大家可以思考一下:这里为什么要 set ThreadLocal 呢?(当然,答案肯定不在这块代码中)
                    //   既然 cachedHoldCounter 缓存的是当前线程,
                    //   当前线程肯定调用过 readHolds.get() 进行初始化 ThreadLocal
                    readHolds.set(rh);
    
                // count 加 1
                rh.count++;
            }
            // return 大于 0 的数,代表获取到了共享锁
            return 1;
        }
        // 往下看
        return fullTryAcquireShared(current);
    }

    上面的代码中,要进入 if 分支,需要满足:readerShouldBlock() 返回 false,并且 CAS 要成功(我们先不要纠结 MAX_COUNT 溢出)。

    那我们反向推,怎么样进入到最后的 fullTryAcquireShared:

    • readerShouldBlock() 返回 true,2 种情况:

    (1)在 FairSync 中说的是 hasQueuedPredecessors(),即阻塞队列中有其他元素在等待锁。

    也就是说,公平模式下,有人在排队呢,你新来的不能直接获取锁

    (2)在 NonFairSync 中说的是 apparentlyFirstQueuedIsExclusive(),即判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。看看源码

        final boolean readerShouldBlock() {
                /* As a heuristic to avoid indefinite writer starvation,
                 * block if the thread that momentarily appears to be head
                 * of queue, if one exists, is a waiting writer.  This is
                 * only a probabilistic effect since a new reader will not
                 * block if there is a waiting writer behind other enabled
                 * readers that have not yet drained from the queue.
                 */
                //这里就是一个共享锁获取线程的阻塞策略,防止写线程长时间饥饿
                return apparentlyFirstQueuedIsExclusive();   //该方法,具体又是在 AbstractQueuedSynchronizer中
            }
    /**
         * Returns {@code true} if the apparent first queued thread, if one
         * exists, is waiting in exclusive mode.  If this method returns
         * {@code true}, and the current thread is attempting to acquire in
         * shared mode (that is, this method is invoked from {@link
         * #tryAcquireShared}) then it is guaranteed that the current thread
         * is not the first queued thread.  Used only as a heuristic in
         * ReentrantReadWriteLock.
         */
        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }

    作者给写锁定义了更高的优先级,所以如果碰上获取写锁的线程马上就要获取到锁了,获取读锁的线程不应该和它抢。

    如果 head.next 不是来获取写锁的,那么可以随便抢,因为是非公平模式,大家比比 CAS 速度

    • compareAndSetState(c, c + SHARED_UNIT) 这里 CAS 失败,存在竞争。可能是和另一个读锁获取竞争,当然也可能是和另一个写锁获取操作竞争。

    然后就会来到 fullTryAcquireShared 中再次尝试:

    /**
     * 1. 刚刚我们说了可能是因为 CAS 失败,如果就此返回,那么就要进入到阻塞队列了,
     *    想想有点不甘心,因为都已经满足了 !readerShouldBlock(),也就是说本来可以不用到阻塞队列的,
     *    所以进到这个方法其实是增加 CAS 成功的机会
     * 2. 在 NonFairSync 情况下,虽然 head.next 是获取写锁的,我知道它等待很久了,我没想和它抢,
     *    可是如果我是来重入读锁的,那么只能表示对不起了
     */
    final int fullTryAcquireShared(Thread current) {
        HoldCounter rh = null;
        // 别忘了这外层有个 for 循环
        for (;;) {
            int c = getState();
            // 如果其他线程持有了写锁,自然这次是获取不到读锁了,乖乖到阻塞队列排队吧
            if (exclusiveCount(c) != 0) {
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
            } else if (readerShouldBlock()) {
                /**
                  * 进来这里,说明:
                  *  1. exclusiveCount(c) == 0:写锁没有被占用
                  *  2. readerShouldBlock() 为 true,说明阻塞队列中有其他线程在等待
                  *
                  * 既然 should block,那进来这里是干什么的呢?
                  * 答案:是进来处理读锁重入的!
                  * 
                  */
    
                // firstReader 线程重入读锁,直接到下面的 CAS
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            // cachedHoldCounter 缓存的不是当前线程
                            // 那么到 ThreadLocal 中获取当前线程的 HoldCounter
                            // 如果当前线程从来没有初始化过 ThreadLocal 中的值,get() 会执行初始化
                            rh = readHolds.get();
                            // 如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove
                            // 然后往下两三行,乖乖排队去
                            if (rh.count == 0)
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        // 排队去。
                        return -1;
                }
                /**
                  * 这块代码我看了蛮久才把握好它是干嘛的,原来只需要知道,它是处理重入的就可以了。
                  * 就是为了确保读锁重入操作能成功,而不是被塞到阻塞队列中等待
                  *
                  * 另一个信息就是,这里对于 ThreadLocal 变量 readHolds 的处理:
                  *    如果 get() 后发现 count == 0,居然会做 remove() 操作,
                  *    这行代码对于理解其他代码是有帮助的
                  */
            }
    
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                // 这里 CAS 成功,那么就意味着成功获取读锁了
                // 下面需要做的是设置 firstReader 或 cachedHoldCounter
    
                if (sharedCount(c) == 0) {
                    // 如果发现 sharedCount(c) 等于 0,就将当前线程设置为 firstReader
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    // 下面这几行,就是将 cachedHoldCounter 设置为当前线程
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh;
                }
                // 返回大于 0 的数,代表获取到了读锁
                return 1;
            }
        }
    }

    firstReader 是每次将读锁获取次数从 0 变为 1 的那个线程。

    能缓存到 firstReader 中就不要缓存到 cachedHoldCounter 中。

    上面的源码分析应该说得非常详细了,如果到这里你不太能看懂上面的有些地方的注释,那么可以先往后看,然后再多看几遍。

    至此,完成尝试获取锁步骤 tryAcquireShared 方法,我们再次回到 acquireShared,如果返回-1,那么需要排队申请,具体请看 doAcquireShared(arg);

       public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)   
                doAcquireShared(arg);      
        }
    /**
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);   //@1
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { // @2,开始自旋重试
                    final Node p = node.predecessor();   // @3
                    if (p == head) {               // @4
                        int r = tryAcquireShared(arg);         
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);    //@5
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())             // @6
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    队列中获取共享锁解析:

    代码@1,在队列尾部增加一个节点。锁模式为共享模式。
    代码@3,获取该节点的前置节点。
    代码@4,如果该节点的前置节点为head(头部),为什么前置节点是head时,可以再次尝试呢?在讲解ReentrantLock时,也讲过,head节点的初始化在第一次产生锁争用时初始化,刚开始初始化的head节点是不代表线程的,故可以尝试获取锁。如果获取失败,则将进入到shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,线程阻塞,等待被唤醒。
    重点分析一下获取锁后的操作:setHeadAndPropagate
     

    /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below 
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0) {   // @1
                Node s = node.next;
                if (s == null || s.isShared())    // @2
                    doReleaseShared();          //@3
            }
        }
     
    /**
         * Sets head of queue to be node, thus dequeuing. Called only by
         * acquire methods.  Also nulls out unused fields for sake of GC
         * and to suppress unnecessary signals and traversals.
         *
         * @param node the node
         */
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
     
    /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {   //@4
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {   //@5
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))   //@6
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed      //@7
                    break;
            }
        }

    读锁获取成功释放共享锁的步骤:
    代码@1,如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程,怎么理解这个传播呢,就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。具体请看doReleaseShared方法。
    代码@4,从队列的头部开始遍历每一个节点。
    代码@5,如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。为什么会设置不成功,可能改节点被取消;还有一种情况就是有多个线程在运行该代码段,这就是PROPAGATE的含义吧,传播,请看代码@7的理解。
    代码@6,如果状态为0,则设置为Node.PROPAGATE,设置为传播,该值然后会在什么时候变化呢?在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,为了保险起见,会将前置节点状态设置为Node.SIGNAL,然后再次判断,是否需要阻塞。
    代码@7,如果处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法,那head在什么时候会改变呢?当然在是抢占锁成功的时候,head节点代表获取锁的节点。一旦获取锁成功,则又会进入setHeadAndPropagate方法,当然又会触发doReleaseShared方法,传播特性应该就是表现在这里吧。再想一下,同一时间,可以有多个多线程占有锁,那在锁释放时,写锁的释放比较简单,就是从头部节点下的第一个非取消节点,唤醒线程即可,为了在释放读锁的上下文环境中获取代表读锁的线程,将信息存入在 readHolds ThreadLocal变量中。
    到这里为止,读锁的申请就讲解完毕了,先给出如下流程图:

     尝试获取读锁过程:

     从队列中获取读锁的流程如下:

     

    读锁释放

     

    下面我们看看读锁释放的流程:

    // ReadLock
    public void unlock() {
        sync.releaseShared(1);
    }
    // Sync
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); // 这句代码其实唤醒 获取写锁的线程,往下看就知道了
            return true;
        }
        return false;
    }
    
    // Sync
    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        if (firstReader == current) {
            if (firstReaderHoldCount == 1)
                // 如果等于 1,那么这次解锁后就不再持有锁了,把 firstReader 置为 null,给后来的线程用
                // 为什么不顺便设置 firstReaderHoldCount = 0?因为没必要,其他线程使用的时候自己会设值
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
            // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
    
            int count = rh.count;
            if (count <= 1) {
    
                // 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
                readHolds.remove();
    
                if (count <= 0)
                    // 就是那种,lock() 一次,unlock() 好几次的逗比
                    throw unmatchedUnlockException();
            }
            // count 减 1
            --rh.count;
        }
    
        for (;;) {
            int c = getState();
            // nextc 是 state 高 16 位减 1 后的值
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
                // 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
                return nextc == 0;
        }
    }

    读锁释放的过程还是比较简单的,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。

    然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程。

        //AbstractQueuedSynchronizer的doReleaseShared
        /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    写锁获取

    1. 写锁是独占锁。
    2. 如果有读锁被占用,写锁获取是要进入到阻塞队列中等待的。
    // WriteLock
    public void lock() {
        sync.acquire(1);
    }
    // AQS
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            // 如果 tryAcquire 失败,那么进入到阻塞队列等待
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // Sync
    protected final boolean tryAcquire(int acquires) {
    
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
    
            // 看下这里返回 false 的情况:
            //   c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
            //   c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
            //   也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
    
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    
            // 这里不需要 CAS,仔细看就知道了,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了
            setState(c + acquires);
            return true;
        }
    
        // 如果写锁获取不需要 block,那么进行 CAS,成功就代表获取到了写锁
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

    下面看一眼 writerShouldBlock() 的判定,然后你再回去看一篇写锁获取过程。

    static final class NonfairSync extends Sync {
        // 如果是非公平模式,那么 lock 的时候就可以直接用 CAS 去抢锁,抢不到再排队
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        ...
    }
    static final class FairSync extends Sync {
        final boolean writerShouldBlock() {
            // 如果是公平模式,那么如果阻塞队列有线程等待的话,就乖乖去排队
            return hasQueuedPredecessors();
        }
        ...
    }

    写锁释放

    // WriteLock
    public void unlock() {
        sync.release(1);
    }
    
    // AQS
    public final boolean release(int arg) {
        // 1. 释放锁
        if (tryRelease(arg)) {
            // 2. 如果独占锁释放"完全",唤醒后继节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // Sync 
    // 释放锁,是线程安全的,因为写锁是独占锁,具有排他性
    // 实现很简单,state 减 1 就是了
    protected final boolean tryRelease(int releases) {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int nextc = getState() - releases;
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);
        setState(nextc);
        // 如果 exclusiveCount(nextc) == 0,也就是说包括重入的,所有的写锁都释放了,
        // 那么返回 true,这样会进行唤醒后继节点的操作。
        return free;
    }

    看到这里,是不是发现写锁相对于读锁来说要简单很多。

    锁降级

    Doug Lea 没有说写锁更高级,如果有线程持有读锁,那么写锁获取也需要等待。

    不过从源码中也可以看出,确实会给写锁一些特殊照顾,如非公平模式下,为了提高吞吐量,lock 的时候会先 CAS 竞争一下,能成功就代表读锁获取成功了,但是如果发现 head.next 是获取写锁的线程,就不会去做 CAS 操作。

    Doug Lea 将持有写锁的线程,去获取读锁的过程称为锁降级(Lock downgrading)。这样,此线程就既持有写锁又持有读锁。

    但是,锁升级是不可以的。线程持有读锁的话,在没释放的情况下不能去获取写锁,因为会发生死锁

    回去看下写锁获取的源码:

    protected final boolean tryAcquire(int acquires) {
    
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
            // 看下这里返回 false 的情况:
            //   c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
            //   c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
            //   也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            ...
        }
        ...
    }

    仔细想想,如果线程 a 先获取了读锁,然后获取写锁,那么线程 a 就到阻塞队列休眠了,自己把自己弄休眠了,而且可能之后就没人去唤醒它了。

    总结

     

    本文内容参考自:

    https://www.javadoop.com/post/reentrant-read-write-lock#toc5

    https://blog.csdn.net/prestigeding/article/details/53286756#commentBox

     

    展开全文
  • AQS即是AbstractQueuedSynchronizer,一个用来构建和同步工具的框架,包括常用的ReentrantLock、CountDownLatch、Semaphore等。 AQS没有之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。...

    什么是AQS

    AQS即是AbstractQueuedSynchronizer,一个用来构建锁和同步工具的框架,包括常用的ReentrantLock、CountDownLatch、Semaphore等。

    AQS没有锁之类的概念,它有个state变量,是个int类型,在不同场合有着不同含义。本文研究的是锁,为了好理解,姑且先把state当成锁。

    AQS围绕state提供两种基本操作“获取”和“释放”,有条双向队列存放阻塞的等待线程,并提供一系列判断和处理方法,简单说几点:

    • state是独占的,还是共享的;
    • state被获取后,其他线程需要等待;
    • state被释放后,唤醒等待线程;
    • 线程等不及时,如何退出等待。

    至于线程是否可以获得state,如何释放state,就不是AQS关心的了,要由子类具体实现。

    直接分析AQS的代码会比较难明白,所以结合子类ReentrantLock来分析。AQS的功能可以分为独占和共享,ReentrantLock实现了独占功能,是本文分析的目标。

    ReentrantLock对比synchronized

    ReentrantLock实现了Lock接口,加锁和解锁都需要显式写出,注意一定要在适当时候unlock。

    和synchronized相比,ReentrantLock用起来会复杂一些。在基本的加锁和解锁上,两者是一样的,所以无特殊情况下,推荐使用synchronized。ReentrantLock的优势在于它更灵活、更强大,除了常规的lock()、unlock()之外,还有lockInterruptibly()、tryLock()方法,支持中断、超时。

    公平锁和非公平锁

    首先看ReentrantLock构造方法:

        /**
         * Creates an instance of {@code ReentrantLock}.
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        public ReentrantLock() {
            sync = new NonfairSync(); // 默认选择非公平锁
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ?
            new FairSync()  // 公平锁
            :
            new NonfairSync(); // 非公平锁
        }
    

    学习AQS的时候,了解到AQS依赖于内部的FIFO同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个Node对象并将其加入到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

    这时,我有了一个疑问,AQS的同步队列是FIFO的,就是先来排队的先走。那怎么实现非公平锁呢?查阅了一些资料,总算知道了。

    首先从公平锁开始看起。

    ReentrantLock 默认采用非公平锁,除非在构造方法中传入参数 true 。

    公平锁的 lock 方法

    static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
        // AbstractQueuedSynchronizer.acquire(int arg)
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    

    我们可以看到,在注释1的位置,有个!hasQueuedPredecessors()条件,意思是说当前同步队列没有前驱节点(也就是没有线程在等待)时才会去compareAndSetState(0, acquires)使用CAS修改同步状态变量。所以就实现了公平锁,根据线程发出请求的顺序获取锁。

    非公平锁的lock方法

    static final class NonfairSync extends Sync {
        final void lock() {
            // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
        // AbstractQueuedSynchronizer.acquire(int arg)
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //3.这里也是直接CAS,没有判断前面是否还有节点。
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    非公平锁的实现在刚进入lock方法时会直接使用一次CAS去尝试获取锁,不成功才会到acquire方法中,如注释2。而在nonfairTryAcquire方法中并没有判断是否有前驱节点在等待,直接CAS尝试获取锁,如注释3。由此实现了非公平锁。

    非公平锁和公平锁的两处不同:

    1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。

    2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

    公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

    相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

    它们均继承了 ReentrantLock.Sync 类, ReentrantLock.Sync 类 又继承了 AbstractQueuedSynchronizer类
    1. ReentrantLock.Sync 类是对一些公共方法的封装。
    2. AbstractQueuedSynchronizer 内部维护着一个由双向链表实现的队列 用来记录等待锁释放的线程,
     

    实际操作中 NonfairSync 的使用比较多 这里针对NonfairSync 进行解析

    加锁操作

    ReentrantLock.NonfairSync

      /**
       * 非公平锁的同步对象
       * Sync object for non-fair locks
       */
      static final class NonfairSync extends Sync {
          private static final long serialVersionUID = 7316153563782823691L;
    
          /**
           * Performs lock.  Try immediate barge, backing up to normal
           * acquire on failure.
           */
          final void lock() {
              if (compareAndSetState(0, 1))
                  setExclusiveOwnerThread(Thread.currentThread());
              else
                  acquire(1);
          }
    
          protected final boolean tryAcquire(int acquires) {
              return nonfairTryAcquire(acquires);
          }
      }
    

       1. 先看lock方法

           

         final void lock() {
                //  通过原子操作 改变上锁状态
                if (compareAndSetState(0, 1)) // 变更成功
                    setExclusiveOwnerThread(Thread.currentThread()); // 设置持有者为当前线程
                else // 变更不成功
                    acquire(1);
            }

    按代码顺序往下看
    1.1 AbstractQueuedSynchronizer#compareAndSetState方法

        private static final Unsafe unsafe = Unsafe.getUnsafe();
    
        static {
            try {
                // 获取偏移量
                stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
                headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
                tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
                waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
                nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
    
            } catch (Exception ex) { throw new Error(ex); }
        }
    
        /**
         *  通过原子操作 改变上锁状态
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this  调用本地方法 实现硬件级别的原子操作 cas
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

    Unsafe :

    •  CAS的核心类 (CAS 比较并交换)
    • 通过本地方法 操作内存 实现 cas
    •  一知半解的情况下 不要操作此类

    1.2 AbstractOwnableSynchronizer#setExclusiveOwnerThread(Thread.currentThread());将当前线程记录为独占锁的线程

    1.3 AbstractQueuedSynchronizer.acquire();

        /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            // 再次尝试上锁 回到了  NonfairSync.tryAcquire 方法, tryAcquire 调用了 Sync.nonfairTryAcquire方法
            if (!tryAcquire(arg) && 
                acquireQueued(
                        addWaiter(Node.EXCLUSIVE), // 链表尾部添加节点
                        arg
                    )
                )
                selfInterrupt();
        }

     

    1.3.1 ReentrantLock.Sync#nonfairTryAcquire:

        /**
             * 判断 reentranLock 状态 是否被锁住(state ?= 0)
             * <p>如果没被锁住尝试 原子性上锁 失败返回false</>
             * <p>如果被锁住 判断是否是当前线程持有锁(重入锁的实现) 如果是 state + 1
             * (信号量  记录该线程持有锁的次数。 该线程每次释放所 信号量 -1。 信号量为零 代表 锁被真正释放)</>
             * <p>else 返回false</p>
             * Performs non-fair tryLock.  tryAcquire is implemented in
             * subclasses, but both need nonfair try for trylock method.
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread(); // 获取当前线程
                int c = getState(); // 获取当锁的状态
                if (c == 0) { // 如果锁已被经释放 再次尝试获取锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) { // 如果当前线程为锁的拥有者
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc); // 累加 state 的值  此段代码 实现了重入锁
                    return true;
                }
                return false;
            }

    获取锁成功分为两种情况,第一个if判断AQS的state是否等于0,表示锁没有人占有。没有的话调用compareAndSetState使用cas的方式修改state,传入的acquires写死是1。最后线程获取锁成功,setExclusiveOwnerThread将线程记录为独占锁的线程。

    第二个if判断当前线程是否为独占锁的线程,因为ReentrantLock是可重入的,线程可以不停地lock来增加state的值,对应地需要unlock来解锁,直到state为零。

    如果 tryAcquire(arg) 返回true,则不会执行acquireQueued,表示成功获取锁,如果tryAcquire(arg) 返回 false,说明没有成功获取锁,则加入请求队列中。接着请看 addWaiter(Node.EXCLUSIVE) 方法。
     

     

    1.3.2 AbstractQueuedSynchronizer#addWaiter代码:

    AQS内部有一条双向的队列存放等待线程,节点是Node对象。每个Node维护了线程、前后Node的指针和等待状态等参数。

    线程在加入队列之前,需要包装进Node,调用方法是addWaiter,addWaiter 中涉及的逻辑,就是 CLH思想的实现,故在 AbstractQueuedSynchronizer中,源码如下:

        /**
         *
         * 把当前线程加入队列 尾部
         *
         * 负责队列初始化
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) { //@1--start 队列尾部不为空
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }//@1--end
            // 列队尾部为空 或者  CAS 操作失败
            enq(node);
            return node;
        }

    每个Node需要标记是独占的还是共享的,由传入的mode决定,ReentrantLock自然是使用独占模式Node.EXCLUSIVE。 

    对于上面的代码@1,处说,如果当前该锁的尾部节点不为空时,只需要原子性的将新增节点放入原队列的尾部,然后更新锁的tail 属性即可。如果尾部节点不为空,说明有线程已经在该锁上等待,那如果尾部为空,是什么情况呢?尾部为空,表示没有线程持有锁,那为什么该线程获取锁没有成功呢?我们不妨设想一下,该线程在尚未执行到addWaiter时,尾部不为空,无法获取锁,当执行到addWaiter时,别的线程释放了锁,导致尾部为空,可以重新获取锁了;(其实这个就是并发编程的魅力,与synchronized关键字不同的机制);为了解答上述疑问,我们进入到 enq(node)方法中一探究竟。

        /**
         * java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // @1 尾部为空 尝试构建表结构 
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else { // 尾部不为空 不断尝试CAS 操作
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

     

    使用自旋来加入,众所周知,CLH算法,需要初始化一个假的head节点,也就是head节点并不代表一个等待获取锁的对象,AbstractQueuedSynchronzier选择初始化head,tail的时机为第一次产生锁争用的时候。@1处为初始化head,tail,初始化后,再将新添加的节点放入到队列的尾部,然后该方法会返回原队列的尾节点。addWaiter方法执行后,继续回到acquire(args)方法处:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    接下来,查看acquireQueued方法,addWaiter方法返回的是代表当前线程的Node节点

     

    1.3.3 线程加入队列后,下一步是调用acquireQueued阻塞线程。

    java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued代码:

    /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
    */
    
      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    
                    final Node p = node.predecessor();//  @1
                    if (p == head && tryAcquire(arg)) { // @2 判断当前节点的 前驱节点 是否为队列头部  如果是 再次尝试上锁(如果头部节点 已经释放所, 则使当前线程成为持有者 并且设置自己为 头部。 同时释放前驱节点)
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt() // 进入等待状态 等待唤醒
                            )//@3
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);  // 抛出异常 才会走的到这里。  源码在下面
            }
        }

    首先@1,获取该节点的 node 的上一个节点。
    @2如果node的前节点是head,,因为head初始化时,都是假节点,不代表有线程拥有锁,所以再次尝试获取锁,如果获取锁,则将锁的 head设置为当前获取锁的线程的Node,然后返回false《这步是实现公平锁的核心,保证释放锁时,由下个排队线程获取锁》。这里返回false代表 if (!tryAcquire(arg) &&  acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 的结果为false,那么直接返回就好,不需要selfInterrupt()设置中断标记。

    如果node的前节点不是head的话,则说明该锁被别的线程占用了,那就需要等待其他线程释放该锁,具体,我们看一下 shouldParkAfterFailedAcquire,为了更好的理解 shouldParkAfterFailedAcquire,我们先了解一下waitState变量,waitState是在AQS内部类Node中定义的一个volatile变量。

            /** waitStatus value to indicate thread has cancelled */
            //取消状态
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            //代表下个线程节点需要被唤醒
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            //当前线程节点在等待条件触发
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            //(共享锁)状态需要向后传播
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     The successor of this node is (or will soon be)
             *               blocked (via park), so the current node must
             *               unpark its successor when it releases or
             *               cancels. To avoid races, acquire methods must
             *               first indicate they need a signal,
             *               then retry the atomic acquire, and then,
             *               on failure, block.
             *   CANCELLED:  This node is cancelled due to timeout or interrupt.
             *               Nodes never leave this state. In particular,
             *               a thread with cancelled node never again blocks.
             *   CONDITION:  This node is currently on a condition queue.
             *               It will not be used as a sync queue node
             *               until transferred, at which time the status
             *               will be set to 0. (Use of this value here has
             *               nothing to do with the other uses of the
             *               field, but simplifies mechanics.)
             *   PROPAGATE:  A releaseShared should be propagated to other
             *               nodes. This is set (for head node only) in
             *               doReleaseShared to ensure propagation
             *               continues, even if other operations have
             *               since intervened.
             *   0:          None of the above
             *
             * The values are arranged numerically to simplify use.
             * Non-negative values mean that a node doesn't need to
             * signal. So, most code doesn't need to check for particular
             * values, just for sign.
             *
             * The field is initialized to 0 for normal sync nodes, and
             * CONDITION for condition nodes.  It is modified using CAS
             * (or when possible, unconditional volatile writes).
             */
            volatile int waitStatus;

     

    shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否需要阻塞。

    java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire

        /**
         * 检查 是否需要阻塞当前线程
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; //@1
            if (ws == Node.SIGNAL)    //@2
                /* 
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                 //前驱节点在等待唤醒  也就是说当前节点需要进入等待状态
                return true;
            if (ws > 0) {  //@3 前驱节点被取消
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.     
                 */
                // 前驱节点被取消则跳过前驱节点,循环此操作直到找到一个不为 取消状态 的前驱节点
                do {    //@4-start
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);    //@4-end
                pred.next = node;    //@5
            } else {    //@6
                /* 
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //只有前置节点的状态为 0 或 PROPAGATE,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重试一次,确保它不能获取到锁,从而阻塞该线程。
    
                 // 设置前驱节点为 SIGNAL 标记自己为等待唤醒,下次循环到这里之前,如果没有成功拥有锁, 则会进入 if (ws == Node.SIGNAL) 代码段
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    //前驱节点状态是SIGNAL时,当前线程需要阻塞;
    //前驱节点状态是CANCELLED时,通过循环将当前节点之前所有取消状态的节点移出队列;
    //前驱节点状态是其他状态时,需要设置前驱节点为SIGNAL。

    @1 首先获取前置节点的 waitStatus。
    @2 如果前置节点的waitStatus = Node.SIGNAL,那么当前节点,直接阻塞,说明状态是一个信号,如果前置节点状态为              Node.SIGNAL,那么后续节点应该阻塞(一个节点的waitStatus初始值为 0。)

    @3,ws > 0 ,则代表前置节点已取消。
    @4 处的代码,就是找到当前Node的第一个不为取消状态的前置节点,重构CLH队列后,返回false,再次进入到acquireQueued  的无限循环中,又继续acquireQueued的流程,继续尝试获取锁,最终获取到锁,或者阻塞。
    @6如果前置节点为0或PROPAGATE(可传播),如果前置节点为0,说明之前还没有其他节点通过(prev)来判断该prev的后继节点是否需要阻塞,所以,通过CAS设置前置节点为 Node.SIGNAL,重试获取锁过程,避免不必要的线程阻塞。

    如果线程需要阻塞,由parkAndCheckInterrupt方法进行操作。

    java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    parkAndCheckInterrupt使用了LockSupport,和cas一样,最终使用UNSAFE调用Native方法实现线程阻塞(以后有机会就分析下LockSupport的原理,park和unpark方法作用类似于wait和notify)。最后返回线程唤醒后的中断状态,关于中断,后文会分析。

    至此,获取锁的过程就结束了,为了直观体现上述获取锁的过程,现给出如下流程图:

    java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire代码我们放到后面分析。

        /**
         * Cancels an ongoing attempt to acquire.
         * 列队等待中 抛出异常会调用此方法
         * @param node the node
         */
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null; // 释放线程
    
            // 前驱节点已被取消  重新定义前驱节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED; // 取消当前线程 所属的节点(标记为取消),  没有使用 cas  因为 其他线程 不会干扰这里
    
            // If we are the tail, remove ourselves. 如果我们是尾巴,就把自己弄走
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                // 如果node既不是tail,又不是head的后继节点
                // 则将node的前继节点的waitStatus置为SIGNAL
                // 并使node的前继节点指向node的后继节点(相当于将node从队列中删掉了)
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
                    //  如果node是head的后继节点,则直接唤醒node的后继节点
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }

     java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor代码:

        /** 唤醒后继节点
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0) //置零当前线程所在的结点状态,允许失败。
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);  // 唤醒下级节点
        }

    非公平锁lock 方法的实现 大概就以上的内容。到这里总结一下获取锁的过程:线程去竞争一个锁,可能成功也可能失败。成功就直接持有资源,不需要进入队列;失败的话进入队列阻塞,等待唤醒后再尝试竞争锁。

    释放锁

    通过上面详细的获取锁过程分析,释放锁过程大概可以猜到:头节点是获取锁的线程,先移出队列,再通知后面的节点获取锁。

    public void unlock() {
        sync.release(1);
    }
    

    ReentrantLock的unlock方法很简单地调用了AQS的release:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {    //@1
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    和lock的tryAcquire一样,unlock的tryRelease同样由ReentrantLock中内部类NonFairSync实现:

    protected final boolean tryRelease(int releases) {
                int c = getState() - releases;   //  @1
                if (Thread.currentThread() != getExclusiveOwnerThread()) //@2
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {  // @3
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);   //@4
                return free;
    }
    

    代码@1,首先,计算持有锁的次数=当前被持有锁的次数-减去释放的锁的数量;
    代码@2,判断当前锁的持有线程释放与释放锁的线程是否相同,否则,直接抛出运行时异常
    代码@3,如果释放锁后,占有次数为0,则代表该锁被释放,设置锁的占有线程为null,
    代码@4,设置锁的state,如果返回true,表示锁被释放,如果返回false,表示,锁继续被该线程占有(重入了多次,就需要释放多次)。再次回到release方法,如果tryRelease方法返回true,表示可以释放锁,
     

      public final boolean release(int arg) {
            if (tryRelease(arg)) {  @1
                Node h = head;
                if (h != null && h.waitStatus != 0)   // @2
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

     代码@2为什么需要判断 h!=null && h.waitStatus != 0的判断呢?,在讲解获取锁的时候,方法
    shouldParkAfterFailedAcquire 中对于代码@6处的讲解,其实不难发现,一个节点在请求锁时,只有当它的前驱节点的waitStatus=Node.SIGNAL时,才会阻塞。如果 head为空,则说明CLH队列为空,压根就不会有线程阻塞,故无需执行unparkSuccessor(h),同样的道理,如果head节点的waitStatus=0,则说明压根就没有head后继节点判断是否要绑定的逻辑,故也没有线程被阻塞这一说。改进后的CLH,head如果不为空,该节点代表获取锁的那个线程对应的Node,请看获取锁代码acquireQueued中的代码@2处,如果获得锁,setHead(node);知道这一点,就不难理解为什么在释放锁时调用unparkSuccessor(h)时,参数为head了。
    现在将目光转移到 AbstractQueuedSynchronizer. unparkSuccessor(h)方法中:

    /**
         * Wakes up node's successor, if one exists.
         *
         * @param node the node
         */
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;  
            if (ws < 0)     // @1
                compareAndSetWaitStatus(node, ws, 0);
     
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {  //@2 start
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            } // @2 end
     
            if (s != null) // @3
                LockSupport.unpark(s.thread);
        }
    

    代码@1,目前waitStatus > 0表示取消,等于0表示正常(新建),该步骤主要是为了保护,避免重复释放。
    代码@2 start-end,此处,主要是从占有锁的节点,往后找,找到第一个没有被取消的节点,然后唤醒它所代表的线程。这里为什么要从尾部寻址呢?
    代码@3,唤醒线程,释放锁的逻辑代码已经结束,那调用LockSupport.unpark(s.thread)后,会进入到哪呢?此时,请再次进入获取锁代码的 acquireQueue方法和shouldParkAfterFailedAcquire方法,先解读如下:
        当LockSupport.unpark(s.thread)事,那acquireQueued的代码@3处parkAndCheckInterrupt方法会解除阻塞,继续往下执行,进入到 acquireQueued的for循环处:此时会有两种情况
        1、HEAD --> Node(s)  ... > ...            (Node(s)为  LockSupport.unpark 中的 s)
        2、HEAD --> A Cancel Node -->  Node(s)
        如果为第一种情况,直接进入 @2去尝试获取锁。
        如果为第二种情况,shouldParkAfterFailedAcquire(prev,node)中的prev为一个取消的节点,然后会重构整个CLH链表,删除Node到head节点直接的取消节点,使得被唤醒线程的节点的上一个节点为head,从而满足@2处的条件,进入获取锁方法。至此, lock方法与unlock方法实现了交互。

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();    //  @1
                    if (p == head && tryAcquire(arg)) {    // @2
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt()  )   //@3
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    与shouldParkAfterFailedAcquire方法:
      */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;   //  @1
            if (ws == Node.SIGNAL)    // @2
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {   // @3
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {   // @4 start
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);   //@4 end
     
                pred.next = node; // @5
            } else { // @6
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                   只有前置节点的状态为 0 或 PROPAGATE,,才能进入到该代码块,表明我们需要一个信号,但暂不挂起线程,调用者需要重试一次,确保它不能获取到锁,从而阻塞该线程。
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

     为了方便大家理解,给出一个简要的释放锁的流程图:

     

    中断锁

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    

    在acquire里还有最后一句代码调用了selfInterrupt,功能很简单,对当前线程产生一个中断请求。

    为什么要这样操作呢?因为LockSupport.park阻塞线程后,有两种可能被唤醒。

    第一种情况,前节点是头节点,释放锁后,会调用LockSupport.unpark唤醒当前线程。整个过程没有涉及到中断,最终acquireQueued返回false时,不需要调用selfInterrupt。

    第二种情况,LockSupport.park支持响应中断请求,能够被其他线程通过interrupt()唤醒。但这种唤醒并没有用,因为线程前面可能还有等待线程,在acquireQueued的循环里,线程会再次被阻塞。parkAndCheckInterrupt返回的是Thread.interrupted(),不仅返回中断状态,还会清除中断状态,保证阻塞线程忽略中断。最终acquireQueued返回true时,真正的中断状态已经被清除,需要调用selfInterrupt维持中断状态。

    因此普通的lock方法并不能被其他线程中断,ReentrantLock是可以支持中断,需要使用lockInterruptibly。

    两者的逻辑基本一样,不同之处是parkAndCheckInterrupt返回true时,lockInterruptibly直接throw new InterruptedException()。

    ReentrantLock lockInterruptibly 源码分析
        

        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }

    首先先提一个问题: void lock(),通过该方法去获取锁,如果锁被占用,线程阻塞,如果调用被阻塞线程的interupt()方法,会取消获取锁吗?答案是否定的。
         首先需要知道 LockSupport.park 会响应中断,但不会抛出 InterruptedException。
         接下来,我们就从lockInterruptibly()方法入手,一步一步解析,并分析与lock方法的差异。
         首先进入的是AbstractQueuedSynchronizer的acquireInterruptibly方法。
     

    /**
         * Acquires in exclusive mode, aborting if interrupted.
         * Implemented by first checking interrupt status, then invoking
         * at least once {@link #tryAcquire}, returning on
         * success.  Otherwise the thread is queued, possibly repeatedly
         * blocking and unblocking, invoking {@link #tryAcquire}
         * until success or the thread is interrupted.  This method can be
         * used to implement method {@link Lock#lockInterruptibly}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);   // @1
        }
        如果尝试获取锁失败后,进入获取锁并等待锁逻辑,doAcquireInterruptibly
    /**
         * Acquires in exclusive interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);   // @1
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {              // @2
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();   //@3
                }
            } finally {
                if (failed)
                    cancelAcquire(node); //@4
            }
        }

     整个获取锁的逻辑与 lock方法一样,唯一的区别在于  @3 处,如果parkAndCheckInterrupt如果是通过t.interupt方法,使LockSupport.park取消阻塞的话,会抛出InterruptedException,停止尝试获取锁,然后将添加的节点取消,那重点关注一下cancelAcquire(node);
     

    /**
         * Cancels an ongoing attempt to acquire.
         *
         * @param node the node
         */
        private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
     
            node.thread = null;
     
            // Skip cancelled predecessors
            Node pred = node.prev;   
            while (pred.waitStatus > 0)  // @1
                node.prev = pred = pred.prev;
     
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
            Node predNext = pred.next; //@2
     
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
            node.waitStatus = Node.CANCELLED; 
     
            // If we are the tail, remove ourselves.
            if (node == tail && compareAndSetTail(node, pred)) {   // @3 
                compareAndSetNext(pred, predNext, null);
            } else {  // @4
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {   // @5
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {  // @6
                    unparkSuccessor(node);
                }
     
                node.next = node; // help GC
            }
        }

    代码@1:此处的目的就是, 设置prev的值为从当前取消节点往head节点方向,第一个未取消节点。并将中间的取消节点脱离这条链。
    代码@2 Node predNext = pred.next;
    代码@3 如果被取消的节点是尾节点的话,那么将pred设置为尾节点,compareAndSetTail(node, pred),如果设置失败,说明,有别的线程在申请锁,使得尾部节点发生了变化,那这样的话,我当前节点取消的工作,就到此可以结束了;如果设置成功了,既然pred是尾节点,那么再次将pred的next域设置为null;当然也可能设置失败,表明又有新的线程在申请锁,创建了节点。所以取消操作,也到此结束。
    代码@4,如果取消的节点,不是尾部节点的话,这时需要维护CLH链,请看代码@5
    代码@5,首先pred不是head节点,接下来判断是否需要设置pred.next = 当前待取消节点的next。
     如果 pred.waitStatus==Node.SIGNAL, 或者操作pred.waitStatus=Node.SIGNAL状态成功,并且pred.thread 的线程不为空;此时进一步判断待取消的节点的next不为空,并且状态为非取消的时,将pred.next 设置为 node.next;该取消节点被删除
    代码@6,如果pred为head,执行一次唤醒操作。


    整个加锁释放锁流程中Node.CANCEL状态节点的删除操作有两处,一处在shouldParkAfterFailedAcquire,另一处就发生在cancelAcquire方法。
     

    总结

    从ReentrantLock的实现完整分析了AQS的独占同步功能(如果需要更进一步详细了解,可以参考这篇博文https://blog.csdn.net/tyrroo/article/details/92772279),总的来讲并不复杂。别忘了AQS还有共享功能。

     

    展开全文
  • 1、java.util.concurrent.locks.Condition接口一览: void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean ...

    1、java.util.concurrent.locks.Condition接口一览:

     

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();

    Condition 实现的语义为 Object.wait 与 Object.notify
    关于Condition的实现类为 AbstractQueuedSynchronizer.ConditionObject内部类。
    首先在讲解源码之前,我重点罗列出ConditionObject的关键数据结构:
        private transient Node fristWaiter;
        private transient Node lastWaiter;
    从这里看出,每个CondtionObject,都维护着自己的条件等待等待队列,并且是一个双端链表。


    1)void await() throws InterruptedException;源码分析
     

    /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with
             *      saved state as argument, throwing
             *      IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())    // @1 
                    throw new InterruptedException();
     
                Node node = addConditionWaiter();    //@2
                int savedState = fullyRelease(node);   // @3
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {     //@4
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  // @5
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  //@6
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled     //@7                            
                    unlinkCancelledWaiters();
                if (interruptMode != 0)   //@8                                                                         
                    reportInterruptAfterWait(interruptMode);
            }

    代码@1:检测当前线程的中断标记,如果中断位为1,则抛出异常(和object.wait()语义一致)。这里也能解释为什么await()需要在lock()获取锁之后才能调用,如果不是获取锁之后调用就会报异常。
    代码@2:添加等待节点。就是一个简单的链表维护节点的操作,具体参照addConditionWaiter讲解。
    代码@3:释放占有的锁,并获取当前锁的state,因为await实现的语义为Object.wait(),释放锁并等待条件的发生。当条件满足,线程被唤醒后,第一步是需要获取锁,然后在上次await()的下一条指令处继续执行。代码3就是实现上述语义的释放锁部分。为什么要获取当前锁的state?是因为后面需要通过acquireQueued(node, savedState)获取锁时恢复当前线程重入锁的次数。
    代码@4:isOnSyncQueue 当前节点是否在同步队列中,如果在同步阻塞队列中,则申请锁,去执行;如果不在同步队列中(在条件队列中),阻塞,等待满足条件。新增的节点,默认在条件队列中(Conditon),也就是说第一次判断isOnSyncQueue 一定为false,会进入循环体,先阻塞当前线程,正常由signal()唤醒阻塞则第二次判断isOnSyncQueue ,此时结果应为true不进入循环,执行循环后面的流程。isOnSyncQueue 源码解读在下文中;
    代码@5:线程从条件队列被唤醒,唤醒后,线程要从条件队列移除,进入到同步等待队列,被唤醒有有如下两种情况,一是条件满足,收到signal信号,二是线程被取消(中断),该步骤是从条件队列移除,加入到同步等待队列,返回被唤醒的原因,如果是被中断,需要根据不同模式,处理中断。处理中断也有两种方式:1.继续设置中断位;2:直接抛出InterruptedException。请看下文关于checkInterruptWhileWaiting的源码解读。
    代码@6:运行到代码6时,说明线程已经结束了释放锁,从条件队列移除,线程运行,在继续执行业务逻辑之前,必须先获取锁(这里可能会被阻塞在同步队列)。只有成功获取锁后,才会去判断线程的中断标志,才能在中断标志为真时,抛出InterruptException。
    代码@7,执行一些收尾工作,清理整个条件队列:
    代码@8,处理中断,是设置中断位,还是抛出InterruptException。
    那我们先关注一下addConditionWaiter方法:

            /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {     //@1
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);  //@2
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }

    添加条件等待节点,根据链表的特征,直接在尾部节点的nextWaiter指向新建的节点,并将新建的节点设置为整个链表的尾部,首先要知道如下数据结构:
        object {
              Node firstWaiter;
              Node lastWaiter;
              node {
                   node nextWaiter;
                  该节点承载的业务数据,比如这里的Thread t;等
             }
        }
    知道上述结构,其实整个链的数据维护,基本一目了然,自己都可以实现下面的逻辑。
    代码@1,如果最后一个等待节点的状态不是Node.CONDITION,则,则先删除等待链中节点状态不为Node.CONDITION的节点。具体代码分析请参照下文unlinkCancelledWaiters的解读。
    代码@2开始,就是普通链表的节点添加的基本方法。

    // 清除等待节点方法
     

            /**
             * Unlinks cancelled waiter nodes from condition queue.
             * Called only while holding lock. This is called when
             * cancellation occurred during condition wait, and upon
             * insertion of a new waiter when lastWaiter is seen to have
             * been cancelled. This method is needed to avoid garbage
             * retention in the absence of signals. So even though it may
             * require a full traversal, it comes into play only when
             * timeouts or cancellations occur in the absence of
             * signals. It traverses all nodes rather than stopping at a
             * particular target to unlink all pointers to garbage nodes
             * without requiring many re-traversals during cancellation
             * storms.
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;  // 
                Node trail = null;       //@1
                while (t != null) {
                    Node next = t.nextWaiter;    
                    if (t.waitStatus != Node.CONDITION) {  // @3
                        t.nextWaiter = null;
                        if (trail == null)    // @4 说明trail还未被赋值,前面的循环中还未找到状态为CONDITION的节点
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next; //@5 前面的循环中找到状态为CONDITION的节点后,每次循环非CONDITION状态的节点时跳过,处理next节点
                        if (next == null)    // @6
                            lastWaiter = trail;
                    }
                    else   // @4
                        trail = t;
                    t = next;
                }
            }

    该方法的思路为,从第一个节点开始,将不等于Node.CONDITION的节点清除。
    代码@1,设置尾部节点临时变量,用来记录最终的尾部节点。
    第一次循环,是循环第一个节点,如果它的状态为Node.CONDITION,则该链的头节点保持不变,设置临时尾节点为t,然后进行下一个节点的判断;如果节点不为Node.CONDITION, 重置头节点为下一个节点,或尾部节点的下一个节点(@4,@5)。代码@6代表整个循环结束,设置 ConditionObject对象的lastWaiter为trail的值。


    await步骤中,释放锁过程源码解析。释放锁的过程,逻辑为unlock,但该方法,返回当前锁的state,因为释放锁后,该方法在条件没有满足前提下,自身需要阻塞。被唤醒后,需要先尝试获取锁,然后才能执行接下来的逻辑。
     

    /**
         * Invokes release with current state value; returns saved state.
         * Cancels node and throws exception on failure.
         * @param node the condition node for this wait
         * @return previous sync state
         */
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }

    await,@4步骤中,isOnSyncQueue 源码解读:

    /**
         * Returns true if a node, always one that was initially placed on
         * a condition queue, is now waiting to reacquire on sync queue.
         * @param node the node
         * @return true if is reacquiring
         */
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)   // @1
                return false;
            if (node.next != null) // If has successor, it must be on queue   // @2
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            //node.prev不为null的情况下,节点也可能不在同步队列中,因为当
            //(1)不同线程同时调用interrupt()和signal()方法时;
            //(2)不同线程同时调用interrupt()方法时;
            //enq()存在并发,其中的CAS操作可能失败
            return findNodeFromTail(node);
        }

    代码@1,如果节点的状态为Node.CONDITION 或 node.prev == null,表明该节点在条件队列中,并没有加入同步阻塞队列(同步阻塞队列为申请锁等待的队列),await方法中,新增的节点,默认满足上述条件,所以返回false,表示在条件队列中,等待条件的发生,条件满足之前,当前线程应该阻塞。node.prev在什么时候会改变呢?答案是在signal()流程或中断唤醒后的enq()方法中。
    代码@2,如果node.next不为空,说明在同步阻塞队列中。这个我想毫无疑问。当然也说明next域肯定是在进入同步队列过程中会设置值。
    代码@3, 上面的注释也说的比较清楚,node.prev不为空,但也不在同步队列中,这个是由于CAS可能会失败,为了不丢失信号,从同步队列中再次选择该节点,如果找到则返回true,否则返回false。


    await @5 checkInterruptWhileWaiting 代码解读:
     

     /*
             * For interruptible waits, we need to track whether to throw
             * InterruptedException, if interrupted while blocked on
             * condition, versus reinterrupt current thread, if
             * interrupted while blocked waiting to re-acquire.
             */
     
            /** Mode meaning to reinterrupt on exit from wait */
            private static final int REINTERRUPT =  1;   // 重新设置中断位,中断由上层处理
            /** Mode meaning to throw InterruptedException on exit from wait */  
            private static final int THROW_IE    = -1;    // 直接抛出 InterruptedException  0:正常
     
            /**
             * Checks for interrupt, returning THROW_IE if interrupted
             * before signalled, REINTERRUPT if after signalled, or
             * 0 if not interrupted.
             */
            private int checkInterruptWhileWaiting(Node node) {
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
     
    /**
         * Transfers node, if necessary, to sync queue after a cancelled
         * wait. Returns true if thread was cancelled before being
         * signalled.
         * @param current the waiting thread
         * @param node its node
         * @return true if cancelled before the node was signalled
         */
        final boolean transferAfterCancelledWait(Node node) {
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {   //@1 中断发生在signal()之前会走这里
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             */
            //中断和signal()并发时,等待signal()完成enq()直到节点进入同步队列
            while (!isOnSyncQueue(node))
                Thread.yield();//触发中断的线程让步(不释放锁)
            return false;
        }
     
    /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) { 
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    重点关注@1,首先需要知道一点,如果收到正常的singal()信号而被唤醒的节点【这个在singal方法时重点分析】,状态为Node.SINGAL,不会是Node.CONDITION状态,所以如果代码@1compareAndSetWaitStatus设置成功,说明线程是调用了interrupt()方法而使得LockSupport.park解除阻塞的,然后将该节点加入到同步队列中,结束 await的等待条件触发语义,进入到 抢占锁阶段。【再次重申Object wait语义,释放当前锁,然后等待条件的触发【条件队列】,条件发生后,要先重新去抢占锁,获取锁则继续执行,否则阻塞在获取锁【同步队列】】,所以当 线程阻塞在  await 方法时,调用 t.interrupt方法时只是中断条件队列的等待,并不能马上取消执行,马上抛出InterrupterException。


    await方法流程图:

     2)signal()方法详解 

    /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {  // @1
                if (!isHeldExclusively())  //如果当前线程不是锁的持有者,直接抛出异常。
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first); //通知第一个等待者 //@2
            }
     
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {  
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)   // @3
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null); // @4
            }
     
        /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal).
         */
        final boolean transferForSignal(Node node) {  // @5
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */ 
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))   //@6
                return false;
     
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);     // @7
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))     //@8
                LockSupport.unpark(node.thread);
            return true;
        }

    signal的具体实现,是从doSignal方法开始
    @代码3,首先将要被通知的节点的下一个节点设置为等待队列的head节点,如果当前节点的下一个节点为空,则设置等待队列的尾节点(lastWaiter)为空,然后将当前被通知的节点的下一个节点设为空;该步骤核心思想就是将被通知节点移除条件等待队列,然后重新维护条件等待对的firstWaiter和lastWaiter。
    @代码4,!transferForSignal(first) && (first = firstWaiter) != null,根据后文的解析可以知道,如果被通知节点没有进入到同步阻塞队列(transferForSignal返回false)并且条件等待队列还有不为空的节点,则继续循环通知。
    @代码5,transferForSignal该方法,将被通知的节点放入同步等待队列。
    @代码6,首先判断,尝试将节点状态设置为0,如果设置失败,则说明该节点的状态已经不是Node.CONDITION,进一步说明该节点在没有等到通知信号时,被取消,直接返回false,通知下一个等待者。(回到代码@3,@4)
    @代码7,将节点放入到同步队列中。个人认为信号通知,主要是将节点从条件等待队列移入到同步等待队列,主要是防止signal信号的丢失。
    @代码8,如果前置节点取消,或者在设置前置节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程,@8设置失败发送的情况也就是前置节点状态发送改变(被取消等),所以直接唤醒被通知节点的线程,也就是说,signal方法,只有在入队列后,前置节点被取消时,才会执行LockSupport.unpark方法唤醒线程,通常该方法,只是将节点从条件等待队列放入同步队列,然后该方法执行完毕,释放持有的锁。


    整个通知signal方法的流程如下:


    ---------------------
    作者:唯有坚持不懈
    来源:CSDN
    原文:https://blog.csdn.net/prestigeding/article/details/53158246
    版权声明:本文为博主原创文章,转载请附上博文链接!

    展开全文
  • 根据网上的说法,jdk并发包中的 Condition await 与 ...monitorObject.wait,该方法调用必须在临界区中(保护的代码段)被调用,线程如果在临界区中调用监视器的wait方法,然后线程会释放占有监视器monitorOb...

    根据网上的说法,jdk并发包中的 Condition await 与 signal 实现了 Object.wait notify 语义。以下总结,是基于Condition await,singal方法的实现原理总结出来的:

    • monitorObject.wait,该方法调用必须在临界区中(锁保护的代码段)被调用,线程如果在临界区中调用监视器的wait方法,然后线程会释放占有监视器monitorObject的锁,然后阻塞(等待条件的发生,该线程会保存在monitorObject的条件等待队列,当该线程收到信号或中断被唤醒后,首先需要尝试获取监视器的锁,然后继续执行操作,如果是被中断,需要在获取锁后,才会被中断。)
    • monitorObject.notify,该方法调用同样只能在临界区中被调用,锁的释放,在执行完临界区后,才会释放。根据Condition.singal实现机制,首先唤醒,是先将线程从条件等待队列放入到同步阻塞队列,然后执行完临界区代码后,释放锁,其他线程竞争锁。

    为了对Condition await 与 signal 方法有一个直接的了解,现给出一个简单的生产者、消费者测试用例:

    package persistent.prestige.study.concurent.bread;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class TestMain {
    
    	public static void main(String[] args) {
    		BreadContainerByObject container = new BreadContainerByObject();
    
    		for (int i = 0; i < 5; i++) {
    			new Thread(new Producers(container)).start();
    		}
    
    		for (int i = 0; i < 3; i++) {
    			new Thread(new Customer(container)).start();
    		}
    	}
    
    }
    
    
    interface BreadContainer extends Serializable {
    	
    	public void put(Bread b) throws InterruptedException;
    	
    	public Bread poll() throws InterruptedException;
    
    }
    
    /**
     * 基于 Reentrant Condition实现
     * @author dingwei2
     *
     */
    @SuppressWarnings("serial")
    class BreadContainerByCondition implements BreadContainer {
    
    	private Lock lock = new ReentrantLock();
    	private Condition NotFull = lock.newCondition();
    	private Condition NotEmpty = lock.newCondition();
    
    	// 面包容器
    	private List<Bread> breads = new ArrayList<Bread>();
    	private static final int MAX = 20;
    
    	private volatile int num = 0;
    
    	@Override
    	public void put(Bread b) throws InterruptedException {
    		// TODO Auto-generated method stub
    		try {
    			lock.lock();
    			while(breads.size() >= MAX ) { //已经满了
    				NotFull.await();
    			}
    			
    			b.setId(num ++);
    			breads.add(b);
    			//放入一个元素后,NotEmpty
    			NotEmpty.signalAll();
    			
    		} finally {
    			lock.unlock();
    		}
    
    	}
    
    	@Override
    	public Bread poll() throws InterruptedException{
    		try {
    			lock.lock();
    			while(breads.isEmpty()) {//如果为空
    				NotEmpty.await();
    			}
    			
    			Bread b = breads.remove(breads.size() -1);
    			NotFull.signalAll();
    			return b;
    			
    		} finally {
    			lock.unlock();
    		}
    	}
    
    }
    
    /**
     * 基于 Object.notify Object.wait
     * @author dingwei2
     *
     */
    @SuppressWarnings("serial")
    class BreadContainerByObject implements BreadContainer{
    	
    	// 面包容器
    	private List<Bread> breads = new ArrayList<Bread>();
    	private static final int MAX = 20;
    	
    	private volatile int num = 0;
    	
    	public void put(Bread b) {
    		synchronized (breads) {
    			while(breads.size() >= MAX) {
    				try {
    					breads.wait();
    				} catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
    				}
    			}
    			b.setId(num ++);
    			breads.add(b);
    			breads.notifyAll();
    		}
    		
    	}
    	
    	public Bread poll() {
    		Bread b = null;
    		synchronized (breads) {
    			while(breads.size() < 1) {
    				try {
    					breads.wait();
    				} catch (InterruptedException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();//这里不应该 将 InterruptedExcepiton 吞掉
    				}
    			}
    			b = breads.remove(breads.size() -1);
    			breads.notifyAll();
    		}
    		
    		return b;
    	}
    
    }
    
    
    /**
     * 生产者
     * 
     * @author dingwei2
     *
     */
    class Producers implements Runnable {
    
    	private BreadContainerByObject container;
    
    	public Producers(BreadContainerByObject container) {
    		this.container = container;
    	}
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		for (int i = 0; i < 5; i++) {
    			Bread b = new Bread();
    			b.setFactoryName(Thread.currentThread().getName());
    			container.put(b);
    		}
    
    	}
    
    }
    
    /**
     * 消费者
     * @author dingwei2
     *
     */
    class Customer implements Runnable {
    
    	public BreadContainerByObject container;
    
    	public Customer(BreadContainerByObject container) {
    		this.container = container;
    	}
    
    	@Override
    	public void run() {
    		// TODO Auto-generated method stub
    		for(int i = 0; i < 5; i ++ ) {
    			Bread b = container.poll();
    			System.out.println(Thread.currentThread().getName() + "消费了" + b.toString());
    		}
    		
    	}
    }
    
    
    
    @SuppressWarnings("serial")
    class Bread implements Serializable {
    	
    	private Integer id;
    	
    	private String factoryName;
    
    	public Integer getId() {
    		return id;
    	}
    
    	public void setId(Integer id) {
    		this.id = id;
    	}
    
    	@Override
    	public String toString() {
    		return "面包:" + (id == null ? 0 : id.intValue()) + ";生产工厂:" + getFactoryName();
    	}
    
    	public String getFactoryName() {
    		return factoryName;
    	}
    
    	public void setFactoryName(String factoryName) {
    		this.factoryName = factoryName;
    	}
    	
    	
    
    }

    使用Reentrant Condition ,细化了消息通知的粒度,比如使用了当队列中产品时,通过 NotEmpty 条件来唤醒消费者,当队列还有可用的空间存放产品时,使用 NotFull 条件来唤醒生产者,使用两个条件队列,确保被唤醒的线程的准确性,加入到同步队列的节点,在该节点获取到锁后,确实是满足条件的(特别在临界情况的时候)。而 Object.wait,Object.notify, 生产者,消费者在同一个条件队列中排队。

    欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:

    1、源码分析RocketMQ专栏(40篇+)
    2、源码分析Sentinel专栏(12篇+)
    3、源码分析Dubbo专栏(28篇+)
    4、源码分析Mybatis专栏
    5、源码分析Netty专栏(18篇+)
    6、源码分析JUC专栏
    7、源码分析Elasticjob专栏
    8、Elasticsearch专栏
    9、源码分析Mycat专栏

    展开全文
  • java并发编程锁机制

    2019-12-14 12:55:23
    之前系列文章都在叙述java线程池的设计以及实现机制,没有涉及java并发编程的锁机制,这是因为锁机制与线程池是相对独立的内容,自成体系,可以把锁机制当做线程池的一个基础组件,想黑盒一样使用它。 可我们如何去...
  • Redis并发锁1、 单线程redis为什么需要分布式锁虽然一个redis是单进程单线程模式,但请求并不是一定按先后顺序处理的,多个请求会被redis交叉着执行,(就像单个cpu,在一个时间点只能执行一个命令,为什么多个线程...
  • 第一部分:synchronized和volatile锁机制用来保护对象的一致性以及操作的原子性,是实现线程安全的重要手段。线程安全涉及到对象两个重要的状态:共享性和可变性。如果对象是不可变的、线程私有的那么它一定是线程...
  • Java并发编程:Concurrent锁机制解析前面,我们讲了Java自带的对象锁机制。因为我们的方法必然是在一个对象中的,所以,通过对象的锁,可以很好的控制对方法的调用。当对象的锁被一个线程持有后,其他线程想要调用该...
  • 从这次开始接触Java1.5推出的并发包中的东东,先看一下jdk中的并发包:接下来咱们则会集中对这些并发包中的核心进行深入了解,不光要学会怎么用这些并发包中的类,而且还得知道这些功能背后运行的原理, 所以手踏...
  • Java并发机制的实现原理

    万次阅读 多人点赞 2016-07-18 20:04:21
    Java并发机制实现原理
  • 深入理解 Java 并发锁 1. 并发锁简介 确保线程安全最常见的做法是利用锁机制(Lock、sychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的...
  • 同步的基本思想为了保证共享数据在同一时刻只被一个线程使用,我们有一种很简单的实现思想,就是在共享数据里保存一个,当没有线程访问时,是空的。当有第一个线程访问时,就在里保存这个线程的标识并允许这个...
  • synchronized 和 volatile,是最基础的两个!volatile是轻量级,它在多核处理器开发中保证了共享变量的可见性。...java语言规范第3版中对volatile的定义如下:volatile会添加一条lock# 前缀的...
  • 主要为大家详细介绍了Java并发编程之显式锁机制的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
  • Java并发锁机制

    2020-12-14 19:52:03
    目录一、Lock接口1.1 什么是锁?1.2 Lock对比synchronized关键字及常用API1.3 Lock接口源码二、队列同步器2.1 什么是...锁机制Java并发的核心之一,凡是谈及多并发多线程问题,锁机制是在所难免的,本文将对Java
  • 本次将提及用到的锁机制:独享锁/共享锁独享锁是指该锁一次只能被一个线程所持有。共享锁是指该锁可被多个线程所持有。对于Java ReentrantLock而言,其是独享锁。但是对于Lock的另一个实现类ReadWriteLock,其读锁是...
  • 本文转载自互联网,侵删本系列文章将整理到我在GitHub上的《Java面试指南》仓库,更多精彩内容请到我的仓库里查看文章同步发于我的个人博客:www.how2playlife.com本文是微信公众号【Java技术江湖】的《Java并发指南...
  • 基于redis实现分布式并发锁(注解实现)说明前提, 应用服务是分布式或多服务, 而这些"多"有共同的"redis";(2017-12-04) 笑哭, 写这篇之前一直觉得应该有大神已经写好了, 但未找到. 其实redis官网已经给出了实现(百度、...
  • 在上篇文章《Java并发编程之锁机制之Lock接口》中,我们已经了解了,Java下整个Lock接口下实现的锁机制是通过AQS(这里我们将AbstractQueuedSynchronizer 或AbstractQueuedLongSynchronizer统称为AQS)与Condition来...
  • 一、资源和加锁1、场景描述多线程并发访问同一个资源问题,假如线程A获取变量之后修改变量值,线程C在此时也获取变量值并且修改,两个线程同时并发处理一个变量,就会导致并发问题。这种并行处理数据库的情况在实际...
  • 锁机制Java锁机制其实就是一种等待机制,将多个线程对共享数据的并发访问转换为串行访问,即一个共享数据一次只能被一个线程访问,这样锁就可以用来保障线程安全了。锁(Lock)可以理解为对共享数据进行保护的一个许可...
  • 本文着重介绍了在java并发中常见的几种锁机制。1.偏向锁偏向锁是JDK1.6提出来的一种锁优化的机制。其核心的思想是,如果程序没有竞争,则取消之前已经取得锁的线程同步操作。也就是说,若某一锁被线程获取后,便进入...
  • Java并发编程之锁机制之引导篇 该文章属于《Java并发编程》系列文章,如果想了解更多,请点击《Java并发编程之总目录》 前言 在前面的文章中。我们了解了volatile、了解了synchronized。现在我们来了解一下Java...
  • 锁机制用来保护对象的一致性以及操作的原子性,是实现线程安全的重要手段。线程安全涉及到对象两个重要的状态:共享性和可变性。如果对象是不可变的、线程私有的那么它一定是线程安全的。所以说,只有在共享的、可变...
  • 最近在忙公司的项目,现在终于有时间来写博客啦~开心开心 前言 通过前面的文章,我们已经了解了AQS(AbstractQueuedSynchronizer)内部的实现与基本原理。现在我们来了解一下,Java中为...Java并发编程之锁机制之AQ...
  • 深入理解JAVA并发锁

    2020-11-05 09:28:10
    深入理解 Java 并发锁 1. 并发锁简介 确保线程安全最常见的做法是利用锁机制(Lock、sychronized)来对共享数据做互斥同步,这样在同一个时刻,只有一个线程可以执行某个方法或者某个代码块,那么操作必然是原子性的...
  • Java并发编程:Concurrent锁机制解析 code {color: #FF0000} pre.src {background-color: #002b36; color: #839496;} Java并发编程:Concurrent锁机制解析 Table of Contents 1. 本质 2. Lock 3....
  • 是存在哪里的,怎么标识是什么Monitor机制Java中怎么表现的优化升级1. 存在哪里对象在内存中的布局分为三块区域:对象头、实例数据和对齐填充。Hotspot虚拟机的对象头主要包括两部分数据:Mark Word(标记...
  • 在上篇文章《Java并发编程之锁机制之引导篇》中,我们大致了解了Lock接口(以及相关实现类)在并发编程重要作用。接下来我们就来具体了解Lock接口中声明的方法以及使用优势。 Lock简介 Lock 接口实现类提供了比使用 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,050
精华内容 1,620
关键字:

java并发锁机制

java 订阅