精华内容
下载资源
问答
  • 抽象队列同步器AQS

    2019-09-17 18:15:57
    AQS抽象队列同步器 AQS简介 AQS源码简介 Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This ...

    AQS简介

    • 抽象队列同步器AbstractQueuedSynchronizer (简称AQS)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO(first-in-first-out)同步队列来控制获取共享资源的线程
    • AbstractQueuedSynchronizer 使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理状态,管理的方式是通过类似acquire和release的方式来操纵状态。
    • AbstractQueuedSynchronizer 类被设计为大多数同步组件的基类,这些同步组件都依赖于单个原子值(int)来控制同步状态,子类必须要定义获取同步与释放状态的方法
    private volatile int state;//共享变量,使用volatile修饰保证线程可见性
    
    • AQS子类应该为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用
    • 同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态。同步器也是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。
      • Exclusive独占锁:只有一个线程能执行,根据锁的获取机制,又分为“公平锁”和“非公平锁”。如ReentrantLock、ReentrantReadWriteLock.WriteLock
        • 公平锁:等待队列中按照FIFO的原则获得锁,等待时间越长的线程越先获得锁。
        • 非公平锁:线程获取锁的时候,无需等待队列直接获取锁。
      • Share共享:多个线程可同时执行,如ReentrantReadWriteLock.ReadLock、CyclicBarrier、CountDownLatch、Semaphore

    AQS原理图

    在这里插入图片描述
    AQS是个双端双向链表。当线程获取资源失败(比如tryAcquire时试图设置state状态失败),会被构造成一个结点加入CLH队列中,同时当前线程会被阻塞在队列中(通过LockSupport.park实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

    AQS类的方法

    • 在AQS中提供了三种方法
      • getState() :获取当前的同步状态
      • setState(int newState) :设置当前同步状态
      • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能保证状态设置的原子性。如果当前状态值等于预期值,则以原子方式将同步状态设置为给定的更新值。此操作具有 volatile 读和写的内存语义

    自定义同步器实现的方法(子类中可以重写的方法)

    • boolean isHeldExclusively():该线程是否正在独占资源。
    • boolean tryAcquire(int):独占方式。通过CAS操作设置同步状态,尝试获取资源,成功则返回true,失败则返回false。
    • boolean tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
    • int tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    • boolean tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    获取同步状态与释放同步状态方法

    • 独占式获取和释放同步状态
      • void acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则返回,否则进入同步队列等待,该方法会调用tryAcquire(int arg)方法。
      • void acquireInterruptibly(int arg):与 void acquire(int arg)基本逻辑相同,但是该方法响应中断,如果当前没有获取到同步状态,那么就会进入等待队列,如果当前线程被中断(Thread().interrupt()),那么该方法将会抛出InterruptedException并返回
      • boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly(int arg)的基础上,增加了超时限制,如果当前线程没有获取到同步状态,那么将返回fase,反之返回true。
      • boolean release(int arg) :独占式的释放同步状态
    • 共享式获取和释放同步状态
      • void acquireShared(int arg):共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。
      • void acquireSharedInterruptibly(int arg):在acquireShared(int arg)的基本逻辑相同,增加了响应中断。
      • boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly的基础上,增加了超时限制。
      • boolean releaseShared(int arg):共享式的释放同步状态

    AQS具体实现及内部原理

    • FIFO队列
      • AQS中主要通过一个FIFO(first-in-first-out)来控制线程的同步。
      • 在实际程序中,AQS会将获取同步状态的线程构造成一个Node节点,并将该节点加入到队列中。如果该线程获取同步状态失败会阻塞该线程,当同步状态释放时,会把头节点中的线程唤醒,使其尝试获取同步状态。
    • Node节点结构
    static final class Node {
         static final Node SHARED = new Node(); 
         static final Node EXCLUSIVE = null;
         static final int CANCELLED =  1; //被中断或获取同步状态超时的线程将会被置为当前状态,且该状态下的线程不会再阻塞。
         static final int SIGNAL    = -1; //当前节点的线程如果释放了或取消了同步状态,将会将当前节点的状态标志位SINGAL,用于通知当前节点的下一节点,准备获取同步状态。
         static final int CONDITION = -2; //当前节点在Condition中的等待队列上,(关于Condition会在下篇文章进行介绍),其他线程调用了Condition的singal()方法后,该节点会从等待队列转移到AQS的同步队列中,等待获取同步锁。
         static final int PROPAGATE = -3; //与共享式获取同步状态有关,该状态标识的节点对应线程处于可运行的状态。
         // 0:None of the above,新结点会处于这种状态。
         volatile int waitStatus;
         volatile Node prev;
         volatile Node next;
         volatile Thread thread;
         Node nextWaiter;
    }
    
    * Node结点类是AQS的一个静态内部类,是等待队列中的结点类。这个等待队列是一个"CLH"锁队列的变体。
    	 * CLH锁即Craig, Landin, and Hagersten (CLH) locks,CLH锁是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。
    

    AQS同步队列具体实现结构

    在这里插入图片描述

    • 在AQS中的同步队列中,分别有两个指针(你也可以叫做对象的引用),一个head指针指向队列中的头节点,一个tail指针指向队列中的尾节点。
    • AQS添加尾节点
    • AQS添加头节点
    展开全文
  • 抽象队列同步器-AQS

    2021-05-20 13:10:41
    抽象队列同步器——AQS 介绍 AQS(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组建的基础框架,他使用了一个int成员变量(waitStatus)表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作...

    抽象队列同步器——AQS

    介绍

    AQS(AbstractQueuedSynchronizer),是用来构建锁或者其他同步组建的基础框架,他使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

    子类通过继承AQS这个同步器并实现它的抽象方法来管理同步状态,它既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,也是ReentrantLock,CountDownLatch等同步工具实现的基础。

    CLH lock queue

    AQS其实是一个FIFO队列,队列中的每个节点(线程)需要等前面的节点(线程)释放锁。

    结构

    如图所示,AbstractQueuedSynchronizer实现了序列化接口,继承了AbstractOwnableSynchronizer,它是一个可能被线程独占的同步器。
    在这里插入图片描述

    AbstractOwnableSynchronizer

    该抽象类所有源码如下,内容不多,可以看到主要做了一件事,那就是将该同步器标记(通过属性exclusiveOwnerThread)为一个线程独占的同步器,并提供设置和获取当前线程拥有者的set和get方法。

    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
        private static final long serialVersionUID = 3737899427754241961L;
    
        protected AbstractOwnableSynchronizer() { }
    	// 记录该同步器的拥有者
        private transient Thread exclusiveOwnerThread;
    	// 设置拥有同步器的线程
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    	// 获取当前拥有者
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    AbstractQueuedSynchronizer

    Node(内部类)

    我们先来看一下AQS

    static final class Node {
        // 标记Node为共享式
        static final Node SHARED = new Node();
        // 标记Node为独占式
        static final Node EXCLUSIVE = null;
        // 表示线程为取消状态
        static final int CANCELLED =  1;
        // 标识:代表后继结点的线程处于等待状态
        static final int SIGNAL = -1;
        // 节点在等待队列中
        static final int CONDITION = -2;
       	// 表示下一次共享式同步状态获取将会无条件地被传播下去
        static final int PROPAGATE = -3;
    	// 当前等待状态(表示同步状态)
        volatile int waitStatus;
    	// 前一节点
        volatile Node prev;
    	// 下一节点
        volatile Node next;
    	// 拥有当前节点的线程
        volatile Thread thread;
    	// 下一个等待者(独占模式为null)
        Node nextWaiter;
    	// 判断当前节点是否为共享模式
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    	// 获取线程前一节点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    	// 无参构造
        Node() {}
    	// 有参构造方法1,
        Node(Thread thread, Node mode) {   
            this.nextWaiter = mode;
            this.thread = thread;
        }
    	// 有参构造方法2
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    Fields

    接下来看一下AQS中的属性:

    // 记录对列的头结点
    private transient volatile Node head;
    // 队列的尾节点
    private transient volatile Node tail;
    // 记录同步状态
    private volatile int state;
    
    private static final Unsafe unsafe = Unsafe.getUnsafe();// 获取Unsafe对象
    private static final long stateOffset;// 以下为记录属性偏移量
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;
    

    我们重点看一下以下这四个:

    • head:用于保存队列的头结点
    • tail:保存队列的尾节点
    • state:记录节点的同步状态

    静态代码块

    AQS中的静态代码块主要用于初始化AQS中的state、head、tail和内部类Node中waitStatus、next的偏移量,便于后面使用Unsafe中的CAS方法进行操作:

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("waitStatus"));// 内部类waitStatus
            nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));// 内部类next
    
        } catch (Exception ex) { throw new Error(ex); }
    }
    

    Methods

    acquire(int) :获取锁(独占)

    acquire()方法用于获取锁,且获取锁的方式为独占。接下来看方法流程:

    1. 调用tryAcquire() 方法尝试获取资源,从下面的tryAcquire() 的介绍我们可以看到该方法并未实现,而是由子类去根据需求具体实现,返回true则代表成功获得了资源。
    2. 若未获取到资源,则由addWaiter() 方法将一个标记为独占(nextWaiter=Node.EXCLUSIVE也就是空)的Node添加到等待队列的尾部
    3. 使用acquireQueued()不断尝试获取资源,期间可能会进入等待(park)状态,如果等待过程中被中断过,则返回true
    4. 如果线程在获取资源的过程中被中断过,则在获取到资源后(不是直接中断)才进行自我中断(selfInterrupt方法)。
    public final void acquire(int arg) {
        // 获取到资源直接返回
        if (!tryAcquire(arg) && 
            //addWaiter(Node.EXCLUSIVE) 等待者为空(独占式)
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // Node.EXCLUSIVE
    static final Node EXCLUSIVE = null;
    
    tryAcquire(int):尝试获取资源

    该方式使用了模版方法,具体的实现方式由使用它的类来决定。如ReentrantLock中的实现方式为非公平锁:

    // Aqs中的tryAcquire()
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    // ReentrantLock中的实现
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    addWaiter(Node):添加等待节点

    addWaiter()方法用于将当前线程插入等待队列的队尾,并返回当前线程所在的节点node。在acquire() 方法中,可以看到调用该方法入参为Node.EXCLUSIVE,以为独占队列,

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;// 尾节点
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) { // 将node设置为尾节点
                pred.next = node; 
                return node;
            }
        }
        enq(node);// 将node插入到队列中
        return node;// 返回当前线程所在的节点
    }
    
    enq(Node) :将节点插入队列

    AQS中的enq() 方法用于将一个Node节点插入,这里可以看到入参Node被加上了final关键字,防止在方法中被修改,我们这里把入参node视作当前节点。在enq()方法中,首先进入循环,然后判断队列的尾节点是否为null(尾节点为null则证明未初始化),如果为null,则优先初始化,在初始化方法compareAndSetHead()中,使用了Unsafe中的compareAndSwapObject()将头结点比较并交换为新建节点new Node()。设置完头之后,将头结点也赋值给尾节点。然后进入下一次循环,在该循环中将入参node的前置节点设置为尾节点(在尾节点后添加节点),之后使用compareAndSetTail() 方法比较并交换尾节点为当前节点,成功后将原tail的下一节点替换为当前节点node,最后返回被替换掉的尾节点t。

    private Node enq(final Node node) {// Node为final不可被修改
        for (;;) {// 循环
            Node t = tail;
            if (t == null) { 
                if (compareAndSetHead(new Node()))// 设置头节点
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {// 设置尾节点
                    t.next = node;
                    return t;
                }
            }
        }
    }
    // 比较并交换头节点   null:预期值
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    // 比较并交换尾节点  
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    
    acquireQueued(Node,int):等待获取资源

    acquireQueued() 用于等待获取资源,我们来一起看一下该方法的步骤:

    1. 首先进入自旋状态,判断前一节点是否为头结点,如果是头结点则尝试获取资源
    2. 若成功获取到资源则将当前节点设置为头结点,并将原头节点置为可被GC的状态,之后返回线程中断状态
    3. 若不满足上述条件,则判断前置节点满足线程等待状态后进入等待状态
    4. 若后续被唤醒,则继续自旋
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 自旋
                final Node p = node.predecessor();// 获取前一个节点
                if (p == head && tryAcquire(arg)) {// 若前置节点为头结点则尝试获取资源
                    // 成功获取到资源
                    setHead(node);// 将node设置为头结点
                    p.next = null;// 这里将p.next也设置为null方便GC回收
                    failed = false;// 成功获取到资源
                    return interrupted;// return false
                }
                /*
                 * 1,检查节点状态(调用shouldParkAfterFailedAcquire()方法)
                 * 2,之后将该线程设置为等待唤醒状态(调用parkAndCheckInterrupt()方法)
                 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed) // 获取资源失败
                cancelAcquire(node);// 取消节点的等待状态
        }
    }
    
    // 获取前一个节点,若前一个节点为null,则报空指针异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // 设置头结点为node
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    shouldParkAfterFailedAcquire(Node,Node):

    shouldParkAfterFailedAcquire()方法用于检查前置节点的状态,同时根据前置节点状态,接下来我们来看一下该方法的步骤:

    1. 获取当前节点前置节点的等待状态并判断;
    2. 如果等待状态为Node.SIGNAL即-1时,代表当前节点可以进入等待状态,返回true;
    3. 若等待状态大于0,即Node.CANCELLED,代表所在线程为取消状态。这时,进入do/while循环将前置节点的前一个节点设置为当前节点node的前置节点(即去除前置节点pred让GC回收),直至前一节点的状态满足pred.waitStatus <= 0;
    4. 若不满足等待状态大于0,且不等于Node.SIGNAL,则使用CAS将前置节点的等待状态设置为Node.SIGNAL
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;// 获取前置节点的 waitStatus
        if (ws == Node.SIGNAL)
            // 前置节点状态为Node.SIGNAL,返回true,代表后续节点可以休息(park)
            return true;
        if (ws > 0) {
            // ws>0代表前置线程已经取消等待
            do {
                /*
                 * 将前置节点的前一个节点设置为当前节点node的前置节点
                 * 相当于从队列中除去前置节点pred, 并直到找到下一个状态
                 * 正常的节点
                 */
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;//将往前数第一个状态正常的节点的下一节点设置为node
        } else {
            // 通过CAS将前置节点的waitStstus设置为Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    parkAndCheckInterrupt():线程进入等待状态

    parkAndCheckInterrupt()方法主要用于使当前线程处于park()状态并返回线程的中断状态。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);// 是当前线程进入等待状态
        return Thread.interrupted();// 返回线程是否中断过标识
    }
    
    总结

    我们来梳理一下整个获取锁的流程:

    1. 尝试获取资源(tryAcquire),如果未获取到则在对尾添加等待节点(addWaiter)
    2. 等待节点不断尝试获取锁,期间可能会进入等待状态(LockSupport.park()),获取成功则将当前线程节点设置为头结点(setHead)
    3. 若线程在获取资源期间有过中断操作,则在获取到资源后进行自我中断

    总结流程图如下:

    在这里插入图片描述

    release(int):释放锁

    AQS中的release()方法主要用于获取资源后的释放操作,步骤如下:

    1. 使用tryRelease()方法释放资源,这里可以看到AQS中未做具体实现,因此需要子类实现该方法时根据情况返回是否释放成功的标志
    2. 判断头结点
    public final boolean release(int arg) {
        if (tryRelease(arg)) {// 释放成功
            Node h = head;// 头结点
            if (h != null && h.waitStatus != 0)// 判断头节点是否存在
                unparkSuccessor(h);// 唤醒下一个等待的线程
            return true;
        }
        return false;
    }
    
    tryRelease(int):尝试释放锁

    tryRelease()和tryAcquire()类似,也是AQS为我们提供了一个方法,后续的具体实现方式由子类完成。

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    unparkSuccessor(Node):唤醒下一个等待者

    unparkSuccessor(Node)方法主要用于唤醒Node后第一个处于等待状态的线程。我们回看一下acquireQueued(),可以看出,在此处唤醒后,被唤醒的节点将开始下一次自旋获取资源。自此,若即节点被唤醒,且从acquireQueued()得到了资源,将会返回acquire()方法,结束整个获取资源的过程

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;// 获取等待状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//CAS操作更新waitStatus=0
    
        /*
         * 获取需要唤醒的节点
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {// 无效节点
            s = null;
            /*
             * 在这个循环中从后往前遍历,目的是找到最前面的一个有效节点
             */
            for (Node t = tail; t != null && t != node; t = t.prev)// 从尾节点向前遍历
                if (t.waitStatus <= 0)// 有效节点
                    s = t;
        }
        if (s != null)
            /*
             * 唤醒该等待节点,继续自旋
             * 若s在被唤醒之前,抛出了异常,就永远无法被unpark了
             * 而在这个过程中只有tryRelease()能引发异常
             */
            LockSupport.unpark(s.thread);
    }
    

    至此,AQS中以独占方式获取资源和释放资源的方法就告一段落了,接下来我们来一起看一下以共享模式下获取资源的过程。

    acquireShared(int):获取锁(共享)

    该方法是共享模式下获取资源的入口,我们来看一下方法的步骤:

    1. 尝试获取资源,成功则直接返回
    2. 获取失败通过doAcquireShared()进入等待队列,直至获取到资源
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) // 获取资源失败
            doAcquireShared(arg);
    }
    
    tryAcquireShared(int):获取资源

    这里和tryAcquire()一样,需要子类去做具体实现,但是返回值有所不同,具体定义如下:

    • 返回值为负值代表获取失败
    • 返回值为0代表获取成功,但没有资源供后续节点获取
    • 返回值为正值则代表获取成功,且后续节点也能获取成功
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    doAcquireShared(int):排队获取资源

    此方法用于将当前线程加入等待队列尾部,并等待被唤醒,直至获取到资源。

    private void doAcquireShared(int arg) {
        // 添加等待者(共享模式Node.SHARED)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {// 自旋
                final Node p = node.predecessor();// 获取前置节点
                if (p == head) {
                    int r = tryAcquireShared(arg); // 如果是第二个节点尝试获取
                    if (r >= 0) {// 获取成功
                        setHeadAndPropagate(node, r);// 设置头并修改状态
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();// 自我中断
                        failed = false;
                        return;
                    }
                }
                // 检查节点状态并决定是否休息(park)
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);// 取消节点的等待状态
        }
    }
    
    setHeadAndPropagate(Node, int):

    设置队列头的同时唤醒后继结点

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // 记录旧的头结点
        setHead(node);// 将node设置为头结点
        /*
         * propagate > 0代表有资源剩余
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;// 获取下一节点
            if (s == null || s.isShared())// 下一节点为空且nextWaiter为共享模式
                doReleaseShared();
        }
    }
    
    doReleaseShared():

    此方法用于唤醒后续节点或修改头结点状态为Node.PROPAGATE。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;// 头结点
            if (h != null && h != tail) {// 不为空且非尾节
                int ws = h.waitStatus;// 获取状态
                if (ws == Node.SIGNAL) { 
                    // 修改状态直至修改成功
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);// 唤醒下一个等待者
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))// 修改状态
                    continue;                // loop on failed CAS
            }
            if (h == head)// 返回
                break;
        }
    }
    

    releaseShard(int):共享模式下释放锁

    该方法是共享模式下释放资源的入口,由于在上个方法已经介绍过doReleaseShared(),因此这里不做介绍只是把源码粘贴出来:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {// 释放
            doReleaseShared();
            return true;
        }
        return false;
    }
    // 模版方法
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    总结

    至此我们看完了共享模式和独占模式下释放锁和获取锁的整个过程,也发现了它们的一些不同之处

    1. 独占式只有获取到资源的线程释放之后,才会通知后续线程去执行获取资源的过程
    2. 而共享模式获取的过程中,根据剩余资源的多少,决定其他等待获取资源的线程能否一起获取资源

    但是也发现了他们的相同之处:

    • 在共享和独占整个获取锁的过程中,是忽略中断的

    在整个过程中我们可以发现,AQS替我们简化了锁的实现方式(我们只需要实现AQS提供的模板方法),屏蔽了同步状态管理,线程的排队,等待与唤醒等底层操作,很好地隔离了使用者和实现者所关注的区域

    在后续的学习中,会遇到一些底层使用AQS实现的并发工具,如底层依赖AQS的共享锁实现的CountDownLatch,后面我们会详细讨论:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;
    
        Sync(int count) {
            setState(count);
        }
    
        int getCount() {
            return getState();
        }
    	
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
    	
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    
    展开全文
  • 浅析抽象队列同步器(AQS) 目录 浅析抽象队列同步器(AQS) 什么是AQS AQS的原理 state状态 AQS的共享资源状态:独占式和共享式 添加锁和释放锁 什么是ReentrantLock 实现 如何使用 底层实现 除非我不想赢...

    浅析抽象队列同步器(AQS)

    目录

    浅析抽象队列同步器(AQS)

    什么是AQS

    AQS的原理

    state状态

    AQS的共享资源状态:独占式和共享式

    添加锁和释放锁

    什么是ReentrantLock

    实现

    如何使用

    底层实现


    除非我不想赢,否则没人能让我输。

    复习多线程并发包总结

     

    什么是AQS

    ​ AQS(AbstractQueuedSynchronizer)是一个抽象队列同步器,通过维护一个共享资源状态(volatile int state)和一个FIFO线程等待队列(底层是双向链表)来实现一个多线程访问共享资源的同步框架。许多同步类的实现都依赖于AQS,例如常用的ReentrantLock,CountDownLatch,Semaphore。

    关于ReentrantLock,CountDownLatch,Semaphore的用法可参考:

    [常用的三种同步类] https://blog.csdn.net/qq_42107430/article/details/103854488

    JDK1.8源码:

        /**
         * The synchronization state.
         */
        private volatile int state;
    ​
        static final class Node {
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
            static final int CANCELLED =  1;
            static final int SIGNAL    = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            volatile Node prev; //前驱节点
            volatile Node next; //后继节点
            volatile Thread thread;//当前线程
            Node nextWaiter; //存储在condition队列中的后继节点
            //是否为共享锁
            final boolean isShared() { 
                return nextWaiter == SHARED;
            }
    ​
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    ​
            Node() {    // Used to establish initial head or SHARED marker
            }
            //将线程构造成一个Node,添加到等待队列
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
            //这个方法会在Condition队列使用,后续单独写一篇文章分析condition
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }

     

    AQS的原理

    ​ AQS为每个共享资源变量设置一个共享资源锁,线程在需要访问共享资源时首先需要去获取共享资源缩。

    如果获取成功,便可以在当前线程中使用该共享资源,如果获取不成功,则将该线程放入线程等待队列,等待下一次资源调度。

     

    state状态

    ​ AQS维护了一个volatile int 类型的变量,用于表示当前的同步状态。Volatile虽然不能保证操作的原子性,但是可以保证操作的可见性。

    state的访问方式有以下三种,均是原子操作

    • getState()

    • setState()

    • compareAndSetState()

     /**
         * Returns the current value of synchronization state.
         * This operation has memory semantics of a {@code volatile} read.
         * @return current state value
         */
        protected final int getState() {
            return state;
        }
    ​
        /**
         * Sets the value of synchronization state.
         * This operation has memory semantics of a {@code volatile} write.
         * @param newState the new state value
         */
        protected final void setState(int newState) {
            state = newState;
        }
    ​
        /**
         * Atomically sets synchronization state to the given updated
         * value if the current state value equals the expected value.
         * This operation has memory semantics of a {@code volatile} read
         * and write.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }

     

    AQS的共享资源状态:独占式和共享式

    ​ AQS定义两种资源共享方式:独占式(Exclusive)和共享式(Share)。

    • 独占式:只有一个线程能执行,如ReentrantLock

    • 共享式:共享,多个线程可同时执行,如Semaphore/CountDownLatch

      AQS只是一个框架,定义了一个接口,具体的资源获取、释放都交由自定义同步器实现。

    不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

    以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。否则程序运行时会报错。

    再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续执行后续动作。

      一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

     

    添加锁和释放锁

    当出现锁竞争以及释放锁的时候,AQS同步队列中的节点会发生变化。

    添加节点

    添加节点时会涉及到两个变化

    • 新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己

    • 通过CAS讲tail重新指向新的尾部节点

    移除节点

    head节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点

    这个过程也是涉及到两个变化

    • 修改head节点指向下一个获得锁的节点

    • 新的获得锁的节点,将prev的指针指向null

     

    了解了AQS是什么,原理实现后,我们结合ReentrantLock来深入理解AQS是如何实现线程安全的。

    什么是ReentrantLock

    Java中除了使用关键字synchronized外,还可以使用ReentrantLock实现独占锁的功能。而且ReentrantLock相比synchronized而言功能更加丰富使用起来更为灵活,也更适合复杂的并发场景。

    实现

    ReentrantLock继承了Lock接口并实现了在接口中定义的方法。是一个可重入的独占锁。通过自定义抽象队列同步器来实现。

    Lock接口JDK源码

    void lock() // 如果锁可用就获得锁,如果锁不可用就阻塞直到锁释放
    void lockInterruptibly() // 和 lock()方法相似, 但阻塞的线程可中断,抛出 java.lang.InterruptedException异常
    boolean tryLock() // 非阻塞获取锁;尝试获取锁,如果成功返回true
    boolean tryLock(long timeout, TimeUnit timeUnit) //带有超时时间的获取锁方法
    void unlock() // 释放锁

    如何使用

    public class ReentrantLockDemo {
        private static int count=0;
        static Lock lock=new ReentrantLock();
        public static void inc(){
            lock.lock();
            try {
                Thread.sleep(1);
                count++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                lock.unlock();
            }
        }

    这段代码主要做一件事,就是通过一个静态的incr()方法对共享变量count做连续递增,在没有加同步锁的情况下多线程访问这个方法一定会存在线程安全问题。所以用到了ReentrantLock来实现同步锁,并且在finally语句块中显式释放锁

    底层实现

    ReentrantLock.lock()

     public void lock() {
            sync.lock();
        }

    可以看到lock()方法底层调用的是sync的lock()方法。

    sync是一个静态内部类,通过继承AQS并实现了共享资源state的获取和释放的方式。

    Sync

        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
    ​
            /*定义一个抽象方法,由具体的子类去实现*/
            abstract void lock();
    ​
            /**
             * 实现非公平的tryAcquire获取资源
             */
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();//获取当前线程
                int c = getState();//获取同步状态
                if (c == 0) {//如果状态为0 CAS设置acquires
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);//如果设置成功,设置当前线程为排他所有者线程
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {//如果当前线程就是排他线程
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow 溢出报错
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);//重新设置state
                    return true;
                }
                return false;
            }
    ​
          /*
           * 尝试释放资源
           */
            protected final boolean tryRelease(int releases) {
              //计算要更新的同步状态
                int c = getState() - releases;
              //如果当前线程不是排他线程 报错
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
              //如果状态为0,设置独占排他线程为null,返回true
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
              //更新同步状态
                setState(c);
                return free;
            }
    ​
            protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    ​
            /*获取当前线程*/
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
    ​
           /*获取当前state状态*/
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    ​
          /*判断是否被锁定*/
            final boolean isLocked() {
                return getState() != 0;
            }
    ​
        }

    Sync又有两个具体的实现,分别是NofairSync(非公平锁),FairSync(公平锁)。

    • 公平锁 表示所有线程严格按照FIFO来获取锁

    • 非公平锁 表示可以存在抢占锁的功能,也就是说不管当前队列上是否存在其他线程等待,新线程都有机会抢占锁

    NofairSync

    ​
        /**
         * Sync object for non-fair locks 非公平锁
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    ​
            /**
             * 执行锁定
             */
            final void lock() {
              //首先通过CAS设置state,如果成功,设置当前线程为排他线程(非公平的关键)
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else//如果失败再去尝试获得锁 关于acquire的具体讲解在下面
                    acquire(1);
            }
    ​
          /*调用父类sync的非公平tryAcquire获取资源*/
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }

    lock()方法简单解释一下

    • 由于这里是非公平锁,所以调用lock方法时,先去通过cas去抢占锁

    • 如果抢占锁成功,保存获得锁成功的当前线程

    • 抢占锁失败,调用acquire来走锁竞争逻辑

    compareAndSetState调用的是Unsafe类的compareAndSetState方法进行原子操作

    return unsafe.compareAndSetState(this, stateOffset, expect, update);

    UnsafeUnsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等

    public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

    这个是一个native方法, 第一个参数为需要改变的对象,第二个为偏移量(即之前求出来的headOffset的值),第三个参数为期待的值,第四个为更新后的值整个方法的作用是如果当前时刻的值等于预期值var4相等,则更新为新的期望值 var5,如果更新成功,则返回true,否则返回false;

    FairSync

    /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    ​
          //尝试获取锁
            final void lock() {
                acquire(1);
            }
    ​
            /**
             * 公平版本的tryAcquire 
             * Fair version of tryAcquire. 
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }

    acquire

    acquire是AQS中的方法,如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,这里大家思考一下,acquire方法中的1的参数是用来做什么呢?如果没猜中,往前面回顾一下state这个概念

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    这个方法的主要逻辑是

    • 通过tryAcquire尝试获取独占锁,如果成功返回true,失败返回false

    • 如果tryAcquire失败,则会通过addWaiter方法将当前线程封装成Node添加到AQS队列尾部

    • acquireQueued,将Node作为参数,通过自旋去尝试获取锁。

    addWaiter

    private Node addWaiter(Node mode) { //mode=Node.EXCLUSIVE
            //将当前线程封装成Node,并且mode为独占锁
            Node node = new Node(Thread.currentThread(), mode); 
            // Try the fast path of enq; backup to full enq on failure
            // tail是AQS的中表示同步队列队尾的属性,刚开始为null,所以进行enq(node)方法
            Node pred = tail;
            if (pred != null) { //tail不为空的情况,说明队列中存在节点数据
                node.prev = pred;  //讲当前线程的Node的prev节点指向tail
                if (compareAndSetTail(pred, node)) {//通过cas讲node添加到AQS队列
                    pred.next = node;//cas成功,把旧的tail的next指针指向新的tail
                    return node;
                }
            }
            enq(node); //tail=null,将node添加到同步队列中
            return node;
        }

    ReentrantLock.unlock()

     public void unlock() {
            sync.release(1);
        }

    release

    1 释放锁 ;2 唤醒park的线程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    tryRelease

    这个动作可以认为就是一个设置锁状态的操作,而且是将状态减掉传入的参数值(参数是1),如果结果状态为0,就将排它锁的Owner设置为null,以使得其它的线程有机会进行执行。在排它锁中,加锁的时候状态会增加1(当然可以自己修改这个值),在解锁的时候减掉1,同一个锁,在可以重入后,可能会被叠加为2、3、4这些值,只有unlock()的次数与lock()的次数对应才会将Owner线程设置为空,而且也只有这种情况下才会返回true。

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases; // 这里是将锁的数量减1
        if (Thread.currentThread() != getExclusiveOwnerThread())// 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { 
    // 由于重入的关系,不是每次释放锁c都等于0,
        // 直到最后一次释放锁时,才会把当前线程释放
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    unparkSuccessor

    在方法unparkSuccessor(Node)中,就意味着真正要释放锁了,它传入的是head节点(head节点是占用锁的节点),当前线程被释放之后,需要唤醒下一个节点的线程

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {//判断后继节点是否为空或者是否是取消状态,
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) //然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点, 至于为什么从尾部开始向前遍历,因为在doAcquireInterruptibly.cancelAcquire方法的处理过程中只设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的
                    s = t;
        }
    //内部首先会发生的动作是获取head节点的next节点,如果获取到的节点不为空,则直接通过:“LockSupport.unpark()”方法来释放对应的被挂起的线程,这样一来将会有一个节点唤醒后继续进入循环进一步尝试tryAcquire()方法来获取锁
        if (s != null)
            LockSupport.unpark(s.thread); //释放许可
    }

     

    展开全文
  • AQS(AbstractQueuedSynchronizer)抽象队列同步器 AQS是并发编程包里面的一个抽象类 实现的子类有哪些,基于AQS实现了,线程池里面用到了,显示锁用到了,读写锁用到了,信号量用到了… JDK并发里面的同步组件的一...

    AQS(AbstractQueuedSynchronizer)抽象队列同步器
    AQS是并发编程包里面的一个抽象类
    在这里插入图片描述
    实现的子类有哪些,基于AQS实现了,线程池里面用到了,显示锁用到了,读写锁用到了,信号量用到了…

    JDK并发里面的同步组件的一个基础的构件,用来构建同步组件

    AQS中比较重要的成员变量:state 同步状态

        /**
         * The synchronization state.
         */
        private volatile int state;
    

    模板方法的设计模式

    模版方法:

    做一件事情 dosomething
    Dosomething (){
        doFirst();
        doSecond();
        …
        doTenth();
    }

    用模版方法做蛋糕:

    /**
     * 类说明:抽象蛋糕模型
     */
    public abstract class AbstractCake {
        protected abstract void shape();/*造型*/
        protected abstract void apply();/*涂抹*/
        protected abstract void brake();/*烤面包*/
    
        /*做个蛋糕 */
        public final void run(){
            this.shape();
            this.apply();
            this.brake();
        }
    
    }
    
    /**
     * 类说明:芝士蛋糕
     */
    public class CheeseCake  extends AbstractCake {
    
        @Override
        protected void shape() {
            System.out.println("芝士蛋糕造型");
        }
    
        @Override
        protected void apply() {
            System.out.println("芝士蛋糕涂抹");
        }
    
        @Override
        protected void brake() {
            System.out.println("芝士蛋糕烘焙");
        }
    }
    
    /**
     * 类说明:奶油蛋糕
     */
    public class CreamCake extends AbstractCake {
        @Override
        protected void shape() {
            System.out.println("奶油蛋糕造型");
        }
    
        @Override
        protected void apply() {
            System.out.println("奶油蛋糕涂抹");
        }
    
        @Override
        protected void brake() {
            System.out.println("奶油蛋糕烘焙");
        }
    }
    
    /**
     * 类说明:生产蛋糕
     */
    public class MakeCake {
        public static void main(String[] args) {
            AbstractCake cheeseCake = new CheeseCake();
            AbstractCake creamCake = new CreamCake();
            cheeseCake.run();
            creamCake.run();
        }
    }
    
    芝士蛋糕造型
    芝士蛋糕涂抹
    芝士蛋糕烘焙
    奶油蛋糕造型
    奶油蛋糕涂抹
    奶油蛋糕烘焙
    

    实现自己的同步类的时候就是用模版方法,只要实现其中几个方法就行了
    比如要实现独占锁,就要实现AQS里面的独占的方法:
    tryAcquire本身是没有任何实现的,调用它会抛出异常,这个方法是要我们去实现的

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    比如要实现共享同步组件类:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    实现一个类似于ReentrantLock的锁 (显示锁)

    显示锁继承Lock接口
    在这里插入图片描述
    要实现独占锁,拿锁lock()、释放锁unlock() 一定要实现,这两个方法如何实现,就要借助AQS

    /**
     *类说明:实现我们自己独占锁,不可重入
     */
    public class SelfLock implements Lock {
        // 静态内部类,自定义同步器
        private static class Sync extends AbstractQueuedSynchronizer {
    
            /*判断处于占用状态*/
            @Override
            protected boolean isHeldExclusively() {
                return getState()==1;
            }
    
            /*获得锁*/
            @Override
            protected boolean tryAcquire(int arg) {
                if(compareAndSetState(0,1)){
                    //Exclusive排他的独占的
                    //拿到这把锁后就承包了,排他
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            /*释放锁*/
            @Override
            protected boolean tryRelease(int arg) {
                if(getState()==0){
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            /* 返回一个Condition,每个condition都包含了一个condition队列*/
            Condition newCondition() {
                return new ConditionObject();
            }
        }
    
        /* 仅需要将操作代理到Sync上即可*/
        private final Sync sync = new Sync();
    
    
        //调用内部类的两个方法就行了
        public void lock() {
        	System.out.println(Thread.currentThread().getName()+" ready get lock");
            sync.acquire(1); //acquire方法里面调用的是tryAcquire
            System.out.println(Thread.currentThread().getName()+" already got lock");
        }
    
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        public void unlock() {
        	System.out.println(Thread.currentThread().getName()+" ready release lock");
            sync.release(1);
            System.out.println(Thread.currentThread().getName()+" already released lock");
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    
        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    }
    
    /**
     *
     *类说明:测试我们自己的独占锁的实现
     */
    public class TestMyLock {
    
        public void test() {
            final Lock lock = new SelfLock();
            class Worker extends Thread {
    
    			public void run() {
                    lock.lock();
                    System.out.println(Thread.currentThread().getName());
                    try {
                        SleepTools.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
            // 启动4个子线程
            for (int i = 0; i < 4; i++) {
                Worker w = new Worker();
                //w.setDaemon(true);
                w.start();
            }
            // 主线程每隔1秒换行
            for (int i = 0; i < 10; i++) {
            	SleepTools.second(1);
                //System.out.println();
            }
        }
    
        //四个线程打印名字
        public static void main(String[] args) {
            TestMyLock testMyLock = new TestMyLock();
            testMyLock.test();
        }
    }
    
    Thread-0 ready get lock
    Thread-2 ready get lock
    Thread-2 already got lock
    Thread-2
    Thread-1 ready get lock
    Thread-3 ready get lock
    Thread-2 ready release lock
    Thread-2 already released lock
    Thread-1 already got lock
    Thread-1
    Thread-1 ready release lock
    Thread-1 already released lock
    Thread-0 already got lock
    Thread-0
    Thread-0 ready release lock
    Thread-3 already got lock
    Thread-3
    Thread-0 already released lock
    Thread-3 ready release lock
    Thread-3 already released lock
    

    调用acquire模版方法,acquire里面的tryAcquire方法是需要我们去实现的

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    AQS的基本思想CLH队列锁
    基于链表的自旋锁
    CLH三个人的名字头字母即Craig, Landin, and Hagersten (CLH) locks
    任意时刻只有一个线程拿到锁,其他的线程在外面排队,拿锁的线程释放了会唤醒线程去竞争锁,没有拿到锁的线程都要排队,一一排好队形成一个链表
    凡是要排队的线程,打包成一个QNode,里面至少有三个变量:当前线程、myPred(前驱节点)、locked(=true需要获得锁)
    每一个节点都在不停地自旋检测前一个节点有没有释放锁(locked==false)
    AQS的基本思想是会不断的自旋,在具体的实现上是双向列表,不会自旋很多次,两次自旋失败后就会把当前线程挂起
    在这里插入图片描述
    回头看Lock的实现

    锁的可重入
    改一下tryAcquire方法,判断是我自己的话state+1

    /**
     *类说明:实现我们自己独占锁,可重入
     */
    public class ReenterSelfLock implements Lock {
        /* 静态内部类,自定义同步器*/
        private static class Sync extends AbstractQueuedSynchronizer {
    
            /* 是否处于占用状态*/
            protected boolean isHeldExclusively() {
                return getState() > 0;
            }
    
            /* 当状态为0的时候获取锁*/
            public boolean tryAcquire(int acquires) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }else if(getExclusiveOwnerThread()==Thread.currentThread()){
                    setState(getState()+1);
                    return  true;
                }
                return false;
            }
    
            /* 释放锁,将状态设置为0*/
            protected boolean tryRelease(int releases) {
                if(getExclusiveOwnerThread()!=Thread.currentThread()){
                    throw new IllegalMonitorStateException();
                }
                if (getState() == 0)
                    throw new IllegalMonitorStateException();
    
                setState(getState()-1);
                if(getState()==0){
                    setExclusiveOwnerThread(null);
                }
                return true;
            }
    
            /* 返回一个Condition,每个condition都包含了一个condition队列*/
            Condition newCondition() {
                return new ConditionObject();
            }
        }
    
        /* 仅需要将操作代理到Sync上即可*/
        private final Sync sync = new Sync();
    
        public void lock() {
        	System.out.println(Thread.currentThread().getName()+" ready get lock");
            sync.acquire(1);
            System.out.println(Thread.currentThread().getName()+" already got lock");
        }
    
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        public void unlock() {
        	System.out.println(Thread.currentThread().getName()+" ready release lock");
            sync.release(1);
            System.out.println(Thread.currentThread().getName()+" already released lock");
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public boolean isLocked() {
            return sync.isHeldExclusively();
        }
    
        public boolean hasQueuedThreads() {
            return sync.hasQueuedThreads();
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    }
    
    public class TestReenterSelfLock {
    
        static final Lock lock = new ReenterSelfLock();
    
        public void reenter(int x){
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+":递归层级:"+x);
                int y = x - 1;
                if (y==0) return;
                else{
                    reenter(y);
                }
            } finally {
                lock.unlock();
            }
    
        }
    
        public void test() {
            class Worker extends Thread {
    			public void run() {
                    System.out.println(Thread.currentThread().getName());
                    SleepTools.second(1);
                    reenter(3);
                }
            }
            // 启动3个子线程
            for (int i = 0; i < 3; i++) {
                Worker w = new Worker();
                w.start();
            }
            // 主线程每隔1秒换行
            for (int i = 0; i < 100; i++) {
            	SleepTools.second(1);
            }
        }
    
        public static void main(String[] args) {
            TestReenterSelfLock testMyLock = new TestReenterSelfLock();
            testMyLock.test();
        }
    }
    
    Thread-0
    Thread-1
    Thread-2
    Thread-0 ready get lock
    Thread-0 already got lock
    Thread-0:递归层级:3
    Thread-0 ready get lock
    Thread-0 already got lock
    Thread-0:递归层级:2
    Thread-0 ready get lock
    Thread-0 already got lock
    Thread-0:递归层级:1
    Thread-0 ready release lock
    Thread-0 already released lock
    Thread-0 ready release lock
    Thread-0 already released lock
    Thread-0 ready release lock
    Thread-0 already released lock
    Thread-2 ready get lock
    Thread-2 already got lock
    Thread-2:递归层级:3
    Thread-2 ready get lock
    Thread-2 already got lock
    Thread-2:递归层级:2
    Thread-2 ready get lock
    Thread-2 already got lock
    Thread-2:递归层级:1
    Thread-2 ready release lock
    Thread-2 already released lock
    Thread-2 ready release lock
    Thread-2 already released lock
    Thread-2 ready release lock
    Thread-2 already released lock
    Thread-1 ready get lock
    Thread-1 already got lock
    Thread-1:递归层级:3
    Thread-1 ready get lock
    Thread-1 already got lock
    Thread-1:递归层级:2
    Thread-1 ready get lock
    Thread-1 already got lock
    Thread-1:递归层级:1
    Thread-1 ready release lock
    Thread-1 already released lock
    Thread-1 ready release lock
    Thread-1 already released lock
    Thread-1 ready release lock
    Thread-1 already released lock
    

    公平和非公平锁
    公平:每一个线程都放到队列的队尾,挂到最后一起排队
    非公平:新进程可以抢占拿锁

    展开全文
  • AQS(abstractqueuedSynchronizer)抽象队列同步器,基于模板方法模式,是用来构建锁或者其他同步组件的基础框架。它使用int成员变量(state)来表示同步状态,通过内置的FIFO队列(CLH队列)来完成资源获取线程的...
  • AQS,全称AbstractQueuedSynchronizer,即抽象队列同步器,和CAS共同撑起了整个java.util.concurrent包,同时也是Java并发编程上绕不开的一个概念 抽象队列同步器,以下统称AQS,用于解决的就是多线程并发访问控制...
  • Java学习笔记9-AQS抽象队列同步器 AQS抽象队列同步器 我们看下Lock相关源码发现 JDK内部将一些共性逻辑抽象为AbstractQueuedSynchronizer类,应用模版方法设计模式通过子类实现不同锁的效果。AQS提供了对资源占用、...
  • AQS(AbstractQueuedSynchronizer抽象队列同步器) 是一个抽象类,提供了一个框架,用于实现阻塞锁(ReentrantLock、ReetrantReadWriteLock,尽管这两个类不是直接继承自AQS,但是他们的内部类Sync继承了AQS)或是...
  • 同步锁的本质 —— 排队 同步的方式:独享锁 - 单个队列窗口,共享锁 - 多个队列窗口; 抢锁的方式:插队抢(非公平锁)、先来后到抢(公平锁)...手写抽象队列同步器 import java.util.Iterator; import java.util...
  • AQS是AbstractQueuedSynchronizer的简写,翻译过来就是:抽象队列同步器。AbstractQueuedSynchronizer在java.util.concurrent.locks包中,声明如下: public abstract class AbstractQueuedSynchronizer extends ...
  • 全称抽象队列同步器。类如其名,维护了队列同步的一个抽象类,在JCU包中有着基石的作用,比如可重入锁、信号量、CountDownLanch实现它的独享模式或共享模式资源获取或者释放来完成自己的功能,LCH队列图。 二、源码...
  • 并发编程之AQS(抽象队列同步器)共享锁前言Semaphore信号量Semaphore 的构造semaphore.acquire()doAcquireSharedInterruptibly(int arg)semaphore.release()unparkSuccessor(h)实列分析入队解锁出队...
  • JAVA AQS抽象队列同步器详解

    千次阅读 2017-09-21 08:47:50
    之前看到一篇很好的介绍AQS抽象队列同步器的文章,分享下。框架维护了一个state(代表共享资源,注意是volatile修饰的保证可见性)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)state的访问方式有...
  • 抽象队列同步器(AQS-AbstractQueuedSynchronizer) 从名字上来理解: 抽象:是抽象类,具体由子类实现 队列:数据结构是队列,使用队列存储数据 同步:基于它可以实现同步功能 我们就从这几个方面来入手解读,但首先,我们...
  • 通过ReentrantLock理解AbstractQueuedSynchronizer抽象队列同步器(JDK 1.8)什么是AbstractQueuedSynchronizer从ReentrantLock到AQSAbstractQueuedSynchronizer核心方法AbstractQueuedSynchronizer中的state信号量...
  • 抽象队列同步器AQS应用之阻塞队列BlockingQueueArrayBlockingQueue 类重要属性构造器put()方法AQSawait() BlockingQueue是线程间通信的工具,在任意时刻、不管并发有多高,在单JVM上,同一时间永远都只有一个线程...
  • 并发编程之AQS(抽象队列同步器)独占锁前言AQS原理同步器队列结构ReentrantLock类构造队列的数据结构信息Nodelock过程lock图解过程unlock过程unlock图解过程总结 前言 在介绍AQS之前,我专门写了CAS、Volatile、...
  • AQS抽象队列同步器

    2020-02-03 22:12:58
    AQS:同步状态的获取与释放
  • Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,...
  • AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,246
精华内容 498
关键字:

抽象队列同步器