aqs_安全审计 - CSDN
精华内容
参与话题
  • JAVA并发编程: CAS和AQS

    万次阅读 多人点赞 2019-11-19 09:40:32
    说起JAVA并发编程,就不得不聊聊CAS(Compare And Swap)和AQS了(AbstractQueuedSynchronizer)。 CAS(Compare And Swap) 什么是CAS CAS(Compare And Swap),即比较并交换。是解决多线程并行...

    说起JAVA并发编程,就不得不聊聊CAS(Compare And Swap)和AQS了(AbstractQueuedSynchronizer)。


    CAS(Compare And Swap)

    什么是CAS

    CAS(Compare And Swap),即比较并交换。是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。

    在JAVA中,sun.misc.Unsafe 类提供了硬件级别的原子操作来实现这个CAS。 java.util.concurrent 包下的大量类都使用了这个 Unsafe.java 类的CAS操作。至于 Unsafe.java 的具体实现这里就不讨论了。

    CAS典型应用

    java.util.concurrent.atomic 包下的类大多是使用CAS操作来实现的(eg. AtomicInteger.java,AtomicBoolean,AtomicLong)。下面以 AtomicInteger.java 的部分实现来大致讲解下这些原子类的实现。

    public class AtomicInteger extends Number implements java.io.Serializable {
        private static final long serialVersionUID = 6214790243416807050L;
    
        // setup to use Unsafe.compareAndSwapInt for updates
        private static final Unsafe unsafe = Unsafe.getUnsafe();
    
        private volatile int value;// 初始int大小
        // 省略了部分代码...
    
        // 带参数构造函数,可设置初始int大小
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
        // 不带参数构造函数,初始int大小为0
        public AtomicInteger() {
        }
    
        // 获取当前值
        public final int get() {
            return value;
        }
    
        // 设置值为 newValue
        public final void set(int newValue) {
            value = newValue;
        }
    
        //返回旧值,并设置新值为 newValue
        public final int getAndSet(int newValue) {
            /**
            * 这里使用for循环不断通过CAS操作来设置新值
            * CAS实现和加锁实现的关系有点类似乐观锁和悲观锁的关系
            * */
            for (;;) {
                int current = get();
                if (compareAndSet(current, newValue))
                    return current;
            }
        }
    
        // 原子的设置新值为update, expect为期望的当前的值
        public final boolean compareAndSet(int expect, int update) {
            return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
        }
    
        // 获取当前值current,并设置新值为current+1
        public final int getAndIncrement() {
            for (;;) {
                int current = get();
                int next = current + 1;
                if (compareAndSet(current, next))
                    return current;
            }
        }
    
        // 此处省略部分代码,余下的代码大致实现原理都是类似的
    }
    
    

    一般来说在竞争不是特别激烈的时候,使用该包下的原子操作性能比使用 synchronized 关键字的方式高效的多(查看getAndSet(),可知如果资源竞争十分激烈的话,这个for循环可能换持续很久都不能成功跳出。不过这种情况可能需要考虑降低资源竞争才是)。
    在较多的场景我们都可能会使用到这些原子类操作。一个典型应用就是计数了,在多线程的情况下需要考虑线程安全问题。通常第一映像可能就是:

    public class Counter {
        private int count;
        public Counter(){}
        public int getCount(){
            return count;
        }
        public void increase(){
            count++;
        }
    }
    

    上面这个类在多线程环境下会有线程安全问题,要解决这个问题最简单的方式可能就是通过加锁的方式,调整如下:

    public class Counter {
        private int count;
        public Counter(){}
        public synchronized int getCount(){
            return count;
        }
        public synchronized void increase(){
            count++;
        }
    }
    

    这类似于悲观锁的实现,我需要获取这个资源,那么我就给他加锁,别的线程都无法访问该资源,直到我操作完后释放对该资源的锁。我们知道,悲观锁的效率是不如乐观锁的,上面说了Atomic下的原子类的实现是类似乐观锁的,效率会比使用 synchronized 关系字高,推荐使用这种方式,实现如下:

    public class Counter {
        private AtomicInteger count = new AtomicInteger();
        public Counter(){}
        public int getCount(){
            return count.get();
        }
        public void increase(){
            count.getAndIncrement();
        }
    }
    

    AQS(AbstractQueuedSynchronizer)

    什么是AQS

    AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 CountDownLatch 类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer 的内部类 Sync 。可见 CountDownLatch 是基于AQS框架来实现的一个同步器.类似的同步器在JUC下还有不少。(eg. Semaphore )

    AQS用法

    如上所述,AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)

     To use this class as the basis of a synchronizer, redefine the
     * following methods, as applicable, by inspecting and/or modifying
     * the synchronization state using {@link #getState}, {@link
     * #setState} and/or {@link #compareAndSetState}:
     *
     * <ul>
     * <li> {@link #tryAcquire}
     * <li> {@link #tryRelease}
     * <li> {@link #tryAcquireShared}
     * <li> {@link #tryReleaseShared}
     * <li> {@link #isHeldExclusively}
     * </ul>
    

    如JDK的文档中所说,使用AQS来实现一个同步器需要覆盖实现如下几个方法,并且使用getState,setState,compareAndSetState这几个方法来设置获取状态

    1. boolean tryAcquire(int arg)
    2. boolean tryRelease(int arg)
    3. int tryAcquireShared(int arg)
    4. boolean tryReleaseShared(int arg)
    5. boolean isHeldExclusively()

    以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法,支持独占(排他)获取锁的同步器应该实现tryAcquiretryReleaseisHeldExclusively而支持共享获取的同步器应该实现tryAcquireSharedtryReleaseSharedisHeldExclusively。下面以 CountDownLatch 举例说明基于AQS实现同步器, CountDownLatch 用同步状态持有当前计数,countDown方法调用 release从而导致计数器递减;当计数器为0时,解除所有线程的等待;await调用acquire,如果计数器为0,acquire 会立即返回,否则阻塞。通常用于某任务需要等待其他任务都完成后才能继续执行的情景。源码如下:

    public class CountDownLatch {
        /**
         * 基于AQS的内部Sync
         * 使用AQS的state来表示计数count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                // 使用AQS的getState()方法设置状态
                setState(count);
            }
    
            int getCount() {
                // 使用AQS的getState()方法获取状态
                return getState();
            }
    
            // 覆盖在共享模式下尝试获取锁
            protected int tryAcquireShared(int acquires) {
                // 这里用状态state是否为0来表示是否成功,为0的时候可以获取到返回1,否则不可以返回-1
                return (getState() == 0) ? 1 : -1;
            }
    
            // 覆盖在共享模式下尝试释放锁
            protected boolean tryReleaseShared(int releases) {
                // 在for循环中Decrement count直至成功;
                // 当状态值即count为0的时候,返回false表示 signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        // 使用给定计数值构造CountDownLatch
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        // 让当前线程阻塞直到计数count变为0,或者线程被中断
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        // 阻塞当前线程,除非count变为0或者等待了timeout的时间。当count变为0时,返回true
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        // count递减
        public void countDown() {
            sync.releaseShared(1);
        }
    
        // 获取当前count值
        public long getCount() {
            return sync.getCount();
        }
    
        public String toString() {
            return super.toString() + "[Count = " + sync.getCount() + "]";
        }
    }
    
    

    AQS实现原理浅析

    AQS的实现主要在于维护一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。队列中的每个节点是对线程的一个封装,包含线程基本信息,状态,等待的资源类型等。

    CLH结构如下:

     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
    

    下面简单看下获取资源的代码:

        public final void acquire(int arg) {
            // 首先尝试获取,不成功的话则将其加入到等待队列,再for循环获取
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
        
        // 从clh中选一个线程获取占用资源
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 当节点的先驱是head的时候,就可以尝试获取占用资源了tryAcquire
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        // 如果获取到资源,则将当前节点设置为头节点head
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 如果获取失败的话,判断是否可以休息,可以的话就进入waiting状态,直到被unpark()
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
      
       private Node addWaiter(Node mode) {
            // 封装当前线程和模式为新的节点,并将其加入到队列中
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }  
        
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { 
                    // tail为null,说明还没初始化,此时需进行初始化工作
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 否则的话,将当前线程节点作为tail节点加入到CLH中去
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    详细参考:Java并发之AQS详解


    本文大致就讲了这些东西,有些地方说的也不是特别好。也有不全的地方,AQS的东西还是有不少的,建议大家自己去看JUC下的各个类的实现,配合《JAVA并发编程实战》这本书,相信是可以看明白的,从而得到更深刻的理解。


    个人博客: Vioao’s Blog

    展开全文
  • AQS详解

    千次阅读 多人点赞 2018-11-22 16:47:47
    AQS简介 在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义。 AQS 则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。 AQS的核心...

    AQS简介

    • 在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义。
    • AQS 则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。
    • AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法。在这里插入图片描述

    同步队列

    当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。就数据结构而言,队列的实现方式无外乎两者:一是通过数组的形式,另外一种则是链表的形式。AQS中的同步队列则是通过链式方式进行实现。 接下来,很显然我们至少会抱有这样的疑问:1. 节点的数据结构是什么样的?2. 是单向还是双向?3. 是带头结点的还是不带头节点的?

    在AQS有一个静态内部类Node,这是我们同步队列的每个具体节点。在这个类中有如下属性:

    volatile int waitStatus; // 节点状态
    volatile Node prev; // 当前节点的前驱节点 
    volatile Node next; // 当前节点的后继节点 
    volatile Thread thread; // 当前节点所包装的线程对象 
    Node nextWaiter; // 等待队列中的下一个节点
    

    节点的状态如下:

    • int INITIAL = 0; // 初始状态
    • int CANCELLED = 1; // 当前节点从同步队列中取消
    • int SIGNAL = -1; // 后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继 节点的线程继续运行。
    • int CONDITION = -2; // 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了 signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中。
    • int PROPAGATE = -3; // 表示下一次共享式同步状态获取将会无条件地被传播下去。

    现在我们知道了节点的数据结构类型,并且每个节点拥有其前驱和后继节点,很显然这是一个带头结点双向链表

    也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队 列中的线程进行通知等核心方法
    在这里插入图片描述

    独占锁的获取

    调用lock()方法是获取独占锁,获取锁失败后调用AQS提供的acquire(int arg)模板方法将当前线程加入同步队列,成功则线程执行。来看ReentrantLock源码

    final void lock() {    
    	if (compareAndSetState(0, 1))        
    		setExclusiveOwnerThread(Thread.currentThread());   
    	else       
    		acquire(1); 
    }
    

    lock方法使用CAS来尝试将同步状态改为1,如果成功则将同步状态持有线程置为当前线程。否则将调用AQS提供的 acquire()方法。

    public final void acquire(int arg) {
     	    // 再次尝试获取同步状态,如果成功则方法直接返回    
     	    // 如果失败则先调用addWaiter()方法再调用acquireQueued()方法
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    

    tryAcquire(arg):再次尝试获取同步状态,成功直接方法退出,失败调用addWaiter();
    addWaiter(Node.EXCLUSIVE), arg):将当前线程以指定模式(独占式、共享式)封装为Node节点后置入同步队列

    private Node addWaiter(Node mode) {
        	// 将线程以指定模式封装为Node节点
            Node node = new Node(Thread.currentThread(), mode);
            // 获取当前队列的尾节点
            Node pred = tail;
            // 若尾节点不为空
            if (pred != null) {
                node.prev = pred;
                // 使用CAS将当前节点尾插到同步队列中
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    // CAS尾插成功,返回当前Node节点
                    return node;
                }
            }
            // 尾节点为空 || CAS尾插失败
            enq(node);
            return node;
        }
    

    分析上面的注释。程序的逻辑主要分为两个部分:1. 当前同步队列的尾节点为null,调用方法enq()插入;2. 当前队列的尾节点不为null,则采用尾插入(compareAndSetTail()方法)的方式入队。另外还会有另外一个问题: 如果 if (compareAndSetTail(pred, node))为false怎么办?会继续执行到enq()方法,同时很明显compareAndSetTail() 是一个CAS操作,通常来说如果CAS操作失败会继续自旋(死循环)进行重试。

    因此,经过我们这样的分析,enq()方法可能承担两个任务:

    1. 处理当前同步队列尾节点为null时进行入队操作;
    2. 如果CAS尾插入节点失败后负责自旋进行尝试
    private Node enq(final Node node) {
        	// 直到将当前节点插入同步队列成功为止
            for (;;) {
                Node t = tail;
                // 初始化同步队列
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                	// 不断CAS将当前节点尾插入同步队列中
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
           }
    }
    

    在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在tail==null时,即当前线程是第一次插入同步队列compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)死循环中不断尝试,直至成功return返回为止。

    因此,对enq()方法可以做这样的总结:

    1. 在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列头结点的初始化;
    2. 自旋不断尝试CAS尾插入节点直至成功为止。

    现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了。那么紧接着会有下一个问题:在同步队列中的节点(线程)会做什么事情来保证自己能够有机会获得独占式锁?

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                // 自旋
                for (;;) {
                	// 获取当前节点的前驱节点
                    final Node p = node.predecessor();
                    // 获取同步状态成功条件
                    // 前驱节点为头结点并且获取同步状态成功
                    if (p == head && tryAcquire(arg)) {
                    	// 将当前节点设置为头结点
                        setHead(node);
                        // 删除原来的头结点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
            	// 获取失败将当前节点取消
                if (failed)
                    cancelAcquire(node);
            }
    }
    

    整体来看这是一个这又是一个自旋的过程(for(;;)),代码首先获取当前节点的先驱节点,如果先驱节点是头结点的并且成功获得同步状态的时候(if (p == head && tryAcquire(arg))),表示当前节点所指向的线程能够获取锁,方法执行结束。反之,获取锁失败进入等待状态,先不断自旋将前驱节点状态置为SIGINAL
    而后调用LockSupport.park()方法将当前线程阻塞。整体示意图为下图:

    获取锁成功并且节点出队的逻辑

    // 当前节点前驱为头结点并且再次获取同步状态成功
    if (p == head && tryAcquire(arg)) {
    	//队列头结点引用指向当前节点 
    	setHead(node); 
    	//释放前驱节点 
    	p.next = null; // help GC
    	failed = false;
    	return interrupted;
    }
    
    private void setHead(Node node) {
    	head = node;
    	node.thread = null;
    	node.prev = null;
    }
    

    将当前节点通过setHead()方法设置为队列的头结点,然后将之前的头结点的next域设置为null并且pre域也为null,即与队列断开,无任何引用方便GC时能够将内存进行回收。
    在这里插入图片描述

    那么当节点在同步队列中获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法。此方法主要逻辑
    是使用CAS将前驱节点状态由INITIAL置为SIGNAL,表示需要将当前节点阻塞。如果CAS失败,说明 shouldParkAfterFailedAcquire()方法返回false,然后会在acquireQueued()方法中的for (;;)死循环中不断自旋直到前驱节点状态置为SIGANL为止,返回true时才会执行方法 parkAndCheckInterrupt()方法。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获取前驱节点的节点状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
                // 前驱节点已被取消
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                // 不断重试直到找到一个前驱节点状态不为取消状态
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                // 前驱节点状态不是取消状态时,将前驱节点状态置为-1,
                // 表示后继节点应该处于等待状态
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    parkAndCheckInterrupt()方法的源码为

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    该方法的关键是会调用LookSupport.park()方法,该方法是用来阻塞当前线程的。

    整体上看,acquireQueued()在自旋过程中主要完成了两件事情:

    1. 如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该方法执行结束退出。
    2. 获取锁失败的话,先将节点状态设置成SIGNAL,然后调用LookSupport.park方法使得当前线程阻塞。

    独占式锁的获取过程也就是acquire()方法的执行流程

    独占锁的释放(release()方法)

    独占锁的释放调用unlock方法,而该方法实际调用了AQS的release方法

    public void unlock() {    
    	sync.release(1); 
    }
    
    public final boolean release(int arg) {    
    	if (tryRelease(arg)) {        
    		Node h = head;        
    		if (h != null && h.waitStatus != 0)            
    			unparkSuccessor(h);        
    			return true;    
    		}    
    		return false; 
    	}
    }	
    

    这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。

    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
             // 头结点的后继节点 
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
            	 // 后继节点不为null时唤醒 
                LockSupport.unpark(s.thread);
        }
    

    首先获取头节点的后继节点,当后继节点不为空的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程

    独占式锁获取与释放总结

    • 线程获取锁失败,将线程调用addWaiter()封装成Node进行入队操作。addWaiter()中enq()方法完成对同步队列的头节点初始化以及CAS尾插失败后的重试处理。
    • 入队之后排队获取锁的核心方法acquireQueued(),节点排队获取锁是一个自旋过程。当且仅当当前节点的前驱节点为头节点并且获取同步状态时,节点出队并且该节点引用的线程获取到锁。否则不满足条件时会不断自旋将前驱节点的状态置为SIGNAL后调用LockSupport.part()将当前线程阻塞。
    • 释放锁时会唤醒后继结点(后继结点不为null)。

    独占锁特性

    可中断式获取锁

    可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQS的acquireInterruptibly()方法。

       /**
         * Acquires in exclusive mode, aborting if interrupted.
         * Implemented by first checking interrupt status, then invoking
         * at least once {@link #tryAcquire}, returning on
         * success.  Otherwise the thread is queued, possibly repeatedly
         * blocking and unblocking, invoking {@link #tryAcquire}
         * until success or the thread is interrupted.  This method can be
         * used to implement method {@link Lock#lockInterruptibly}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @throws InterruptedException if the current thread is interrupted
         */
    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
            	    // 线程获取锁失败
                doAcquireInterruptibly(arg);
        }
    

    在获取同步状态失败后就会调用doAcquireInterruptibly()方法

        /**
         * Acquires in exclusive interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    获取锁响应中断原理与acquire()几乎一样,唯一区别在于当parkAndCheckInterrupt()返回true时表示线程阻塞时被中断,抛出中断异常后线程退出。

    超时等待式获取锁(tryAcquireNanos()方法)

    通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:

    1. 在超时时间内,当前线程成功获取了锁;
    2. 当前线程在超时时间内被中断;
    3. 超时时间结束,仍未获得锁返回false。

    该方法会调用AQS的方法tryAcquireNanos(),源码为:

        /**
         * Attempts to acquire in exclusive mode, aborting if interrupted,
         * and failing if the given timeout elapses.  Implemented by first
         * checking interrupt status, then invoking at least once {@link
         * #tryAcquire}, returning on success.  Otherwise, the thread is
         * queued, possibly repeatedly blocking and unblocking, invoking
         * {@link #tryAcquire} until success or the thread is interrupted
         * or the timeout elapses.  This method can be used to implement
         * method {@link Lock#tryLock(long, TimeUnit)}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         * @param nanosTimeout the maximum number of nanoseconds to wait
         * @return {@code true} if acquired; {@code false} if timed out
         * @throws InterruptedException if the current thread is interrupted
         */
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
            	 // 实现超时等待的效果 
                doAcquireNanos(arg, nanosTimeout);
        }
    

    最终是靠doAcquireNanos()方法实现超时等待的效果

        /**
         * Acquires in exclusive timed mode.
         *
         * @param arg the acquire argument
         * @param nanosTimeout max wait time
         * @return {@code true} if acquired
         */
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    超时获取锁逻辑与可中断获取锁基本一致,获取锁失败后,增加了一个时间处理。如果当前时间超过截止时间,线程不在等待,直接退出,返回false。否则将线程阻塞置为等待状态排队获取锁。

    展开全文
  • AQS详解(面试)

    万次阅读 多人点赞 2019-04-18 10:19:02
    AQS原理 AQS:AbstractQuenedSynchronizer抽象的队列式同步器。 AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被...

    AQS原理
    AQS:AbstractQuenedSynchronizer抽象的队列式同步器。是除了java自带的synchronized关键字之外的锁机制。
    AQS的全称为(AbstractQueuedSynchronizer),这个类在java.util.concurrent.locks包

    AQS的核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并将共享资源设置为锁定状态,如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
    CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,虚拟的双向队列即不存在队列实例,仅存在节点之间的关联关系。
    AQS是将每一条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),来实现锁的分配。

    用大白话来说,AQS就是基于CLH队列,用volatile修饰共享变量state,线程通过CAS去改变状态符,成功则获取锁成功,失败则进入等待队列,等待被唤醒。

    **注意:AQS是自旋锁:**在等待唤醒的时候,经常会使用自旋(while(!cas()))的方式,不停地尝试获取锁,直到被其他线程获取成功

    实现了AQS的锁有:自旋锁、互斥锁、读锁写锁、条件产量、信号量、栅栏都是AQS的衍生物
    AQS实现的具体方式如下:
    在这里插入图片描述
    如图示,AQS维护了一个volatile int state和一个FIFO线程等待队列,多线程争用资源被阻塞的时候就会进入这个队列。state就是共享资源,其访问方式有如下三种:
    getState();setState();compareAndSetState();

    AQS 定义了两种资源共享方式:
    1.Exclusive:独占,只有一个线程能执行,如ReentrantLock
    2.Share:共享,多个线程可以同时执行,如Semaphore、CountDownLatch、ReadWriteLock,CyclicBarrier

    不同的自定义的同步器争用共享资源的方式也不同。

    AQS底层使用了模板方法模式

    同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式是这样(模板方法模式很经典的一个应用):

    1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
    2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
      这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个运用。

    自定义同步器在实现的时候只需要实现共享资源state的获取和释放方式即可,至于具体线程等待队列的维护,AQS已经在顶层实现好了。自定义同步器实现的时候主要实现下面几种方法:
    isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    ReentrantLock为例,(可重入独占式锁):state初始化为0,表示未锁定状态,A线程lock()时,会调用tryAcquire()独占锁并将state+1.之后其他线程再想tryAcquire的时候就会失败,直到A线程unlock()到state=0为止,其他线程才有机会获取该锁。A释放锁之前,自己也是可以重复获取此锁(state累加),这就是可重入的概念。
    注意:获取多少次锁就要释放多少次锁,保证state是能回到零态的。

    以CountDownLatch为例,任务分N个子线程去执行,state就初始化 为N,N个线程并行执行,每个线程执行完之后countDown()一次,state就会CAS减一。当N子线程全部执行完毕,state=0,会unpark()主调用线程,主调用线程就会从await()函数返回,继续之后的动作。

    一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
     在acquire() acquireShared()两种方式下,线程在等待队列中都是忽略中断的,acquireInterruptibly()/acquireSharedInterruptibly()是支持响应中断的。

    AQS的简单应用
    Mutex:不可重入互斥锁,锁资源(state)只有两种状态:0:未被锁定;1:锁定。

    class Mutex implements Lock, java.io.Serializable {
        // 自定义同步器
        private static class Sync extends AbstractQueuedSynchronizer {
            // 判断是否锁定状态
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            // 尝试获取资源,立即返回。成功则返回true,否则false。
            public boolean tryAcquire(int acquires) {
                assert acquires == 1; // 这里限定只能为1个量
                if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                    setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                    return true;
                }
                return false;
            }
    
            // 尝试释放资源,立即返回。成功则为true,否则false。
            protected boolean tryRelease(int releases) {
                assert releases == 1; // 限定为1个量
                if (getState() == 0)//既然来释放,那肯定就是已占有状态了。只是为了保险,多层判断!
                    throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);//释放资源,放弃占有状态
                return true;
            }
        }
    
        // 真正同步类的实现都依赖继承于AQS的自定义同步器!
        private final Sync sync = new Sync();
    
        //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
        public void lock() {
            sync.acquire(1);
        }
    
        //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        //unlock<-->release。两者语文一样:释放资源。
        public void unlock() {
            sync.release(1);
        }
    
        //锁是否占有状态
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    }
    

    同步类在实现时一般都将自定义同步器(sync)定义为内部类,供自己使用;而同步类自己(Mutex)则实现某个接口,对外服务。
    部分转自:http://www.cnblogs.com/waterystone/p/4920797.html

    展开全文
  • 什么是AQS及其原理

    2020-10-24 16:39:55
    1、AQS简介 AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向链表。 ...

    GitHub:https://github.com/JDawnF

    1、AQS简介

    AQS全名:AbstractQueuedSynchronizer,是并发容器J.U.C(java.util.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向链表

    Sync queue:同步队列,是一个双向链表。包括head节点和tail节点。head节点主要用作后续的调度。 Condition queue:非必须,单向链表。当程序中存在cindition的时候才会存在此列表。

    AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

    AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

    状态信息通过protected类型的getState,setState,compareAndSetState进行操作

    //返回同步状态的当前值
    protected final int getState() {  
            return state;
    }
     // 设置同步状态的值
    protected final void setState(int newState) { 
            state = newState;
    }
    //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
    protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    2、AQS设计思想

    • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。

    • 利用int类型标识状态。在AQS类中有一个叫做state的成员变量

        /**
         * The synchronization state.
         */
        private volatile int state;
    • 基于AQS有一个同步组件,叫做ReentrantLock。在这个组件里,stste表示获取锁的线程数,假如state=0,表示还没有线程获取锁,1表示有线程获取了锁。大于1表示重入锁的数量。

    • 继承:子类通过继承并通过实现它的方法管理其状态(acquire和release方法操纵状态)。

    • 可以同时实现排它锁和共享锁模式(独占、共享),站在一个使用者的角度,AQS的功能主要分为两类:独占和共享。它的所有子类中,要么实现并使用了它的独占功能的api,要么使用了共享锁的功能,而不会同时使用两套api,即便是最有名的子类ReentrantReadWriteLock也是通过两个内部类读锁和写锁分别实现了两套api来实现的。

    3、AQS的大致实现思路

    AQS内部维护了一个CLH队列来管理锁。线程会首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync queue里。 接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。

    CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node)来实现锁的分配。

     

     

    推荐两篇文章:

    参照:https://blog.csdn.net/jesonjoke/article/details/80054133

    https://github.com/JDawnF/JavaGuide/blob/master/Java%E7%9B%B8%E5%85%B3/Multithread/AQS.md

    觉得有收获的,可以来一波赞赏!

    展开全文
  • 很多东西都有点忘记了,不过回答总体是没大问题的
  • AQS

    2019-04-25 19:28:48
    转载!... 一、概述  谈到并发,不得不谈ReentrantLock;而谈到ReentrantLock,不得不谈AbstractQueuedSynchronizer... 类如其名,抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步...
  • AQS的理解

    2019-04-22 10:55:38
    AQSAQS的全称:AbstractQueuedSynchronizer,抽象队列同步器 java并发包下很多API都是基于AQS来实现的加锁和释放锁等功能的,AQS是java并发包的基础类。ReentrantLock、ReentrantReadWriteLock底层都是基于AQS来...
  • AQS简介

    千次阅读 2016-03-04 09:37:37
    1、引言 JAVA内置的锁(使用同步方法和同步块)一直以来备受关注,其优势是可以花最小的空间开销创建锁(因为每个JAVA对象或者类都可以作为锁使用)和最少的时间开销获得锁(单线程可以在最短时间内获得锁)。...
  • Java中的AQS(一)AQS简介

    千次阅读 2018-05-09 11:50:46
    一、AQS的设计 AQS的设计是基于模板方法模式的,也就是说,使用者需要继承AQS并重写指定的方法,随后将AQS组合在自定义同步组件的实现中,并调用AQS提供的模板方法,而这些模板方法将会调用使用...
  • AQS同步队列与条件队列的关系

    千次阅读 2017-11-21 21:16:33
    本文图片来源: http://javadoop.com/post/AbstractQueuedSynchronizer-2 http://blog.csdn.net/tb3039450/article/details/69056169一、关系同步队列节点来源一:同步队列依赖一个双向链表来完成同步状态...同步队列
  • AQS同步队列中的节点状态

    千次阅读 2020-07-04 10:39:45
    状态 说明 SIGNAL 值为-1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,那么就会通知后继节点,让后继节点的线程能够运行 CONDITION 值为-2,节点在等待队列中,节点线程等待...
  • 后续总结一篇自己的理解 参考文献与书籍: 1.https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
  • AQS-了解节点自旋获取同步状态

    千次阅读 2020-07-04 18:00:14
  • Java并发-AQS及各种Lock锁的原理

    万次阅读 多人点赞 2019-08-23 13:22:20
    什么是AQS AQS是AbustactQueuedSynchronizer的简称,它是一个Java提高的底层同步工具类,用一个int类型的变量表示同步状态,并提供了一系列的CAS操作来管理这个同步状态。AQS的主要作用是为Java中的并发同步组件...
  • 这篇文章,我们来聊聊面试的时候比较有杀伤力的一个问题:聊聊你对AQS的理解?   之前有同学反馈,去互联网公司面试,面试官聊到并发时就问到了这个问题。当时那位同学内心估计受到了一万点伤害。。。   ...
  • 谈谈对AQS的一些理解

    千次阅读 2018-09-12 16:22:27
    AQS的概念 AQS全称AbstractQueuedSynchronizer,是java并发包中的核心类,诸如ReentrantLock,CountDownLatch等工具内部都使用了AQS去维护锁的获取与释放: AQS内部结构 首先我们可以找到这样一张图: 它...
  • AQS深度剖析

    千次阅读 多人点赞 2019-12-03 17:36:13
    鉴于CSDN对**版权保护的不作为**以及落后的运营手段,本博客将于近期关闭,并清空全部文章。 原有文章将会经过再次的校对、整理,转移至本人在**简书**的[博客空间](https://www.jianshu.com/u/3ec23ef9a408... ...
  • 同步器AQS中的同步队列与等待队列

    千次阅读 2019-03-30 00:01:55
    在单纯地使用锁,比如ReentrantLock的时候,这个锁组件内部有一个继承同步器AQS的类,实现了其抽象方法,加锁、释放锁也只是涉及到AQS中的同步队列而已,那么等待队列又是什么呢? 当使用Condition的时候,等待队列...
  • 面试题:了解Java的AQS吗?

    千次阅读 2019-06-16 01:45:46
    前言java.util.concurrent包(之后简称JUC包)中,提供了大量的同步与并发的工具类,是多线程编程的“利器”。其中locks包下,包含了多种锁,如Reen...
  • aqs专题——起源 上一章,我们学了java的关键字synchronized的应用与原理;我们知道在虚拟机底层,自jdk1.6以后做了很多优化工作;但是可针对于复杂的业务场景,最佳的优化策略可能会发生变化,要是我们利用java的...
1 2 3 4 5 ... 20
收藏数 22,820
精华内容 9,128
关键字:

aqs