• 但是因为有不同的模式,用了两种具体实现分别应对公平和非公平。 二者在实现获取的逻辑上有区别,也就是AQS留空的部分。下面看下lock和unlock实现。 非公平: static final class Nonfa...

    ReentrantLock

    内部使用了AQS:https://blog.csdn.net/u010900754/article/details/88849938

    但是因为有不同的锁模式,用了两种具体实现分别应对公平锁和非公平锁。
    二者在实现获取锁的逻辑上有区别,也就是AQS留空的部分。下面看下lock和unlock实现。

    非公平:

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

    这里先快速获取一次,失败就走ASQ的过程。acquire方法会走到tryAcquire,tryAcquire调用了父类的nonfairTryAcquire。

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }

    先判断AQS的state是否是0,是0说明没有现成持有锁,CAS获取一次,不行就走AQS的逻辑。如果state非0,也不是说不能获取,因为是“重入锁”嘛,所以再判断一次当前线程是否是自己,如果是,可以重入,直接吧state++即可。这里非公平体现在哪里?当前线程总是可以通过tryAcquire直接快速获取锁,即使等待队列里有其他线程等待,所以是非公平的。另外还可以支持重入。这也是为什么AQS代码把获取这部分逻辑留空的原因。

    下面是release的实现。仅仅是设置了state,队列操作在AQS里面。

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }

    公平锁:

     static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
     }

     可以看到获取锁必须通过hasQueuedPredecessors的检验:

         public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }

    这个函数只有在要么head.next是空要么head.next所代表的的线程不是当前线程才返回true,这种情况意味着当前线程不是队列的最前面,只有在最前面才可以尝试获取锁。

    展开全文
  • 最近正在阅读Java ReentrantLock源码,始终对可重入和不可重入概念理解不透彻,进行学习后记录在这里。 基础知识 Java多线程的wait()方法和notify()方法 这两个方法是成对出现和使用的,要执行这两个方法,有一个...

    最近正在阅读Java ReentrantLock源码,始终对可重入和不可重入概念理解不透彻,进行学习后记录在这里。

    基础知识

    Java多线程的wait()方法和notify()方法

    这两个方法是成对出现和使用的,要执行这两个方法,有一个前提就是,当前线程必须获其对象的monitor(俗称“锁”),否则会抛出IllegalMonitorStateException异常,所以这两个方法必须在同步块代码里面调用。

    wait():阻塞当前线程

    notify():唤起被wait()阻塞的线程

    不可重入锁

    所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。我们尝试设计一个不可重入锁:

    public class Lock{
        private boolean isLocked = false;
        public synchronized void lock() throws InterruptedException{
            while(isLocked){    
                wait();
            }
            isLocked = true;
        }
        public synchronized void unlock(){
            isLocked = false;
            notify();
        }
    }

    使用该锁:

    public class Count{
        Lock lock = new Lock();
        public void print(){
            lock.lock();
            doAdd();
            lock.unlock();
        }
        public void doAdd(){
            lock.lock();
            //do something
            lock.unlock();
        }
    }

    当前线程执行print()方法首先获取lock,接下来执行doAdd()方法就无法执行doAdd()中的逻辑,必须先释放锁。这个例子很好的说明了不可重入锁。

    可重入锁

    接下来,我们设计一种可重入锁

    public class Lock{
        boolean isLocked = false;
        Thread  lockedBy = null;
        int lockedCount = 0;
        public synchronized void lock()
                throws InterruptedException{
            Thread thread = Thread.currentThread();
            while(isLocked && lockedBy != thread){
                wait();
            }
            isLocked = true;
            lockedCount++;
            lockedBy = thread;
        }
        public synchronized void unlock(){
            if(Thread.currentThread() == this.lockedBy){
                lockedCount--;
                if(lockedCount == 0){
                    isLocked = false;
                    notify();
                }
            }
        }
    }

    所谓可重入,意味着线程可以进入它已经拥有的锁的同步代码块儿。

    我们设计两个线程调用print()方法,第一个线程调用print()方法获取锁,进入lock()方法,由于初始lockedBy是null,所以不会进入while而挂起当前线程,而是是增量lockedCount并记录lockBy为第一个线程。接着第一个线程进入doAdd()方法,由于同一进程,所以不会进入while而挂起,接着增量lockedCount,当第二个线程尝试lock,由于isLocked=true,所以他不会获取该锁,直到第一个线程调用两次unlock()将lockCount递减为0,才将标记为isLocked设置为false。

    可重入锁的概念和设计思想大体如此,Java中的可重入锁ReentrantLock设计思路也是这样

    展开全文
  • 所谓读写,是对访问资源共享和排斥,一般的重入性语义为如果对资源加了写,其他线程无法再获得写与读,但是持有写的线程,可以对资源加读降级);如果一个线程对资源加了读,其他线程可以继续...

    1、ReentrantReadWriterLock 基础

    所谓读写锁,是对访问资源共享锁和排斥锁,一般的重入性语义为如果对资源加了写锁,其他线程无法再获得写锁与读锁,但是持有写锁的线程,可以对资源加读锁(锁降级);如果一个线程对资源加了读锁,其他线程可以继续加读锁。

    java.util.concurrent.locks中关于多写锁的接口:ReadWriteLock。

    public interface ReadWriteLock {
        /**
         * Returns the lock used for reading.
         *
         * @return the lock used for reading.
         */
        Lock readLock();
    
        /**
         * Returns the lock used for writing.
         *
         * @return the lock used for writing.
         */
        Lock writeLock();
    }

    提一个问题,是否觉得 ReentrantReadWriteLock 会实现 Lock 接口吗?与 ReentrantLock 有什么关系?

    答案是否定的,ReentrantReadWriterLock 通过两个内部类实现 Lock 接口,分别是 ReadLock,WriterLock 类。与 ReentrantLock一样,ReentrantReadWriterLock 同样使用自己的内部类Sync(继承AbstractQueuedSynchronizer)实现CLH算法。为了方便对读写锁获取机制的了解,先介绍一下Sync内部类中几个属性,采用了位运算:

    /*
             * Read vs write count extraction constants and functions.
             * Lock state is logically divided into two unsigned shorts:
             * The lower one representing the exclusive (writer) lock hold count,
             * and the upper the shared (reader) hold count.
             */
    
            static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    首先ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数(ReentrantLock中的state),用所以,采取的办法是,高16位用来表示读锁占有的线程数量,用低16位表示写锁被同一个线程申请的次数。

    • SHARED_SHIFT,表示读锁占用的位数,常量16
    • SHARED_UNIT,   增加一个读锁,按照上述设计,就相当于增加 SHARED_UNIT;
    • MAX_COUNT    ,表示申请读锁最大的线程数量,为65535
    • EXCLUSIVE_MASK  :表示计算写锁的具体值时,该值为 15个1,用 getState & EXCLUSIVE_MASK算出写锁的线程数,大于1表示重入。
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } 
    
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    举例说明,比如,现在当前,申请读锁的线程数为13个,写锁一个,那state怎么表示?

    上文说过,用一个32位的int类型的高16位表示读锁线程数,13的二进制为 1101,那state的二进制表示为

    00000000 00001101 00000000 00000001,十进制数为 851969, 接下在具体获取锁时,需要根据这个 851968 这个值得出上文中的 13 与 1。要算成13,只需要将state 无符号向左移位16位置,得出00000000 00001101,就出13,根据851969要算成低16位置,只需要用该00000000 00001101 00000000 00000001 & 111111111111111(15位),就可以得出00000001,就是利用了1&1得1,1&0得0这个技巧。

    移位元素,如果一个数值向左移(<)一位,在没越界(超过该类型表示的最大值)的情况下,想当于操作数 * 2

    如果一个数值向右(>) 移动移位,在没有越界的情况下,想到于操作数 除以2。

    然后再关注如下几个与线程本地变量相关的属性:    

    /**
             * The number of reentrant read locks held by current thread.
             * Initialized only in constructor and readObject.
             * Removed whenever a thread's read hold count drops to 0.
             */
            private transient ThreadLocalHoldCounter readHolds;
    
            /**
             * The hold count of the last thread to successfully acquire
             * readLock. This saves ThreadLocal lookup in the common case
             * where the next thread to release is the last one to
             * acquire. This is non-volatile since it is just used
             * as a heuristic, and would be great for threads to cache.
             *
             * <p>Can outlive the Thread for which it is caching the read
             * hold count, but avoids garbage retention by not retaining a
             * reference to the Thread.
             *
             * <p>Accessed via a benign data race; relies on the memory
             * model's final field and out-of-thin-air guarantees.
             */
            private transient HoldCounter cachedHoldCounter;
    
            /**
             * firstReader is the first thread to have acquired the read lock.
             * firstReaderHoldCount is firstReader's hold count.
             *
             * <p>More precisely, firstReader is the unique thread that last
             * changed the shared count from 0 to 1, and has not released the
             * read lock since then; null if there is no such thread.
             *
             * <p>Cannot cause garbage retention unless the thread terminated
             * without relinquishing its read locks, since tryReleaseShared
             * sets it to null.
             *
             * <p>Accessed via a benign data race; relies on the memory
             * model's out-of-thin-air guarantees for references.
             *
             * <p>This allows tracking of read holds for uncontended read
             * locks to be very cheap.
             */
            private transient Thread firstReader = null;
            private transient int firstReaderHoldCount;

    上述这4个变量,其实就是完成一件事情,将获取读锁的线程放入线程本地变量(ThreadLocal),方便从整个上 下文,根据当前线程获取持有锁的次数信息。其实 firstReader,firstReaderHoldCount ,cachedHoldCounter 这三个变量就是为readHolds变量服务的,是一个优化手段,尽量减少直接使用readHolds.get方法的次数,firstReader与firstReadHoldCount保存第一个获取读锁的线程,也就是readHolds中并不会保存第一个获取读锁的线程;cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数。请结合如下代码理解上述观点:

                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }

    2、ReentrantReadWriterLock源码分析

    2.1 ReadLock 源码分析

    2.1.1 lock方法

    /**
             * Acquires the read lock.
             *
             * <p>Acquires the read lock if the write lock is not held by
             * another thread and returns immediately.
             *
             * <p>If the write lock is held by another thread then
             * the current thread becomes disabled for thread scheduling
             * purposes and lies dormant until the read lock has been acquired.
             */
            public void lock() {
                sync.acquireShared(1);
            }

    sync.acquireShared方法存在于AbstractQueuedSynchronizer类中,

    /**
         * Acquires in shared mode, ignoring interrupts.  Implemented by
         * first invoking at least once {@link #tryAcquireShared},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquireShared} until success.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquireShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         */
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)    //@1
                doAcquireShared(arg);           //@2
        }

    根据常识,具体获取锁的过程在子类中实现,果不其然,tryAcquireShared方法在ReentrantReadWriterLock的Sync类中实现

    protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                Thread current = Thread.currentThread();    //@1 start
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;                                                     // @1 end
                int r = sharedCount(c);
                if (!readerShouldBlock() &&                          
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {    // @2
                    if (r == 0) {                                      //@21                               
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {  //@22
                        firstReaderHoldCount++;
                    } else {                                            // @23
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);      // @3
            }

    尝试获取共享锁代码解读:

    @1 start--end ,如果有线程已经抢占了写锁,并且不是当前线程,则直接返回-1,通过排队获取锁。

    @2,如果线程不需要阻塞,并且获取读锁的线程数没有超过最大值,并且使用 CAS更新共享锁线程数量成功的话;表示成获取读锁,然后进行内部变量的相关更新操作;先关注一下,成功获取读锁后,内部变量的更新操作:

    @21,如果r=0, 表示,当前线程为第一个获取读锁的线程

    @22,如果第一个获取读锁的对象为当前对象,将firstReaderHoldCount 加一

    @23,成功获取锁后,如果不是第一个获取多锁的线程,将该线程持有锁的次数信息,放入线程本地变量中,方便在整个请求上下文(请求锁、释放锁等过程中)使用持有锁次数信息。

    @3 在讲解代码@3之前,我们先重点分析@2处的第一个条件,是否需要阻塞方法:readerShouldBlock,在具体的子类中,现在查看的是NonfairSync中的方法:

    final boolean readerShouldBlock() {
                /* As a heuristic to avoid indefinite writer starvation,
                 * block if the thread that momentarily appears to be head
                 * of queue, if one exists, is a waiting writer.  This is
                 * only a probabilistic effect since a new reader will not
                 * block if there is a waiting writer behind other enabled
                 * readers that have not yet drained from the queue.
                 */
                return apparentlyFirstQueuedIsExclusive();   //该方法,具体又是在 AbstractQueuedSynchronizer中
            }
    /**
         * Returns {@code true} if the apparent first queued thread, if one
         * exists, is waiting in exclusive mode.  If this method returns
         * {@code true}, and the current thread is attempting to acquire in
         * shared mode (that is, this method is invoked from {@link
         * #tryAcquireShared}) then it is guaranteed that the current thread
         * is not the first queued thread.  Used only as a heuristic in
         * ReentrantReadWriteLock.
         */
        final boolean apparentlyFirstQueuedIsExclusive() {
            Node h, s;
            return (h = head) != null &&
                (s = h.next)  != null &&
                !s.isShared()         &&
                s.thread != null;
        }

    该方法如果头节点不为空,并头节点的下一个节点不为空,并且不是共享模式【独占模式,写锁】、并且线程不为空。则返回true,说明有当前申请读锁的线程占有写锁,并有其他写锁在申请。为什么要判断head节点的下一个节点不为空,或是thread不为空呢?因为第一个节点head节点是当前持有写锁的线程,也就是当前申请读锁的线程,这里,也就是锁降级的关键所在,如果占有的写锁不是当前线程,那线程申请读锁会直接失败。

    现在继续回到@3,讲解如果第一次尝试获取读锁失败后,该如何处理。首先,进入该方法的条件如下:

    • 没有写锁被占用时,尝试通过一次CAS去获取锁时,更新失败(说明有其他读锁在申请)。
    • 当前线程占有写锁,并且没有有其他写锁在当前线程的下一个节点等待获取写锁。;其实如果是这种情况,除非当前线程占有锁的下个线程取消,否则进入fullTryAcquireShared方法也无法获取锁。
    /**
             * Full version of acquire for reads, that handles CAS misses
             * and reentrant reads not dealt with in tryAcquireShared.
             */
            final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {                                     //@31
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                    } else if (readerShouldBlock()) {                             //@32
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {                              //@33
                            // assert firstReaderHoldCount > 0;
                        } else {                                                              //@34
                            if (rh == null) {
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != current.getId()) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                return -1;
                        }
                    }
                    if (sharedCount(c) == MAX_COUNT)                           
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {     // @35
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != current.getId())
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }

    代码@31,首先再次判断,如果当前线程不是写锁的持有者,直接返回-1,结束尝试获取读锁,需要排队去申请读锁。

    代码@32,如果需要阻塞,说明除了当前线程持有写锁外,还有其他线程已经排队在申请写锁,故,即使申请读锁的线程已经持有写锁(写锁内部再次申请读锁,俗称锁降级)还是会失败,因为有其他线程也在申请写锁,此时,只能结束本次申请读锁的请求,转而去排队,否则,将造成死锁。代码@34,就是从readHolds中移除当前线程的持有数,然后返回-1,结束尝试获取锁步骤(结束tryAcquireShared 方法)然后去排队获取。

    代码@33,因为,如果当前线程是第一个获取了写锁,那其他线程无法申请写锁(该部分在分析完,读写锁的队列机制后,才回来做更详细的解答。)

    代码@35,表示成功获取读锁,后续就是更新readHolds等内部变量,该部分在上文中已有讲解。如果是通过@35尝试获取锁成功,这就是写锁内部--》再次申请读锁(锁降级)的原理。

    至此,完成尝试获取锁步骤 tryAcquireShared 方法,我们再次回到 acquireShared,如果返回-1,那么需要排队申请,具体请看 doAcquireShared(arg);

       public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)    //@1
                doAcquireShared(arg);           //@2
        }
    /**
         * Acquires in shared uninterruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);   //@1
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { // @2,开始自旋重试
                    final Node p = node.predecessor();   // @3
                    if (p == head) {                                   // @4
                        int r = tryAcquireShared(arg);         
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);    //@5
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())                                              // @6
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    获取共享锁解读:

    代码@1,在队列尾部增加一个节点。锁模式为共享模式。

    代码@3,获取该节点的前置节点。

    代码@4,如果该节点的前置节点为head(头部),为什么前置节点是head时,可以再次尝试呢?在讲解ReentrantLock时,也讲过,head节点的初始化在第一次产生锁争用时初始化,刚开始初始化的head节点是不代表线程的,故可以尝试获取锁。如果获取失败,则将进入到shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法中,线程阻塞,等待被唤醒。

    重点分析一下获取锁后的操作:setHeadAndPropagate

    /**
         * Sets head of queue, and checks if successor may be waiting
         * in shared mode, if so propagating if either propagate > 0 or
         * PROPAGATE status was set.
         *
         * @param node the node
         * @param propagate the return value from a tryAcquireShared
         */
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below 
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0) {   // @1
                Node s = node.next;
                if (s == null || s.isShared())    // @2
                    doReleaseShared();          //@3
            }
        }
    
    /**
         * Sets head of queue to be node, thus dequeuing. Called only by
         * acquire methods.  Also nulls out unused fields for sake of GC
         * and to suppress unnecessary signals and traversals.
         *
         * @param node the node
         */
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    
    /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {   //@4
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {   //@5
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))   //@6
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed      //@7
                    break;
            }
        }

    释放共享锁的步骤:

    代码@1,如果读锁(共享锁)获取成功,或头部节点为空,或头节点取消,或刚获取读锁的线程的下一个节点为空,或在节点的下个节点也在申请读锁,则在CLH队列中传播下去唤醒线程,怎么理解这个传播呢,就是只要获取成功到读锁,那就要传播到下一个节点(如果一下个节点继续是读锁的申请,只要成功获取,就再下一个节点,直到队列尾部或为写锁的申请,停止传播)。具体请看doReleaseShared方法。

    代码@4,从队列的头部开始遍历每一个节点。

    代码@5,如果节点状态为 Node.SIGNAL,将状态设置为0,设置成功,唤醒线程。为什么会设置不成功,可能改节点被取消;还有一种情况就是有多个线程在运行该代码段,这就是PROPAGATE的含义吧,传播,请看代码@7的理解。

    代码@6,如果状态为0,则设置为Node.PROPAGATE,设置为传播,该值然后会在什么时候变化呢?在判断该节点的下一个节点是否需要阻塞时,会判断,如果状态不是Node.SIGNAL或取消状态,为了保险起见,会将前置节点状态设置为Node.SIGNAL,然后再次判断,是否需要阻塞。

    代码@7,如果处理过一次 unparkSuccessor 方法后,头节点没有发生变化,就退出该方法,那head在什么时候会改变呢?当然在是抢占锁成功的时候,head节点代表获取锁的节点。一旦获取锁成功,则又会进入setHeadAndPropagate方法,当然又会触发doReleaseShared方法,传播特性应该就是表现在这里吧。再想一下,同一时间,可以有多个多线程占有锁,那在锁释放时,写锁的释放比较简单,就是从头部节点下的第一个非取消节点,唤醒线程即可,为了在释放读锁的上下文环境中获取代表读锁的线程,将信息存入在 readHolds ThreadLocal变量中。

    到这里为止,读锁的申请就讲解完毕了,先给出如下流程图:

                                                                            尝试获取读锁过程

    从队列中获取读锁的流程如下:



    2.1.2 ReadLock 的 unlock方法详解

    public  void unlock() {
            sync.releaseShared(1);
    }
    //AbstractQueuedSynchronizer的  realseShared方法
    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    // ReentrantReadWriterLock.Sync tryReleaseShared
    protected final boolean tryReleaseShared(int unused) { 
                Thread current = Thread.currentThread();
                if (firstReader == current) {                               // @1 start               
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;                                                            // @1 end
                }
                for (;;) {                                                               // @2
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    
    AbstractQueuedSynchronizer的doReleaseShared
    /**
         * Release action for shared mode -- signal successor and ensure
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    锁的释放,比较简单,代码@1,主要是将当前线程所持有的锁的数量信息得到(从firstReader或cachedHoldCounter,或readHolds中获取 ),然后将数量减少1,如果持有数为1,则直接将该线程变量从readHolds ThreadLocal变量中移除,避免垃圾堆积。

    代码@2,就是在无限循环中将共享锁的数量减少一,在释放锁阶段,只有当所有的读锁,写锁被占有,才会去执行doReleaseShared 方法。

    2.2 WriterLock 源码分析

    2.2.1 lock方法详解

    public void lock() {
                sync.acquire(1);
            }
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    对上述代码是不是似曾相识,对了,在学习ReentrantLock时候,看到的一样,acquire是在AbstractQueuedSynchronizer中,关键是在 tryAcquire方法,是在不同的子类中实现的。那我们将目光移到ReentrantReadWriterLock.Sync中
    protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {                                   // @1
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())                //@2
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)              
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);                                                             //@3
                    return true;
                }
                if (writerShouldBlock() ||                                                               
                    !compareAndSetState(c, c + acquires))                                   //@4
                    return false;
                setExclusiveOwnerThread(current);                                             //@5
                return true;
            }

    代码@1,如果锁的state不为0,说明有写锁,或读锁,或两种锁持有。

    代码@2,如果写锁为0,再加上c!=0,说明此时有读锁,自然返回false,表示只能排队去获取写锁;如果写锁不为0,如果持有写锁的线程不为当前线程,自然返回false,排队去获取写锁。

    代码@3,表示,当前线程持有写锁,现在是重入,所以只需要修改锁的额数量即可。

    代码@4,表示,表示通过一次CAS去获取锁的时候失败,说明被别的线程抢去了,也返回false,排队去重试获取锁。

    代码@5,成获取写锁后,将当前线程设置为占有写锁的线程。尝试获取锁方法结束。如果该方法返回false,则进入到acquireQueue方法去排队获取写锁,写锁的获取过程,与ReentrantLock获取方法一样,就不过多的解读了。

    读写锁的实现原理就分析到这了,走过路过的朋友,欢迎拍砖讨论。

    欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:

    1、源码分析RocketMQ专栏(40篇+)
    2、源码分析Sentinel专栏(12篇+)
    3、源码分析Dubbo专栏(28篇+)
    4、源码分析Mybatis专栏
    5、源码分析Netty专栏(18篇+)
    6、源码分析JUC专栏
    7、源码分析Elasticjob专栏
    8、Elasticsearch专栏
    9、源码分析Mycat专栏

    展开全文
  • 里面关于重入锁,特别AQS队列并没有提到,故借学习ReentrantLock源码几下这篇笔记。 2. ReentrantLock源码分析 2.1 类图 今天啰嗦一些,我们一步一步看: idea找到依赖的jdk下面rt.jar里面JUC包里面的ReentrantLock...

    1.可重入锁/AQS队列

    2. ReentrantLock源码分析

    2.1 类图

    今天啰嗦一些,我们一步一步看:

    • idea找到JUC包里面的ReentrantLock,右键如图:
      在这里插入图片描述
      -打开类图:
      在这里插入图片描述
    • 我们可以看到ReentrantLock里面有一个Sync抽象类,两个内部静态类NonfairSync和FairSync
    • 其中NonfairSync和FairSync是ReentrantLock公平锁和非公平锁的实现
    • 而Sync我们右键打开它的类图:
      在这里插入图片描述
    • Sync继承于AbstractQueuedSynchronizer
    • 而AbstractQueuedSynchronizer里面维护了一个以Node为节点的AQS队列。
    • AQS队列,就是是ReentrantLock的核心。

    2.2 AQS队列

    • 追根溯源,AQS队列是由AbstractQueuedSynchronizer维护
    • 而AbstractQueuedSynchronizer由继承于AbstractOwnableSynchronizer
    • 我们先看AbstractOwnableSynchronizer源码:
    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        protected AbstractOwnableSynchronizer() { }
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    
    
    • exclusiveOwnerThread从名字就能看出来独家拥有线程,也就是独占模式锁的拥有者。
    • 而AQS定义两种资源共享方式:

    1,Exclusive(独占,只有一个线程能执行,ReentrantLock使用该模式)

    2,Share(共享,多个线程可同时执行,Semaphore/CountDownLatch使用该模式)。

    • AQS队列底层其实就是链表,而在AbstractQueuedSynchronizer中链表节点是有内部类Node来充当,在Node里有这么两行代码:
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
    
    
    • exclusiveMarker to indicate a node is waiting in shared mode
      共享模式:CountDownLatch使用该模式,详细可见:【JUC】CountDownLatch源码分析
    • Marker to indicate a node is waiting in exclusive mode
      独占模式:ReentrantLock使用该模式,今天我们聊的就是这个
    • 再看一下AbstractQueuedSynchronizer的核心成员:
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        private static final long serialVersionUID = 7373984972572414691L;
      
        protected AbstractQueuedSynchronizer() { }
    
       //队列节点
        static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
            
            /** waitStatus value to indicate thread has cancelled */ 线程已经被取消,该状态的节点不会再次阻塞。
    		static final int CANCELLED =  1;
    		/** waitStatus value to indicate successor's thread needs unparking */ 线程需要去被唤醒
    		static final int SIGNAL    = -1;
    		/** waitStatus value to indicate thread is waiting on condition */ 线程正在唤醒等待条件
    		static final int CONDITION = -2;
    		/**
    		 * waitStatus value to indicate the next acquireShared should //线程的共享锁应该被无条件传播
    		 * unconditionally propagate
    		 */
    		static final int PROPAGATE = -3;
            
            //上面四个值就是下面变量waitStatus的值
            volatile int waitStatus;
            //前一个节点
            volatile Node prev;
            //下一个节点
            volatile Node next;
            //当前节点代表的线程
            volatile Thread thread;
            /**
            *等待节点的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占和共享)和
            *等待队列中的后继节点共用一个字段。(注:比如说当前节点A是共享的,那么它的这个字段是shared,也就是说在这个等
            *待队列中,A节点的后继节点也是shared。如果A节点不是共享的,那么它的nextWaiter就不是一个SHARED常量,即是独
            *占的。
            */
            Node nextWaiter;
    }
        //头结点
        private transient volatile Node head;
         //尾节点
        private transient volatile Node tail;
        //状态值
        private volatile int state;
    
    • 如果了解CLH队列的话你会感觉眼熟,因为AQS队列就是它的一个变体,至于CLH队列是什么,看这里:【锁】自旋锁-MCS/CLH队列
    • AbstractQueuedSynchronizer的这些成员中,volatile修饰的state是重点,volatile关键字保证了线程间可见性。具体可见:【JUC】volatile关键字相关整理
    • ReentrantLock加锁记录就保存在state中,state默认为0,表示没有被加锁,每当线程请求一个锁,state加1,state=1表示加锁成功,state>1表示锁重入,详细后面看源码聊。
    • 在多线程并发请求锁时,采用CAS修改state的值,修改成功则获取锁成功,修改失败则加入到AQS等待队列尾部,至于什么是CAS,可见:【JUC】 Java中的CAS
    • 另一个要注意的AbstractQueuedSynchronizer里Node中的状态属性waitStatus,它默认为零,还有四个值见上面Node代码,它的值会被后置结点在获取锁失败后阻塞前修改,用于提醒你在释放锁后去唤醒它,具体详细情况后面源码聊。
    • AQS结构图
      在这里插入图片描述
    • AQS队列的头结点并不关联任何线程,他是一个默认的Node节点。

    2.3 开始撸代码

    • 先看构造函数:
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    2.3.1先搞默认的非公平锁,NonfairSync类的lock方法:

    static final class NonfairSync extends Sync {
            final void lock() {
                if (compareAndSetState(0, 1))      //1
                    setExclusiveOwnerThread(Thread.currentThread());  //2
                else
                    acquire(1);  //3
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);   //4
            }
        }
    
    public final void acquire(int arg) {//5
            if (!tryAcquire(arg) &&  //6
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))   //7
                selfInterrupt();   //8
        }
    
    • 总体流程如下:

    1,CAS原子性的修改AbstractQueuedSynchronizer#state的值,由0改为1,成功则说明当前线程加锁成功.

    2,设置AbstractOwnableSynchronizer#exclusiveOwnerThread的值为当前线程,表示当前锁的拥有者是当前线程。

    3,如果1中修改失败,则进入acquire(1)。申请1个state,acquire方法中首先尝试获取锁tryAcquire(),如果获取失败,则将当前线程以独占模式Node.EXCLUSIVE加入等待队列尾部(addWaiter方法)。

    4,acquireQueued():以独占无中断模式获取锁,这个方法会一直无限循环,直到获取到资源或者被中断才返回。如果等待过程中被中断则返回true。这里有自旋锁的意思,加入队列中的线程,不断的重试检测是否可以执行任务。

    • 接下来一个一个方法撸源码:

    tryAcquire

    • tryAcquire的具体实现是在NonfairSync类中,然后调用其父类Sync 中的nonfairTryAcquire()方法。
    static final class NonfairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
        
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();//:1、获取volatile int state的值
        if (c == 0) {//2:state=0表示当前可以加锁
            if (compareAndSetState(0, acquires)) {//CAS将state设置为acquires的值
                setExclusiveOwnerThread(current);//设置当前拥有锁的线程
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {//当前锁的拥有者线程是currentThread
            int nextc = c + acquires;//将state累加上acquires
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);//设置state的值。由于这里只有获取锁的线程才能执行,所以不会出现并发,不需要额外的加锁处理
            //这也是ReentrantLock为什么是可重入锁的原因,同一个线程加多次锁(lock.lock)也就是给state的值累加而已。
            return true;
        }
        return false;//当前锁的拥有者线程不是currentThread,直接返回false,也就是获取锁失败
    }
    
    • nonfairTryAcquire的实现:如果当前没有锁,那么加锁。如果已经有了锁,那么看看当前锁的拥有者线程是不是currentThread,是则累加state的值,不是则返回失败。
    • 所以,使用ReentrantLock时,线程获得锁的标记是在state上的,state=0表示没有被加锁,state=1表示加锁成功,state>1表示锁重入。

    addWaiter和enq

    • 按照指定模式(独占还是共享)将节点添加到等待队列。
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        //1、首先尝试以快速方式添加到队列末尾
        Node pred = tail;//pred指向现有tail末尾节点
        if (pred != null) {
        //新加入节点的前一个节点是现有AQS队列的tail节点
            node.prev = pred;
          	//CAS原子性的修改tail节点
            if (compareAndSetTail(pred, node)) {    
                //修改成功,新节点成功加入AQS队列,pred节点的next节点指向新的节点
                pred.next = node;
                return node;
            }
        }
        //2、pred为空,或者修改tail节点失败,
        //则走enq方法将节点插入队列
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for(;;) {//CAS
            Node t = tail;
            if (t == null) { 
            // 必须初始化。这里是AQS队列为空的情况。
            //通过CAS的方式创建head节点,并且tail和head都指向
            //同一个节点。
                if (compareAndSetHead(new Node()))
                //注意这里初始化head节点,并不关联任何线程!!
                    tail = head;
            } else {
            //这里变更node节点的prev指针,并且移动tail指针指向node,
            //前一个节点的next指向新插入的node
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    • 上面addWaiter方法的第一行代码new Node(Thread.currentThread(), mode);,会创建一个node对象,该对象的重要属性值初始化为:
    nextWaiter = Node.EXCLUSIVE; // Node.EXCLUSIVE值为null
    thread = Thread.currentThread();
    waitStatus = 0;// 默认是0
    
    • addWaiter首先会以快速方式将node添加到队尾,如果失败则走enq方法。失败有两种可能,一个是tail为空,也就是AQS为空的情况下。另一是compareAndSetTail失败,也就是多线程并发添加到队尾,此时会出现CAS失败。
    • 注意enq方法,在t==null时,首先创建空的头节点,不关联任何的线程,nextWaiter和thread变量都是null。

    acquireQueued

    • tryAcquire失败没有获取到锁,addWaiter加入了AQS等待队列,进入acquireQueued方法中,acquireQueued方法以独占无中断模式获取锁,这个方法会一直无限循环,直到获取到资源或者被中断才返回。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//是否获取到资源
        try {
            boolean interrupted = false;//是否中断
            for (;;) {
                //获取前一个节点
                final Node p = node.predecessor();
    
    			//如果当前node节点是第二个节点,紧跟在head后面,
    			//那么tryAcquire尝试获取资源
                if (p == head && tryAcquire(arg)) {                
                    setHead(node);//获取锁成功,当前节点成为head节点
                    p.next = null; // 目的:辅助GC
                    failed = false;
                    return interrupted;//返回是否中断过
                }
                
                //当shouldParkAfterFailedAcquire返回成功,
                //也就是前驱节点是Node.SIGNAL状态时,
                //进行真正的park将当前线程挂起,并且检查中断标记,
                //如果是已经中断,则设置interrupted =true。
                //如果shouldParkAfterFailedAcquire返回false,
                //则重复上述过程,直到获取到资源或者被park。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);//添加AQS失败,取消任务
        }
    }
    
    //前面讲过,head节点不与任何线程关联,他的thread是null,
    //当然head节点的prev肯定也是null
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    //在Acquire失败后,是否要park中断
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws= pred.waitStatus;//获取到上一个节点的waitStatus
        if (ws == Node.SIGNAL)//前面讲到当一个节点状态时SIGNAL时,
        //他有责任唤醒后面的节点。所以这里判断前驱节点是SIGNAL状态,
        //则可以安心的park中断了。
            return true;
        if (ws > 0) {
            /*
             * 过滤掉中间cancel状态的节点
             * 前驱节点被取消的情况(线程允许被取消哦)。向前遍历,
             * 直到找到一个waitStatus大于0的(不是取消状态或初始状态)
             * 的节点,该节点设置为当前node的前驱节点。
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 修改前驱节点的WaitStatus为Node.SIGNAL。
             * 明确前驱节点必须为Node.SIGNAL,当前节点才可以park 
             * 注意,这个CAS也可能会失败,因为前驱节点的WaitStatus状态
             * 可能会发生变化
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    //阻塞当前线程
    //park并且检查是否被中断过
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    

    cancelAcquire

    • acquireQueued方法在出现异常时,会执行cancelAcquire方法取消当前node的acquire操作。
    private void cancelAcquire(Node node) {
        if (node == null)
            return;
    
        node.thread = null;
    
        // 跳过中间CANCELLED状态的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        Node predNext = pred.next;
    
        // 将node设置为CANCELLED状态
        node.waitStatus = Node.CANCELLED;
    
        // 如果当前节点是tail节点,则直接移除
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {//如果pred不是head节点并且是SIGNAL 状态,
                //或者可以设置为SIGNAL 状态,
                //那么将pred的next设置为node.next,也就是移除当前节点
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);//唤醒node的后继节点
            }
    
            node.next = node; // help GC
        }
    }
    private void unparkSuccessor(Node node) {
        //如果waitStatus为负数,则将其设置为0(允许失败)
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        //唤醒当前节点后面的节点。通常是紧随的next节点,
        //但是当next被取消或者为空,则从tail到node之间的所有节点,
        //往后往前查找直到找到一个waitStatus <=0的节点,将其唤醒unpark
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 总结一下:

    1、设置thread变量为空,并且设置状态为canceled

    2、跳过中间的已经被取消的节点

    3、如果当前节点是tail节点,则直接移除。否则:

    4、如果其前驱节点不是head节点并且(前驱节点是SIGNAL状态,或者可以被设置为SIGNAL状态),那么将当前节点移除。否则通过LockSupport.unpark()唤醒node的后继节点

    获取非公平锁过程总结
    在这里插入图片描述
    2.3.2 公平锁加锁

    • 公平锁与非公平锁的区别就在于这里,在tryAcquire方法中,首先会检查是否有任何线程等待获取的时间长于当前线程。
    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                  //这里,先会检查是否有任何线程等待获取的时间长于当前线程。
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    • 就是看看AQS队列是否为空,如果不为空,那么head的下一个节点是否为当前请求的线程,如果不是,说明前面有其他线程排队,当前线程应该加入等待队列中。

    2.4 最后,释放锁

     public void unlock() {
            sync.release(1);
        }
        
    public final boolean release(int arg) {
        if (tryRelease(arg)) {//尝试释放资源  state
            Node h = head;
            if (h != null && h.waitStatus != 0)//如果AQS不为空,并且头节点的waitStatus不是0,之前在shouldParkAfterFailedAcquire方法内设置成了-1
                unparkSuccessor(h);//unpark后继节点
            return true;
        }
        return false;
    }
    
    //这里不需要加锁,因为只有获取锁的线程才会来释放锁,
    //所以这里直接将state减去releases即可
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        //
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)//注意这里是从AQS队列的尾节点开始查找的,
            //找到最后一个 waitStatus<=0 的那个节点,将其唤醒。
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    • 彻底释放完资源(state=0)后,会去唤醒AQS队列中的一个等待节点,该节点查找顺序为从AQS队列的尾节点开始查找的,找到最后一个 waitStatus<=0 的那个节点,通过LockSupport.unpark将其唤醒。

    【完】

    展开全文
  • 今天静下心学习了看了看重入锁的相关源码,在看这篇文章之前希望你可以对ASQ有一定的了解,因为AQS是实现重入锁,读写等等的关键,这是自己的学习AQS的链接Java并发AQS队列同步器源码学习笔记,如果不想看的话可以...

    前言

    前几天学习了解了AQS也叫列同步器AbtstractQueuedSynchronizer的相关源码,今天静下心学习了看了看重入锁的相关源码,在看这篇文章之前希望你可以对ASQ有一定的了解,因为AQS是实现重入锁,读写锁等等的关键,这是自己的学习AQS的链接Java并发AQS队列同步器源码学习笔记,如果不想看的话可以看这篇Java并发之AQS详解,写的很好。在这篇文章中我主要讲一下对重入锁主要方法源码的理解。

    接下来我们直接看源码吧,首先我们要理清ReentrantLock类的结构,看它的里面有哪些内部类,继承了谁,实现了什么方法,希望你在看这篇文章的时候可以简单的写这几行代码一步一步点进去,看看每一步都调用了什么方法。

    public class MyThread {
        public static void main(String[] args){
            ReentrantLock lock = new ReentrantLock();
            lock.tryLock();
            lock.unlock();
        }
    }

    创建对象首先要调用构造方法,我们点进ReentrantLock。

    构造方法

    可以看出重入锁ReentrantLock提供了两种构造器,默认的为非公平方法。

        public ReentrantLock() {
            sync = new NonfairSync();
        }
        
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }

    那构造器中的new的NonfairSync()对象或者fairSync()对象又是什么呢?我们接着看源码,顺着点进去NonfairSync(),看看这是个什么东东?fairSync()也是大同小异。

    NonfairSync

    可以看出NonfairSyn是重入锁ReentrantLock的静态内部类继承于Sync,至于里面的两个方法lock()、tryAcquire()后面会讲到,那么Sync又是干什么的呢?我们接着看。

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

    Sync

    可以看出Sync也是重入锁的内部静态类,而且还是抽象的,继承了AbstractQueuedSynchronizer,也就是AQS的同步器,所以说最好对AQS有点了解最好。我们接着看,对于Sync类的代码我只保留了3个重要方法,而且可以观察到上面提到的NonfairSync中的lock方法就是实现Sync的lock方法。而NonfairSync中的tryAcquire()直接调用了它爹Sync的nonfairTryAcquire,是不是很真实。。。

        abstract static class Sync extends AbstractQueuedSynchronizer {
            
            abstract void lock();
    
            final boolean nonfairTryAcquire(int acquires) {
                ...方法体省略
            }
    
            protected final boolean tryRelease(int releases) {
                ...方法体省略
            }
        }

    好了现在起码我们将重入锁中的结构理的差不多了,接着我们看一下它的tryLock()方法,我们继续一步一步点进去看。

    tryLock()

    这里sync是Sync的实例?明显是不对的,刚才不是说了吗,Sync是ReentrantLock的静态抽象内部类,不可能实例化实例化对象的,其实这里只是一个句柄引用,使用父类来引用子类实例对象,你回头去看一下刚开始初始化ReentrantLock的构造器方法就知道了。那既然知道sync是NonfairSync的实例,那么这就显而易见了,重入锁ReentrantLock的tryLock()是调用了NonfairSync的nonfairTryAcquire(),可是你可能发现这又不对了,这NonfairSync没有这个方法啊,但是你一点发现是它爹Sync的方法,也就是实例化的对象是NonfairSync,句柄是它爹,用的是它爹的方法,一切还是没啥毛病。

        private final Sync sync;
    
        public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }

    终于到了tryAcquire()方法的核心实现了nonfairTryAcquire(int acquires)方法了。

    nonfairTryAcquire(int acquires)

    注释写的很详细了。我说一下核心思想,重入锁,顾名思义就是支持重入的锁,就是表示该锁能支持一个线程对资源进行重入加锁。那么这个方法就是支持重入的实现。可以看出核心就是维护一个private volatile int state变量,这是同步器的同步状态。在线程不持有锁的情况下是0,你看就是调用一次tryLock,就是调用一次nonfairTryAcquire(1),也就是当前线程在持有锁的情况下将state+1。

        // 非公平方法获得锁
        final boolean nonfairTryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 火获得当前线程状态
            int c = getState();
            // 判断当前线程状态是否为0
            if (c == 0) {
                // 为0就表示没有持有锁,采用CAS方法将线程状态设置为acquires,也就是1,这是第一次加锁获取资源
                if (compareAndSetState(0, acquires)) {
                    // 如果设置成功,将当前线程设置为独占式访问。
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 否则判断当前线程是否占有锁
            else if (current == getExclusiveOwnerThread()) {
                // 将当前线程状态+ acquires,也就是+1,核心就在这儿
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 否则更新同步器状态,返回true
                setState(nextc);
                return true;
            }
            return false;
        }

    接着我们看非公平方式释放锁unlock()

    unlock()

    可以看出调用了release()方法,而点进去可以看到release()方法时AbstractQueuedSynchronizer的方法,也就是Sync它爹的方法。所以说必须得了解AQS队列同步器,再次附上这位大佬的文章Java并发之AQS详解

        public void unlock() {
            sync.release(1);
        }

    我们接着点

    release(int arg)

    这里面涉及到tryRelease(arg)和unparkSuccessor(Node node)两个方法,第二个方法属于AQS的方法,具体请拜读上面大佬的文章。我们来说说第一个方法。

        public final boolean release(int arg) {
            // 如果成功释放锁
            if (tryRelease(arg)) {
                Node h = head;// 找到头结点
                if (h != null && h.waitStatus != 0)
                    // 唤醒等待队列里的下一个线程
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    我们接着点tryRelease()方法,这是锁释放的关键。

    tryRelease(int releases)

    注释很详细了,我说一下核心思想,既然上面已经提到可以多次获取锁,每次同步器状态state+1,那么现在释放锁是不是也得每调用一次unlock()也得让同步器状态-1呢?确实如此,当state=0时,表示锁已经成功释放,接着执行release()方法中的后续进程,即唤醒等待队列中的下一个线程。

        //独占式的获取同步状态
        protected final boolean tryRelease(int releases) {
            // 将当前线程状态- releases
            int c = getState() - releases;
            // 判断当前线程是否是setExclusiveOwnerThread方法最后一次设置的线程
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            // 是否成功释放标志位
            boolean free = false;
            // 判断线程状态是否为0,为0表示锁已经被成功释放
            if (c == 0) {
                free = true;// 标志位置为true
                // 独占式访问线程设置为null
                setExclusiveOwnerThread(null);
            }
            // 设置线程状态
            setState(c);
            // 返回释放锁是否成功
            return free;
        }

    到这里ReentrantLock的trylock()和unlock()已经讲完了,这里讲的是ReentrantLock默认构造器的两个方法,也就是非公平的方式获取锁,公平的方法获取锁的trylock()方法与非公平的方式相比,只有tryAcquire()中下面这点不同,它会判断当前线程所在节点是否有前驱节点,如果有,只有等待前驱节点获得并且释放锁之后才能获得。

    非公平方式的tryAcquire(int acquires) 

        // 公平方式获得锁
        protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    // hasQueuedPredecessors()判断同步队列中当前节点是否有前驱节点,如果有返回true
                    // 表示有比当前线程更早请求获得锁,需要前驱线程获得并释放锁后才能获得锁。
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
        }

    至于两种方式的lock()方法,都是调用父类AQS的acquire()方法来实现的,具体参考AQS的源码解读Java并发AQS队列同步器源码学习笔记,这里提一下大致流程。这里说一下非公平方式的lock()方法,公平方式的大同小异。

        final void lock() {
            // 在同步器状态为0时,采用CAS的方法尝试设置同步器状态为1
            if (compareAndSetState(0, 1))
                // 如果成功,将当前线程设置为独占访问状态
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 走到这里就说明已经不是第一次获得锁了,调用acquire()方法
                acquire(1);
        }

    我们接着看acquire()方法 

    acquire(int arg)

    这个方法是ReentrantLock内部Sync的爹AQS的方法,这个方法比较复杂,详细还是建议您去看AQS,这里只提一下tryAcquire(arg)方法,因为后面的方法在AQS里已经实现,只有tryAcquire(arg)这个方法留给子类让子类来实现,结果它的儿子Sync直接就没有重写,还是抽象的,而Sync将重写的这个方法的任务交给了它的两个儿子,NonfairSync和FairSync,这两个内部类重写了这个方法。而这个方法在前面已经介绍过了,就不在赘述了。

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    总结

    重入锁的实现的思想很简单,就是维护一个volatile int state的同步器状态,在线程获取锁之后可以多次获得锁,每重入一次,状态+1,而在释放锁的时候每释放一次同步状态就-1,当减到0时,就表示锁已经成功释放。至于非公平方式,就是在获得锁的时候判断当前节点之前是否有前驱节点,如果有,只有等待前驱节点获得并且释放锁之后才能获得。释放锁的方式公平和非公平方式都是一样的。

     

     

     

    参考文献

    [1] Java并发编程艺术

     

     

    展开全文
  • 重入锁和读写源码分析
  • 文章目录ReentrantLock的介绍重入性的实现原理公平与非公平 ReentrantLock的介绍 ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个,支持重入性,表示能够对共享资源能够...
  • Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
  • 心得:相较于JDK 1.7,Java 8中的HashMap有了较大的性能提升。修改了hash和resize方式,增加了红黑树的支持。学习参考资料: (1)疫苗:Java HashMap的死循环;1. HashMap要点(1)结构特点:Java中的HashMap是...
  • 我们就开始对这几行关键的代码进行分析,打好断点,debug调试,是分析源码学习源码的一个好的方法,OK,让我们开始一场浪漫绚丽的源码探索之旅,redisson的源码写的很漂亮哦。  首先,我们主要去关注加锁的过程...
  • 学习Java8的String源码以及相关问题的总结 1. String的定义 2. 字段属性 3. 构造函数 4. 长度和是否为空函数 5. charAt、codePointAt类型函数 6. getChar、getBytes类型函数 7. equal类函数(是否相等)...
  • 本原创入门教程,涵盖ZooKeeper核心内容,通过实例和大量图表,结合实战,帮助学习者理解和运用,任何问题欢迎留言。 目录: zookeeper介绍与核心概念 安装和使用 ZooKeeper分布式实现 ZooKeeper框架Curator...
  • 手写一个ReentrantLock

    2020-02-01 18:18:07
    最近学习Java语言中相关知识,看了一下ReentrantLock源码,自己手写了一个ReentrantLock。 ReentrantLock是一个可重入锁,并且在源码中通过构造函数可以使其在公平和非公平之间转换。 可重入锁即当前线程...
  • 本文以公平与非公平的加锁缩成为主线,分析整个加锁过程。准备知识简介ReentrantLock类图: NonfairSync继承关系: Node结点:作为获取失败线程的包装类, 组合了Thread引用, 实现为FIFO双向队列...
  • Java 并发源码合集

    2018-04-11 08:55:23
    【死磕Java并发】—– 深入分析synchronized的实现原理【死磕Java并发】—– 深入分析volatile的实现原理【死磕Java并发】—– Java内存模型之happens-before【死磕Java并发】—– Java内存模型之排序【死磕Java...
  • Java 读写 ReentrantReadWriteLock 源码分析 转自:https://www.javadoop.com/post/reentrant-read-write-lock#toc5 本文内容:读写 ReentrantReadWriteLock 的源码分析,基于 Java7/Java8。 阅读建议:虽然...
  • LinkedBlockingQueue是java concurrent包提供的另一个多线程安全的阻塞队列,与ArrayBlockingQueu相比,此队列的使用链表实现(不熟悉链表的同学,请查阅大学的数据结构课本),可以提供高效的并发读写性能。...
  • Java自旋

    2018-12-06 19:56:52
    自旋的由来  计算机系统资源总是有限的,有些资源需要互斥访问,因此就有了机制,只有获得的线程才能访问资源。保证了每次只有一个线程可以访问资源。当线程申请一个已经被其他线程占用的,就会出现两种...
  • HashTable是什么 HashTable是基于哈希表的Map接口的同步实现 HashTable中元素的key是唯一的,value值可重复 HashTable中元素的key和value不允许为null,如果遇到null,则返回NullPointerException ...
  • 修改JVM源码,控制抢占的线程的优先级
1 2 3 4 5 ... 20
收藏数 33,845
精华内容 13,538