精华内容
下载资源
问答
  • AQS原理

    2021-09-22 16:36:01
    AQS是AbstarctQueuedSynchronizer 的简称 ,是一个用于构建锁和同步容器的框架。 juc并发 包内许多类都是基于 AQS 构建的 ReentrantLock ReentrantReadWriteLock FutureTask AQS 解决了在实现同步容器时大量的细节...

    概念

    AQS是AbstarctQueuedSynchronizer 的简称 ,是一个用于构建锁和同步容器的框架。

    juc并发 包内许多类都是基于 AQS 构建的

    • ReentrantLock
    • ReentrantReadWriteLock
    • FutureTask

    AQS 解决了在实现同步容器时大量的细节问题。
    在这里插入图片描述

    • AQS 使用一个 FIFO 队列表示排队等待锁的线程,队列头结点称作 “哨兵节点” ,它不与任何线程关联。
    • 其他的节点与等待线程关联,每个阶段维护一个等待状态 waitStatus。

    功能

    • 独占锁:每次只能有一个线程持有锁, ReentrantLock 就是以独占方式实现的互斥锁;
    • 共享锁:允许多个线程同时获取锁,并发访问共享资源,比如 ReentrantReadWriteLock。

    内部原理

    AQS 的实现依赖内部的同步队列,也就是 FIFO 的双向队列,如果当前线程竞争锁失败,那么 AQS 会把当前线程以及等待状态信息构造成一个 Node 加入到同步队列中,同时再阻塞该线程。当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点 (线程)。
    在这里插入图片描述
    一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其实就是个双端双向链表

    • AQS 队列内部维护的是一个 FIFO 的双向链表,这种结构的特点是每个数据结构都有两个指针,分别指向直接的后继节点和直接前驱节点。
    • 双向链表可以从任意一个节点开始,很方便的访问前驱和后继。
    • 每个 Node 其实是由线程封装,当线程争抢锁失败后会封装成 Node 加入到 ASQ 队列中去。

    工作流程

    添加线程

    当出现锁竞争以及释放锁的时候,AQS 同步队列中的节点会发生变化
    在这里插入图片描述

    • 队列操作的变化:新的线程封装成 Node 节点追加到同步队列中,设置 prev 节点以及修改当前节点的前置节点的 next 节点指向自己;
    • tail 指向变化:通过同步器将 tail 重新指向新的尾部节点。

    销毁线程

    第一个 head 节点表示获取锁成功的节点,当头结点在释放同步状态时,会唤醒后继节点,如果后继节点获得锁成功,会把自己设置为头结点
    在这里插入图片描述

    • head 节点指向:修改 head 节点指向下一个获得锁的节点;
    • 新的获得锁的节点:第二个节点被 head 指向了,此时将 prev 的指针指向 null,因为它自己本身就是第一个首节点,所以 pre 指向 null。

    AQS 与 ReentrantLock 的联系

    ReentrantLock 是根据 AQS 实现的独占锁,提供了两个构造方法

     public ReentrantLock() {
            sync = new NonfairSync();
        }
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    

    ReentrantLock 有三个内部类:Sync,NonfairSync,FairSync
    在这里插入图片描述

    这三个内部类都是基于 AQS 进行的实现,所以ReentrantLock 是基于 AQS 进行的实现。

    ReentrantLock 提供两种类型的锁:

    • 公平锁-FairSync
    • 非公平锁-NonfairSync
    • 默认实现是 NonFairSync。
    展开全文
  • 谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知...AQS实现原理 AQS中 维护了一个volatile int state(代表共享资源)和一个FIFO

    谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLockReentrantReadWriteLockCountDownLatchSemaphore等都是基于AQS来实现的。

    我们先看下AQS相关的UML图:

    image.png

    思维导图:

    image.png

    AQS实现原理

    AQS中 维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。

    这里volatile能够保证多线程下的可见性,当state=1则代表当前对象锁已经被占有,其他线程来加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,比列会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

    另外state的操作都是通过CAS来保证其并发修改的安全性。

    具体原理我们可以用一张图来简单概括:

    image.png

    AQS 中提供了很多关于锁的实现方法,

    • getState():获取锁的标志state值
    • setState():设置锁的标志state值
    • tryAcquire(int):独占方式获取锁。尝试获取资源,成功则返回true,失败则返回false。
    • tryRelease(int):独占方式释放锁。尝试释放资源,成功则返回true,失败则返回false。

    这里还有一些方法并没有列出来,接下来我们以ReentrantLock作为突破点通过源码和画图的形式一步步了解AQS内部实现原理。

    目录结构

    文章准备模拟多线程竞争锁、释放锁的场景来进行分析AQS源码:

    三个线程(线程一、线程二、线程三)同时来加锁/释放锁

    目录如下:

    • 线程一加锁成功时AQS内部实现
    • 线程二/三加锁失败时AQS中等待队列的数据模型
    • 线程一释放锁及线程二获取锁实现原理
    • 通过线程场景来讲解公平锁具体实现原理
    • 通过线程场景来讲解Condition中await()signal()实现原理

    这里会通过画图来分析每个线程加锁、释放锁后AQS内部的数据结构和实现原理

    场景分析

    线程一加锁成功

    如果同时有三个线程并发抢占锁,此时线程一抢占锁成功,线程二线程三抢占锁失败,具体执行流程如下:

    image.png

    此时AQS内部数据为:

    image.png

    线程二线程三加锁失败:

    image.png

    有图可以看出,等待队列中的节点Node是一个双向链表,这里SIGNALNodewaitStatus属性,Node中还有一个nextWaiter属性,这个并未在图中画出来,这个到后面Condition会具体讲解的。

    具体看下抢占锁代码实现:

    java.util.concurrent.locks.ReentrantLock .NonfairSync:

    static final class NonfairSync extends Sync {
    
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    

    这里使用的ReentrantLock非公平锁,线程进来直接利用CAS尝试抢占锁,如果抢占成功state值回被改为1,且设置对象独占锁线程为当前线程。如下所示:

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    

    线程二抢占锁失败

    我们按照真实场景来分析,线程一抢占锁成功后,state变为1,线程二通过CAS修改state变量必然会失败。此时AQSFIFO(First In First Out 先进先出)队列中数据如图所示:

    image.png

    我们将线程二执行的逻辑一步步拆解来看:

    java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    先看看tryAcquire()的具体实现:
    java.util.concurrent.locks.ReentrantLock .nonfairTryAcquire():

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    nonfairTryAcquire()方法中首先会获取state的值,如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加state值,这就是可重入锁的具体实现,累加state值,释放锁的时候也要依次递减state值。

    如果state为0,则执行CAS操作,尝试更新state值为1,如果更新成功则代表当前线程加锁成功。

    线程二为例,因为线程一已经将state修改为1,所以线程二通过CAS修改state的值不会成功。加锁失败。

    线程二执行tryAcquire()后会返回false,接着执行addWaiter(Node.EXCLUSIVE)逻辑,将自己加入到一个FIFO等待队列中,代码实现如下:

    java.util.concurrent.locks.AbstractQueuedSynchronizer.addWaiter():

    private Node addWaiter(Node mode) {    
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    这段代码首先会创建一个和当前线程绑定的Node节点,Node为双向链表。此时等待对内中的tail指针为空,直接调用enq(node)方法将当前线程加入等待队列尾部:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    第一遍循环时tail指针为空,进入if逻辑,使用CAS操作设置head指针,将head指向一个新创建的Node节点。此时AQS中数据:

    image.png

    执行完成之后,headtailt都指向第一个Node元素。

    接着执行第二遍循环,进入else逻辑,此时已经有了head节点,这里要操作的就是将线程二对应的Node节点挂到head节点后面。此时队列中就有了两个Node节点:

    image.png

    addWaiter()方法执行完后,会返回当前线程创建的节点信息。继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    逻辑,此时传入的参数为线程二对应的Node节点信息:

    java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued():

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndChecknIterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    acquireQueued()这个方法会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功过则将当前节点设置为head节点,然后空置之前的head节点,方便后续被垃圾回收掉。

    如果加锁失败或者Node的前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法
    head节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程。

    此时AQS中的数据如下图:

    image.png

    此时线程二就静静的待在AQS的等待队列里面了,等着其他线程释放锁来唤醒它。

    线程三抢占锁失败

    看完了线程二抢占锁失败的分析,那么再来分析线程三抢占锁失败就很简单了,先看看addWaiter(Node mode)方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    此时等待队列的tail节点指向线程二,进入if逻辑后,通过CAS指令将tail节点重新指向线程三。接着线程三调用enq()方法执行入队操作,和上面线程二执行方式是一致的,入队后会修改线程二对应的Node中的waitStatus=SIGNAL。最后线程三也会被挂起。此时等待队列的数据如图:

    image.png

    线程一释放锁

    现在来分析下释放锁的过程,首先是线程一释放锁,释放锁后会唤醒head节点的后置节点,也就是我们现在的线程二,具体操作流程如下:

    image.png

    执行完后等待队列数据如下:

    image.png

    此时线程二已经被唤醒,继续尝试获取锁,如果获取锁失败,则会继续被挂起。如果获取锁成功,则AQS中数据如图:

    image.png

    接着还是一步步拆解来看,先看看线程一释放锁的代码:

    java.util.concurrent.locks.AbstractQueuedSynchronizer.release()

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    这里首先会执行tryRelease()方法,这个方法具体实现在ReentrantLock中,如果tryRelease执行成功,则继续判断head节点的waitStatus是否为0,前面我们已经看到过,headwaitStatueSIGNAL(-1),这里就会执行unparkSuccessor()方法来唤醒head的后置节点,也就是我们上面图中线程二对应的Node节点。

    此时看ReentrantLock.tryRelease()中的具体实现:

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    执行完ReentrantLock.tryRelease()后,state被设置成0,Lock对象的独占锁被设置为null。此时看下AQS中的数据:

    image.png

    接着执行java.util.concurrent.locks.AbstractQueuedSynchronizer.unparkSuccessor()方法,唤醒head的后置节点:

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    这里主要是将head节点的waitStatus设置为0。

    此时重新将head指针指向线程二对应的Node节点,且使用LockSupport.unpark方法来唤醒线程二

    被唤醒的线程二会接着尝试获取锁,用CAS指令修改state数据。
    执行完成后可以查看AQS中数据:

    image.png

    此时线程二被唤醒,线程二接着之前被park的地方继续执行,继续执行acquireQueued()方法。

    线程二唤醒继续加锁

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    此时线程二被唤醒,继续执行for循环,判断线程二的前置节点是否为head,如果是则继续使用tryAcquire()方法来尝试获取锁,其实就是使用CAS操作来修改state值,如果修改成功则代表获取锁成功。接着将线程二设置为head节点,然后空置之前的head节点数据,被空置的节点数据等着被垃圾回收

    此时线程二获取锁成功,AQS中队列数据如下:

    image.png

    等待队列中的数据都等待着被垃圾回收。

    线程二释放锁/线程三加锁

    线程二释放锁时,会唤醒被挂起的线程三,流程和上面大致相同,被唤醒的线程三会再次尝试加锁,具体代码可以参考上面内容。具体流程图如下:

    image.png

    此时AQS中队列数据如图:

    image.png

    公平锁实现原理

    上面所有的加锁场景都是基于非公平锁来实现的,非公平锁ReentrantLock的默认实现,那我们接着来看一下公平锁的实现原理,这里先用一张图来解释公平锁非公平锁的区别:

    非公平锁执行流程:

    image.png

    这里我们还是用之前的线程模型来举例子,当线程二释放锁的时候,唤醒被挂起的线程三线程三执行tryAcquire()方法使用CAS操作来尝试修改state值,如果此时又来了一个线程四也来执行加锁操作,同样会执行tryAcquire()方法。

    这种情况就会出现竞争,线程四如果获取锁成功,线程三仍然需要待在等待队列中被挂起。这就是所谓的非公平锁线程三辛辛苦苦排队等到自己获取锁,却眼巴巴的看到线程四插队获取到了锁。

    公平锁执行流程:

    image.png

    公平锁在加锁的时候,会先判断AQS等待队列中是存在节点,如果存在节点则会直接入队等待,具体代码如下.

    公平锁在获取锁是也是首先会执行acquire()方法,只不过公平锁单独实现了tryAcquire()方法:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire():

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    这里会执行ReentrantLock中公平锁的tryAcquire()方法

    #java.util.concurrent.locks.ReentrantLock.FairSync.tryAcquire():

    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    

    这里会先判断state值,如果不为0且获取锁的线程不是当前线程,直接返回false代表获取锁失败,被加入等待队列。如果是当前线程则可重入获取锁。

    如果state=0则代表此时没有线程持有锁,执行hasQueuedPredecessors()判断AQS等待队列中是否有元素存在,如果存在其他等待线程,那么自己也会加入到等待队列尾部,做到真正的先来后到,有序加锁。具体代码如下:

    #java.util.concurrent.locks.AbstractQueuedSynchronizer.hasQueuedPredecessors():

    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    这段代码很有意思,返回false代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回true则代表队列中存在等待节点,当前线程需要入队等待。

    image.png

    先判断head是否等于tail,如果队列中只有一个Node节点,那么head会等于tail

    接着判断(s = h.next) == null,这种属于一种极端情况,在enq()入队操作中,此时不是原子性操作,可能存在这种情况:

    YcAPpD.png

    在第一个红框处,例如 线程一 执行完成,此时head已经有值,而还未执行tail=head的时候,此时 线程二 判断 head != tail成立。而接着 线程一 执行完第二个红框处,此时tail = node,但是并未将head.next指向node。而这时 线程二 就会得到head.next == null成立,直接返回true。这种情况代表有节点正在做入队操作。

    如果head.next不为空,那么接着判断head.next节点是否为当前线程,如果不是则返回false。大家要记清楚,返回false代表FIFO队列中没有等待获取锁的节点,此时线程可以直接尝试获取锁,如果返回true代表有等待线程,当前线程如要入队排列,这就是体现公平锁的地方。

    非公平锁公平锁的区别:
    非公平锁性能高于公平锁性能。非公平锁可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,CPU也不必取唤醒所有线程,会减少唤起线程的数量

    非公平锁性能虽然优于公平锁,但是会存在导致线程饥饿的情况。在最坏的情况下,可能存在某个线程一直获取不到锁。不过相比性能而言,饥饿问题可以暂时忽略,这可能就是ReentrantLock默认创建非公平锁的原因之一了。

    Condition实现原理

    Condition简介

    上面已经介绍了AQS所提供的核心功能,当然它还有很多其他的特性,这里我们来继续说下Condition这个组件。

    Condition是在java 1.5中才出现的,它用来替代传统的Objectwait()notify()实现线程间的协作,相比使用Objectwait()notify(),使用Condition中的await()signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition

    其中AbstractQueueSynchronizer中实现了Condition中的方法,主要对外提供awaite(Object.wait())signal(Object.notify())调用。

    Condition Demo示例

    使用示例代码:

    /**
     * ReentrantLock 实现源码学习
     * @author 一枝花算不算浪漫
     * @date 2020/4/28 7:20
     */
    public class ReentrantLockDemo {
        static ReentrantLock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            Condition condition = lock.newCondition();
    
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println("线程一加锁成功");
                    System.out.println("线程一执行await被挂起");
                    condition.await();
                    System.out.println("线程一被唤醒成功");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                    System.out.println("线程一释放锁成功");
                }
            }).start();
    
            new Thread(() -> {
                lock.lock();
                try {
                    System.out.println("线程二加锁成功");
                    condition.signal();
                    System.out.println("线程二唤醒线程一");
                } finally {
                    lock.unlock();
                    System.out.println("线程二释放锁成功");
                }
            }).start();
        }
    }
    

    执行结果如下图:

    image.png

    这里线程一先获取锁,然后使用await()方法挂起当前线程并释放锁线程二获取锁后使用signal唤醒线程一

    Condition实现原理图解

    我们还是用上面的demo作为实例,执行的流程如下:

    image.png

    线程一执行await()方法:

    先看下具体的代码实现,#java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject.await()

     public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    await()方法中首先调用addConditionWaiter()将当前线程加入到Condition队列中。

    执行完后我们可以看下Condition队列中的数据:

    image.png

    具体实现代码为:

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    这里会用当前线程创建一个Node节点,waitStatusCONDITION。接着会释放该节点的锁,调用之前解析过的release()方法,释放锁后此时会唤醒被挂起的线程二线程二会继续尝试获取锁。

    接着调用isOnSyncQueue()方法是判断当前的线程节点是不是在同步队列中,因为上一步已经释放了锁,也就是说此时可能有线程已经获取锁同时可能已经调用了singal()方法,如果已经唤醒,那么就不应该park了,而是退出while方法,从而继续争抢锁。

    此时线程一被挂起,线程二获取锁成功。

    具体流程如下图:

    image.png

    线程二执行signal()方法:

    首先我们考虑下线程二已经获取到锁,此时AQS等待队列中已经没有了数据。

    接着就来看看线程二唤醒线程一的具体执行流程:

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    

    先判断当前线程是否为获取锁的线程,如果不是则直接抛出异常。
    接着调用doSignal()方法来唤醒线程。

    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    
    /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    这里先从transferForSignal()方法来看,通过上面的分析我们知道Condition队列中只有线程一创建的一个Node节点,且waitStatueCONDITION,先通过CAS修改当前节点waitStatus为0,然后执行enq()方法将当前线程加入到等待队列中,并返回当前线程的前置节点。

    加入等待队列的代码在上面也已经分析过,此时等待队列中数据如下图:

    image.png

    接着开始通过CAS修改当前节点的前置节点waitStatusSIGNAL,并且唤醒当前线程。此时AQS中等待队列数据为:

    image.png

    线程一被唤醒后,继续执行await()方法中的while循环。

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    因为此时线程一的waitStatus已经被修改为0,所以执行isOnSyncQueue()方法会返回false。跳出while循环。

    接着执行acquireQueued()方法,这里之前也有讲过,尝试重新获取锁,如果获取锁失败继续会被挂起。直到另外线程释放锁才被唤醒。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    此时线程一的流程都已经分析完了,等线程二释放锁后,线程一会继续重试获取锁,流程到此终结。

    Condition总结

    我们总结下Condition和wait/notify的比较:

    • Condition可以精准的对多个不同条件进行控制,wait/notify只能和synchronized关键字一起使用,并且只能唤醒一个或者全部的等待队列;

    • Condition需要使用Lock进行控制,使用的时候要注意lock()后及时的unlock(),Condition有类似于await的机制,因此不会产生加锁方式而产生的死锁出现,同时底层实现的是park/unpark的机制,因此也不会产生先唤醒再挂起的死锁,一句话就是不会产生死锁,但是wait/notify会产生先唤醒再挂起的死锁。

    总结

    这里用了一步一图的方式结合三个线程依次加锁/释放锁来展示了ReentrantLock的实现方式和实现原理,而ReentrantLock底层就是基于AQS实现的,所以我们也对AQS有了深刻的理解。

    另外还介绍了公平锁非公平锁的实现原理,Condition的实现原理,基本上都是使用源码+绘图的讲解方式,尽量让大家更容易去理解。

    参考资料:

      1. 打通 Java 任督二脉 —— 并发数据结构的基石
        https://juejin.im/post/5c11d6376fb9a049e82b6253
      2. Java并发之AQS详解https://www.cnblogs.com/waterystone/p/4920797.html
    展开全文
  • AQS 原理

    2021-02-22 23:35:58
    1.* AQS 原理 1.概述 全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的==同步器类==工具的框架 (其它同步器工具类继承它,重用它的功能) 特点: 用 state 属性来表示资源的状态(分独占模式和共享模式),...

    1.* AQS 原理

    1.概述

    全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的==同步器类==工具的框架 (其它同步器工具类继承它,重用它的功能)

    特点:

    • 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
      • getState - 获取 state 状态
      • setState - 设置 state 状态
      • compareAndSetState - cas 机制设置 state 状态
      • 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
    • 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList条件
      • EntryList是在c++层面实现的,aqs是纯java实现
    • 条件变量来实现等待、唤醒机制,支持多个条件变量,
      • 具体的某个条件变量类似于 Monitor 的 WaitSet(持有锁后,但条件不满足,就放弃锁,进入到WaitSet中等待)

    子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)

    • tryAcquire
    • tryRelease
    • tryAcquireShared
    • tryReleaseShared
    • isHeldExclusively

    获取锁的姿势

    // 如果获取锁失败
    if (!tryAcquire(arg)) { // 只会尝试1次
        // 入队, 可以选择阻塞当前线程    (park&unpark机制实现阻塞当前线程或恢复当前线程)
    }
    

    释放锁的姿势

    // 如果释放锁成功
    if (tryRelease(arg)) {
        // 让阻塞线程恢复运行
    }
    

    2.自定义锁

    有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

    @Slf4j
    public class TestAqs {
    
        public static void main(String[] args) throws InterruptedException {
            MyLock lock = new MyLock();
    
            log.debug("main start");
    
            lock.lock();
    
            Thread t1 = new Thread(() -> {
    
                log.debug("t1 start");
                lock.lock();           // t1此时将拿不到锁,所以将会进入队列等待main线程释放锁,
                log.debug("t1 end");   //    main线程释放锁后,t1线程拿到锁后,继续运行
    
            }, "t1");
            t1.start();
    
            Thread.sleep(2000);
            log.debug("main开始释放锁...");
            lock.unlock();
            log.debug("main释放锁成功");
    
        }
    
    }
    
    // 自定义锁 (不可重入锁)
    class MyLock implements Lock{
    
        private MySync sync = new MySync();
    
        // 独占锁  同步器类
        class MySync extends AbstractQueuedSynchronizer{
    
            @Override
            protected boolean tryAcquire(int arg) {
                // 将state通过cas改为1的线程,将会加锁成功
                if (compareAndSetState(0, 1)) {
                    // 加上了锁,并设置 owner为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                setExclusiveOwnerThread(null);
                // state变量被volatile修饰,保存在同步器父类AQS中,
                //    将对state状态变量的写操作放在设置owner线程下方,是为了防止指令重排序
                setState(0);
                return true;
            }
    
            @Override // 是否持有独占锁
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            public Condition newCondition() {
                return new ConditionObject(); // AQS类中的public类ConditionObject,它实现了Condition接口
            }
    
        }
    
        @Override // 加锁 ( 不成功, 进入等待队列 )
        public void lock() {
            sync.acquire(1);
        }
    
        @Override // 加锁 (不成功,进入等待队列, 可打断)
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override // 尝试加锁 (尝试1次, 不成功返回,不进入队列)
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override // 尝试加锁 (不成功, 进入等待队列, 有时限)
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1,unit.toNanos(time));
        }
    
        @Override // 解锁
        public void unlock() {
            sync.release(1);
        }
    
        @Override // 创建条件变量
        public Condition newCondition() {
            return sync.newCondition();
        }
    
    }
    

    不可重入测试
    如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)

    lock.lock();
    log.debug("locking...");
    lock.lock(); // 第二次又去获取锁,但是该锁是不可重入锁
    log.debug("locking...");
    
    展开全文
  • aqs 原理

    2021-04-25 15:24:35
    aqs 原理 1.介绍aqs 1.1 扯淡 我的理解: 一个管理状态值和队列的同步器. 状态值:是否可以被占有.(Node的状态,比如被占有啊等待啊,初始啊...aqs提供api,子类来重写) 队列:等待获取锁的队列(FIFO) . 同步器: 一次只...

    aqs 原理

    1.介绍aqs

    1.1 扯淡

    我的理解:

    一个管理状态值和队列同步器.

    状态值:是否可以被占有.(Node的状态,比如被占有啊等待啊,初始啊...aqs提供api,子类来重写)
    
    队列:等待获取锁的队列(FIFO) .
    
    同步器: 一次只活跃一个Node(或者多个node,比如独占锁和共享锁).
    
    发生竞争的多线程,被抽象成一个队列,按照aqs的规则,合理执行.
    
    //关于依赖状态值的并发代码
    
    那么依赖状态值,来控制代码,有如下的思考过程.
        
    ==>一个这样的demo.生产/消费,如果满了,就不生产,如果没了,就生产
    
    ==>那么当程序遇到false时候,可以通过轮询或者睡眠合理的时间,等待条件为true,这样还是会有空档期,执行效率不高
    ==>这些都是,等待唤醒
    
    ==>触发唤醒
    ==>在到条件队列(wait,notify),需要注意notify,notifyAll.以及信号丢失(过早wait或者其他),就那块的小坑.
    这种信号驱动的唤醒机制,会比轮询睡眠那种快很多,可是有这个问题,notifyAll,相当于所有等待线程被唤醒,在重新竞争一次,这样还是会浪费计算机资源(竞争,和上下文操作)    
    
    ==>其实上面的问题,就是一个锁的问题.
    
    --------------------------------------------------------------------------------------    
    ==>ReentrantLock,利用的是信号驱动(park,unpark),替换这个wait,notify.
        
    ==>在到后来我们可以想办法把,锁这个东西封装,信号驱动的事情封装,在有了reentrantlock中的condition
    通过维护相应的队列,合适的状态,并且封装好.
    
    结论:利用条件队列,信号驱动,解决好计算机竞争,以及其他问题.而aqs,在使用条件队列维护状态值,的时候,就特别巧妙.就是说,如何设计这个条件队列,让他吞吐量最高,或者功能更好?    fifo 队列,信号驱动(park,unpark)
    
        
    --------------------------------------------------------------------------------------   
      synchronize, 和reentrantLock.先分开看,分别学他们的原理.再来体会区别  
        
    
    //关于锁  synchronize 与  ReentrantLock
    
    jdk为我们提供了synchronize关键字,不考虑偏向锁和轻量锁,重量锁是依靠monitor对象实现的,那么在发生竞争的时
    候,他会在hotsput里面创建一个队列(阻塞啊,等待啊),wait,notify,那里也一样.都是依赖队列
    这里的aqs,他也是干这种事的.jdk中synchronize的实现方法看不到,得翻hotspot的源码,麻烦,这里可以直接看aqs的源码
    

    源码的注释

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hpRerZ6T-1619335449443)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1578115959619.png)]

    aqs 的注释, 实现锁的一个框架,依赖同步器(信号量,事件,等等) 依赖一个fifo的登台队列.这个类的目的是是一个有用的基础,大多数类型的同步器.重点是 队列 信号量

    我打算通过,各个组件的代码,来表达aqs的功能.例如,在reentrantLock中,aqs提供模版方法(模版方法,完成Acquire(),和Release(),并且完成状态值赋值),让子类实现,自己维护队列,来实现锁.他是怎么能把代码和功能,这么完美的拆分出来,理解不了.

    1.2 aqs的组成.

    1.2.1 api
    aqs{
        Node{
            前后节点
            状态值
            是否共享
            线程
           nextWaiter
        }
        ConditionObject{
            等看完,实现,再去看这的api ==>位了模拟wait notify的功能准备的,另一组队列,
        }
        获得独占锁()
        获得共享锁()
        释放锁()
        cas操作()
        偏移量
        节点信息(head,tail)
        
    }
    

    太多了,就简单看一下,

    1.2.2 数据结构

    所有阻塞线程,抽象成一个双向链表.俩俩共享一组park,unpark.

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-up4sI5XR-1619335449447)(aqs 原理.assets\1578120962188.png)]

    阻塞队列的结构图

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GuetB82n-1619335449448)(aqs 原理.assets\1578205285040.png)]

    ​ Condition 实现原理图

    1.2.3 代码逻辑

    ReentrantLock锁功能:

    比如,reentrantLock,lock在阻塞的时候.将线程信息,封装成Node,加入tail后,进行park.等待前pre节点unpark.

    具体细节逻辑,看代码.

    在结果上看:reentrantLock实现锁的时候,只做了尝试获得锁(成功修改状态),尝试释放锁(成功,修改状态).

    当然,这个获得锁,一定是cas操作. 至于,轮询,等待,都交给了aqs. ==>前者只做tryAcquire,tryRealease,并修改状态,后者解决队列的信息.

    Condition 锁功能的逻辑.

    先使用Condition,明白他的功能.他有几个特征,1.暂时释放锁 2.等待重新排队获得锁. aqs实现逻辑是.

    当Condition,需要wait的时候(这个时候已经持有锁),那就把他从主队列拿出,排在Condition队列尾部.

    当被唤醒的时候,在加入到主队列的尾部(执行signall),继续竞争锁,当然那里会重复校验,node是否在主队列中,毕竟unpark,不知道什么时候就触发了.==>首先分析,他的这个功能,也就差不多知道代码怎么写的了

    2.通过ReentrantLock理解aqs

    2.1 介绍功能

    1.锁.

    2.条件锁 Condition

    2.2 自己如何设计

    如果自己设计一个独占锁.(依靠状态值)

    lock{
     int i=0 ;
        lock(){
            i=1;
        }
        unlock(){
            i=0;
        }
    }
    ==> 不支持并发,可能一堆人获取到锁
    lock{
     int i=0 ;
        lock(){
        while(compareandsetstate(0,1))
            i=1;
        }
        unlock(){
            i=0;
        }
    }
    ==>支持并发,但是如果所有线程都在竞争,岂不是浪费cpu资源?yield的话,如果竞争多了,还是会浪费资源.
        
    lock{
     int i=0 ;
     waitQueue   queue ;
          
       lock(){
       	while(!compareandsetstate(0,1)){//如果上锁不成功
            queue.add(thisThead)
            thisthread.park() //放出cpu
        }
    }
    
        unlock(){
           waitQueue.getnextThread.unpark() //唤醒下一个
        }
    }
    
    ==>一个接一个的使用资源,并且支持并发
    

    如何设计一个条件锁 Condition.

    1.await的时候,要释放锁.那就得唤醒队列的下一个值,并且当前线程,是放在队尾,还是,放在另外的数据结构里,如果是队尾,那么唤醒的时候,又要打破结构了.所以应该放在另外的地儿(队列).
    2.当前条件 park  等待其他线程的unpark().   那就由,其他持有锁的线程,唤醒该线程.当唤醒该线程的时候,他又重新去竞争锁,那优先级是什么样的???所以,还是得加入到aqs主队列里面,重新等待获取锁.
    3.一个condition,维护一个单独队列.  await 释放锁.
    结论: 准备一个condition队列,当signal的时候,又重新竞争锁,在加入主队列
    
    
    lock.lock()
    condition1.await()
    lock.unlock()
    
    
    lock.lock()
    condition1.signall()
    lock.unlock()
    
    
    
    

    2.3 aqs与其分别做了什么(代码)

    翻阅了,ReentrantLock的lock(),unLock(). 对于锁这件事,

    ==>aqs只让reentrantLock做了两件事,尝试一次获取锁,若成功修改队列状态值.尝试一次释放锁,若成功,修改状态值.

    ==>aqs,操作利用子类的api,完成了,队列的吞吐.

    拿出几个核心方法.,走一遍,lock() unlock()

    //1.支持重入锁.
    //2.释放锁的时候,只有一个线程在操作,所以没有多余的cas操作.    
    //3.通过当前方法,释放锁,★改变队列状态setState
    //4.tryRelease  和 tryAcquire (这里粘的是公平锁api)
    protected final boolean tryRelease(int releases) {
        //计算当前的state值,重入一次+1
                int c = getState() - releases;
        //如果其他线程调用release,则会抛出异常.
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
        //如果重入锁都退出了,清空数据.
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
        //设置状态
                setState(c);
                return free;
            }
    
    
    //尝试获取锁,判断状态,如果可被占用,则进行一次cas操作(尝试占有锁)
    //已被占用的话,看是不是重入锁,这是reentrantLock,自己实现重入的判断.
    //★通过当前方法加锁,setState (cas操作)
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
    //获取锁的过程 
    
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //这里判断当前节点前一个节点,这个地方会不会,有逻辑错误???
                    //这里只有在,前一个节点是头节点的情况下,才开始阻塞,才进入cas(死循环获得锁,死circular在for(;;)),
                    //否则就进入下面的if里面的locksupport.lock方法,进入等待了.减少不必要的空转.
                    //==>牛逼.
                    //疑问??? 轮询的时候如果竞争比较大,导致head节点为空呢==>是不是就进入永久等待了??==>是否node.predecessor(); 真的会抛出nullpointException.==>不可能为空,维护一个队列着呢   
                    //如果 p==head,永远不成立,那他永远不会获取锁啊.......==>所以看看unpark方法,或者看看初始化方法,看他怎么处理的head节点==>head节点一直存在,获取锁了,就会set,if后面的代码有.所以这里.p==head,迟早会成功.  如果前一个节点,执行完毕,p的pre是是谁?????head是谁. 等式能否成立.就感觉p==head
                    //总是会执行不成功.
                    //==>如果是按照队列顺序来执行
                    //那么这个等式就是成立,不论什么时候轮到这段代码执行.
                    //从代码里也能看到,如果其他线程也执行到了这儿,并且先执行了下面的代码,那么,并没有改变其他节点的pre值,所以pre值一直存在,如果不存在,那就是有问题的,抛出异常. 如果head节点
                    
          //感觉这里代码 逻辑不通
                    //假设,在头节点,执行完毕,他没有动这个node的pre,那么p==head,就能成立,这个时候,他就可以拿资源.
                    //只有老二,获得锁成功之后,才会重新删除头节点.所以,
                    //我上面遇到的问题,是别的线程来操作,这个队列,导致,你自己变成了头节点,那么这个等式,可能永远不成立,==>首先这个命题是错误的,如果是,维护 一个队列来执行获取锁这个事,那么,按顺序来的话,他如果是老二(也就是即将被执行),那么就不可能别的线程来影响这段代码,因为别的线程进来就是wait(后面的partandcheckInterrupt方法)的.
                    //忘了上面,是在分析个啥??? 如果aqs维护好了,这个head,以及node.pre.这里应该就没啥问题.
                    //写的挺巧妙
                    //p.next = null; // help GC  这个怎么理解. 平时,当一个对象中的某一部分不用了,需要被回收了,我们可以让 object==null ,但是,这一部分与原对象还是有引用的,比如a.b=xxx, map 的某一节点值置位null,但是这个数key还是存在.最好的办法就是,map删除这个对象,内部会删除这个key,这个key也就失去了map的引用,就会被回收.去除改对象和周围节点的引用,        那么在链表中呢? ,链表中, node 被上一个节点.next引用,被下一个节点.pre 引用. 我们删除这个引用, 上个节点.next=null 下一个节点.pre=null  去除引用.就帮助gc
                    //p.next = null 为了帮助 node 被回收.
                    //在上一轮里,p.next = null ,这一轮里, node.prev = null;所以head被回收
                    // 1.判断,需要去除什么引用.  2.什么时间点去除引用.(在数据完全没有用处的时候)
                    //在aqs代码执行的时候,  Node对象存在于head为起点的引用链里.当Node 不与队列关联的时候,node就相当于一个成员变量消失了,就会被回收掉.
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
    
    
    
     //condition源码
     
     public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
         //同wait一样,执行await,需要释放当前锁
                int savedState = fullyRelease(node);
                int interruptMode = 0;
         //释放锁之后,node已经不在主队列中了,所以可以park(),等待signall的unpark
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    //模拟了wait的代码,可以监听线程是否被Interrupt.
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
         //acquireQueued(),代码执行到这里的是,已经被signall了,node,已经在主队列中了,执行这个方法
         //继续等待获得锁.
         //其余方法都是为了记录异常,并且完成相应的逻辑.
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    
    
    
    
    
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            //重新加入到队列中
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                //释放await代码的park()
                LockSupport.unpark(node.thread);
            return true;
        }
    

    2.4 总结

    1.关于公平锁,非公平锁. 
    差异:让不让插队.
    实现:在获取锁的时候,尝试先插一次队,在继续走aqs的代码.
    效果:咋说呢,排一次队,就需要线程park(),然后unpark,如果获取锁的时候,直接获取到锁,就减少了一个线程上下文操作,挺好.如果竞争比较激烈,插队次数越多,那么效率就越高呗. 如果竞争不激烈差不多吧,毕竟nofair方法,还多执行了一遍tryAcquire呢.
    
    2.分工: 
    ReentrantLock,提供单次获取锁操作,
    aqs,运用提供的方法,进行控制队列.
    
    3.从代码上看,竞争发生在尾部,头部很少会出问题吧.头部只需要设计好逻辑代码就成.尾部也会发生竞争.
    如果刚入队的哥们执行tryacquire,刚被唤醒的哥们也执行tryacquire 不就发生竞争了吗
    
    4.关于cas操作.
    Unsafe类,可以操作缓存.可以改变对象的属性值.(而且是原子操作),cas操作.==>自己敲敲api就知道了
    这里面,挺多都用unsafe里面的api.
    比如cas操作,compareAndSwapInt(Object obj, long offset, int expect, int update);
    具体,对象某个字段的内存地址便宜量,expect,和update值.
    
    
    
    

    3 通过ReentrantWriteReadLock 理解aqs.

    3.1 功能

    为了提高并发能力,针对同一把锁,有的时候,他的状态是可以允许几个方法同时执行,有的时候,只能有一个方法执行.但是,现在锁的功能是,我只能有一个方法执行.为了解决这种现象,发明了读写锁.

    ==>来满足,读读, 写读 共享 读写,写写互斥.

    3.2 如何设计

    为了满足上面提到的功能,我们应该如何设计.
    1.现在有aqs队列,如果是读读功能,那么在竞争锁的时候,先判断队列是共享状态,还是独占状态.在执行代码.
    ==>那么,如果此时是读状态,如果读状态一直持续,那么写的锁,就会出现饥饿状态.
    2.锁的升级,降级问题.  
    我在读的时候,可以写吗? (升级),已经是读读共享了(多线程持有锁),如果都读,都要写,那就会死锁了,每个写锁,都要等待其他读锁释放.
    我在写的时候,可以读吗? (降级),可以降级的,写的时候,只有一个线程持有,他进行读,不会出现死锁.
    3.关于数据结构方面
    如果读写锁竞争的线程都放在一个队列上.效率感觉都不会太高,
        ==>比如,当前是读锁,那么队列中的排队的都是写锁,如果当前是写锁,那么后续的线程,结构就会比较混乱,读写都有.
        ==>那么这样的情况下,代码变复杂了一些,只是提高了持有读锁的时候的并发性,可行吧.....
            
    那么如果,我弄两条队列,一个写队列,一个读队列.我建立他们合适的规则.是否可以提高并发性?
        ==>比如,我在读的时候,我就可以一口气让读队列所有数据获得锁.不至于像一条队列那样,数据不均匀
        ==>写的时候,锁竞争的排队,也比较均匀.    
    如果用两条队列,那么他们之间的规则是怎么样的呢?
        1.唤醒优先????当写线程操作,释放锁的时候,是唤醒读线程,还是写线程?(如果读线程释放锁,指定找写线程,因为读都执行完了)     
        2.读线程插队????? 如果锁是读线程持有,但又写线程正在等等待,让不让新来的竞争立即获得访问权限,如果可以的话,那并发性应该能提高,但是,写线程就会发生饥饿问题(现象上,有点像非公平锁的插队.)
        3.升级问题,上面已经解释了..
            
    ==>不知道,reentrantwritereadlock 是咋涉及的,看源码....        
            
    

    3.3 读写锁源码的设计

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1LgrxZp4-1619335449451)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1578221922903.png)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kLtHSFbK-1619335449452)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1578390380513.png)]

    3.4 aqs在里面做了什么.

    对于共享锁,aqs完成了,队列操作. 完成了共享功能,和独占功能.

    ==>ReentrantWRLock 提供 一次获得锁的动作,释放锁的动作 (tryAcquireShared tryRelease)

    这个动作,可以完成,根据队列状态值,(成功失败获得锁,分别做什么).比如你获得锁了,其实就是update state 成功. 没获得锁你就返回啥啥,你只需要提供一次动作就好,其他交给aqs 释放锁,也是update,state. 具体队列怎么运转的全部交给aqs.

    ==>而队列的Node的状态管理,全部由aqs完成.

    3.5 部分源码解读.

    //共享锁,ReentrantWriteReadLockg 一次获取锁的动作
    
            protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                //如果有独占锁,并且独占锁的线程不是当前线程,则return GG
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                
                int r = sharedCount(c);
                //如果不阻塞,就尝试一次cas操作
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    //处理缓存,以及第一读者. 缓存:每一个线程都有自己的CacheHolderCounter.
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                //如果尝试一次失败,在进行,完整的尝试获取共享锁(在没有独占锁的情况下)
                //这个和上面代码差不多,只不过是轮询,进行共享锁插队,直到获得共享锁.
                //==>允许共享锁,插队.
                return fullTryAcquireShared(current);
            }
    
    
    //把state 值的前16位当做共享锁,后16位当做读写锁,从下面的位移运算就能看出来,这个操作方式,可以借鉴!!!
            static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
    //readerShouldBlock()的后续方法,如果有前置结点,就阻塞==>为什么?当有前置节点的时候不让插队?
    //首先这个方法是判断,当前thread之前,队列中是否有数据? 如果有,那么就不走这个代码,走后面的cas操作,插队
    //进入共享锁.
    
    // 2020.0421 ,判断,是否有等待更久的结点.  
    //分析这个链表,如果有长度,那么,如果head.next!=currentThraed,就说明除,还有等待的节点.
    //如果还未初始化,那么,可能就出现并发入队的现象. head先初始化,tail紧跟随后.
    //如果tail==null,head!=null说明,有结点,正在入队.正在初始化,所以((s = h.next) == null成立. 如果都等于空,那么就没有等待节点. 不可能tail!=null head=null,head先初始化.
    //如果,下面的 定义顺序颠倒.  也是发生并发入队,先获得head 值,这个时候tail可能初始化,完成,可能初始化未完成.这种方式写代码, h != t  捕获不到这个瞬时状态,相当于漏判.(可能h==t)
    //如果,定义顺序和初始化顺序相反,先定义tail,那么 h!=t,就能捕获到,这个初始化状态
    //==> h!=t     (s = h.next) == null 捕获这期间是否初始化.  
        public final boolean 
        hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            //方法的正确性,依赖于头节点比尾节点优先初始化,当前节点是补是第一个节点.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    //这段代码,在并发条件下,判断当前队列第二位是否有值(不是自己线程)
    //==>如果是非并发代码,我们要确定有没有其他节点.  h是不是等于null(未初始化). 初始化了(就是当前线程在队列头) 
    //==>我们来分析这段代码,
    //如果t==null,head==null(跟初始化顺序有关) 		则 返回false
    //		或者 head!=null(head正在被初始化,tail还没有被赋值),那么head!=tail,h.next==null,就说明有一个刚完成初始化(此时,有一个线程持有锁,另一个线程来竞争,然后初始化队列,后者还未持有锁)
    	
    //如果t!=null,那么head!=null(初始化完成)
    //h==t 队列无等待者返回true
    //h!=t 第一种情况是上面的情况,第二种情况就是,该线程达到第二位.或者其他线程到达第二位.
    
    
    
    //Queries whether any threads have been waiting to acquire longer
    //than the current thread.   这是 源码注释,判断是不是有  "等待" 更长时间的线程,强调等待状态,那就是,不是已经获得锁的人,就是判断,队列第二位,有没有其他线程.队列第一位,正在获得锁,就不算等待的node.
    //当队列的饿head=tail的时候,要么,队列只有一个节点,他正在获得锁(只有获得锁的时候,才有setHead,头和尾才能相等.)
    //方法的意义,就是判断,这个时候,我尝试获取锁,合不合理,如果说,我是第二位,(第一位已经获得锁,)那我就可以尝试一次,very good.
    //源码的注释,很准确,后面的问题,也会依赖这个方法
    
    
    
    //如果当前是独占锁,那么走的代码,就是ReentrantLock那一套.
    
    
     private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //如果当前节点是第二节点,才尝试获取锁,增加效率
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                         //获取锁后,设置头节点,并唤醒下一个节点,下一个节点唤醒下一个节点(如果是共享锁的话)
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    ./阻塞代码
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    //取消节点,重新拼接有效的节点
                    cancelAcquire(node);
            }
        }
    
    
    
    
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            //propagate > 0,已经持有共享锁,其他判断每看懂????????????h
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
            
            
                private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
              /**以下分析都没有错,单独写一个模块解释
              解决释放锁共享锁的问题,尽管有其他线程也在释放/获得锁.根据是否需要,释放下一个节点.
              另外,我们必须循环,防止新结点添加进来.我们需要检查cas状态.
             ★★ 关于cas操作
             ★★ 当有新节点加入的时候需要park,会把上一个节点waitStatus变为-1,可能会出现cas失败
             ★★★ 释放下一个节点可能会出现并发.   当 A线程 c==0 并释放共享锁锁的时候+后面的B节点刚被唤醒(c==1了),他们俩并发执行同下面的同一段代码,这段代码通过cas操作node的状态值,避免了重复释放.
             首先,这两段代码,并发的时候,head相同,那就是,看谁先cas操作成功了,不管谁先cas成功,都不会重复unpark.  
             假设,head.waitStatus = 0 那么第一个人就把他变为PROPAGATE状态,第二个进来的,就直接退出循环了.
             假设,head.waitStatus = -1 ,那么第一个人把他变为0,第二个人就把他变为 PROPAGATE 状态.这种抢矿可以吗????????疑问.
             关于head结点可能变掉.A,B 线程有一个线程执行了unParkSuccessor,C被Unpark(),那么head改变,那么另一个线程和C进行并行.那么就继续循环,释放下一个合适的节点.
             结论: 在并发环境下,避免了重复释放,并且让本次释放动作不失效(一定是有原因的退出循环)
             疑问: 有一个已经赋值为0 了,其他又把他赋值为-3 能行吗?
             
              */
                    //问题: 这里为什么要进行cas操作的. 进行到这里了,头结点还会被改变?
                    //这个问题详细说明.最后会在总结里面介绍,并用图,解释,到底哪里会发生并发操作.
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {//代表队列有数据
                    int ws = h.waitStatus;
                    //设置当前node节点为signal成功,才去唤醒下一个节点
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        //唤醒下一个可用节点
                        unparkSuccessor(h);
                    }
                    //如果头节点,不需要唤醒,那就设置当前节点为可传播状态
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
            
       //当前节点,只会唤醒自己的下一个节点...     
    
    ==>A,B,C 三个线程(都是共享锁).
    ==>当 A 获得锁后,会unpark(B),
    ==>A,执行的快,会先执行setHeadAndPropagate[A],此时c=0.B线程被释放了还未执行c值操作,以及setHeadAndPropagate[B]
    ==>如果B特别慢,那么A线程就会走完setHeadAndPropagate[A],A的waitStatus值由0->-3 就走完了.(初始为0,是因为,他在获得共享锁的时候执行了unparkSuccessor(a),会将自己的waitStatus变为0)
    ==>如果B稍微快点,head结点被改变了(c=1),A将继续循环,这个时候.h.waitStatus = -1 或者 0(初始值为0,如果有后续结点就为-1),那么这个时候就会出现并发操作.A线程和B线程同时操作head结点,所以需要cas操作.
    	如果有后续结点(h.waitStatus=-1),在cas控制下,他只能被unpark()一次.另一个竞争失败的,会执行if(head==h)中 bread; 方法退出循环.(AB有竞争,操作同一个head,所有用cas操作)
    	如果没有后续结点,那么(h.waitStatus=0),那就if (h != null && h != tail) 直接退出循环了.
    	如果有后续结点(h.waitStatus=-1),一个执行快,一个执行慢,那么第一个线程把h.waitStatus变为0,第二个线程把他变为-3,也是有可能的.
    	如果有后续结点(h.waitStatus=0,加入到了队列中,还没有进行park,正在赋值前节点为-1),那么也能进入方法,可能会发生竞争(ABD竞争,所以用cas操作)
    
    ==>注:
    1.当新节点入队的时候,会先入队(设置tail),在进行cas操作,把前节点waitStatus变为-1.
    2.当unparkSuccessor(h) 会把自己的节点变为 0 也就这一个地方操作已经被唤醒节点了(在共享锁释放锁,和共享锁获得锁的时候).
    3.只有操作同一个head的时候,才会出现竞争. AB竞争发生在,A释放锁,B获得锁(A轮询了一圈,或者感开始轮询就发现head已经和B一样.此时h.waitStatus不一定是多少,h.waitStatus=-1,已经有新的节点加入,此时AB竞争同时操作waitStatis等于-1; 
    h.waitStatus=0,没有后续结点了初始值为0,那么在外层退出for;;
    h.waitStatus=0,有后续结点,还没进行cas操作.刚入队,此时竞争,ABD,同时竞争;入队那哥们一定会一直竞争的.他那里的判断条件也是小于=0 就会尝试赋值,包含了这种情况,所以很nb nb.
    4.当B线程执行的时候,就代表有C不存在(waitStatus=0),或者C存在是读线程(waitStatus=-1).
    
    那么这个方法的目的是什么?
    cas操作成功,释放head的下一个线程.如果没有下一个线程,则设置当前线程为可传播状态,并且解决竞争问题!!!
    continue的逻辑是为了完成每一个动作,让他满意离开,如果cas失败,就再来一次,无非就是0,-1两种情况.
    
    那这段代码,是怎么设计出来的?????
    1.为了处理队列下一个值,如果有,则解锁,如果没有则标记状态.
    2.那么这个过程,会有竞争吗???  释放锁和获得锁有竞争              刚入队的时候,和前两者都有竞争.ws=0
    如何处理,cas操作,竞争成功的线程,才能做动作.竞争失败,退出循环.
    如果顺序执行呢,node结点就变成传播状态.
    
    ==>结论:脑袋不够,设计不出来.
    
    1.先明确要干什么
    2.解决了什么样的竞争.
    3.完善每一个竞争,每一个流程的结果.
    
    参照,上面的图
    
    
        final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
            //读线程缓存计数
                HoldCounter rh = null;
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
                        //判断前面是否有等待时间更长的读线程,如果没有,cas操作,获取一次锁
                    } else if (readerShouldBlock()) {
                        // Make sure we're not acquiring read lock reentrantly
                        if (firstReader == current) {
                            //支持重入锁
                            // assert firstReaderHoldCount > 0;
                        } else {
                            if (rh == null) {
                                //这个东西,一直是记录,最后一个操作的线程
                                rh = cachedHoldCounter;
                                if (rh == null || rh.tid != getThreadId(current)) {
                                    rh = readHolds.get();
                                    if (rh.count == 0)
                                        //仅仅情空空当前线程的数据
                                        readHolds.remove();
                                }
                            }
                            if (rh.count == 0)
                                //队列中还有等待更长的结点,所以去尾部插队去
                                return -1;
                        }
                    }
                    //当没有等待更长的结点,那就正常cas操作,并且记录缓存数据.
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (sharedCount(c) == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            if (rh == null)
                                rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current))
                                rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                            cachedHoldCounter = rh; // cache for release
                        }
                        return 1;
                    }
                }
            }
    

    3.6 总结

    1.发生竞争的点在哪里?
    凡是有cas代码的地方,都有可能发生竞争.
    ==>读锁,发生在读锁持有的时候,"插队读",以及队尾的写线程
    ==>写锁,主要发生在队尾入队.
    2.关于waitStatus,初始为0,有nextNode,并且需要park的时候,会编程-1(signall),当准备unpark的时候,会变成0.
    3.调试完代码,又另一种看法,关于读写锁的运行效率.   
        这么一种代码现象,如果头节点()阻塞,此时没有队列,是否可以有读线程进来,正常逻辑的话,我们只能通过释放锁唤醒,或者读取锁的过程,唤醒下一个读线程.这里读线程就可以直接加入了,并且不需要进行入队操作,原因是,在读写锁的tryAcquireShare,里面,有判断,你是否有等待时间更长的节点,如果没有,就可以尝试一次入队. 并且这操作,不需要添加到"尾节点",贼快.(这设计也太nb了)
        ==>针对,无队列时,读节点获得锁的时候,可以无限插队.
        ==>如果,有队列呢,如果读队列后面有写队列,那就插入不进去,如果读队列后有读队列,那么这个时候,根据代码,如果有等待时间比你长的读线程(就是说队列第二位有人,读线程还没释放完),那么这个时候,你尝试获得共享锁,你会尝试两次,加入队列(当然这个时候支持重入锁的),try一次,fully一次,如果没加入进去,就加入到队列尾部.
        ==>当然,这段时间非常短,当所有的读线程都释放完,那么head节点一直变化,直到最后一个读节点的时候,这个时候,就没有比你等待更长时间的读节点,这个时候,就会插入成功.    
        ==>照顾比你等待时间更长的读节点,并且还支持你的"插队"!!!!
        ==>读锁的,顺序又保障,速度也快.
    

    4. 通过BlockingQueue理解.

    4.1 功能

    1.简单翻阅BlockingQueue 接口的注释,和其他容器一样,有添加,删除,"奉献"技能(鼬给佐助).而且注释上面有一个简答的,producer-consumer模型的代码.

    某些api,看注释,翻译一下.

    add(obj):往队列里面增加一个对象,如果队列没有空间抛出异常,反之返回true。

    offer(obj): 往队列增加一个对象,返回true/false

    put(obj): 往队列增加一个对象,如果没有空间,则会阻塞改线程,直到有空间.

    注:一般返回void,都是阻塞的. 返回boolean的,都是有非阻塞的.

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vK39zprB-1619335449453)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1578537106949.png)]

    BlockingQueue提供基本功能,实现者实现基本功能就ok.
    
    ArrayBlockingQueue:
    数组形式的阻塞队列.如果是阻塞的话,那就是这个容器,在full,Empty是会阻塞,等待唤醒.producer-consumer模型呗.api同Blocking相似.
    迭代功能???
    
    LinkBlockingQueue:
    链表形式的阻塞队列.也有Blocking的功能.producer-consumer的模型.实现上未知.有什么设计特点,看完源码在写.
    迭代功能????
    
    
    
    

    4.2 如何设计

      ArrayBlockingQueue
      应该比较简单
    1.首先这是一个 producer-consumer模型 的容器 
    2.创建一个Object[],一把锁,对所有添加,删除,读上锁.
    3.针对full,Empty的情况,建立条件锁.
    	注:如果不用条件锁,那就用老的synchronize,wait,notify.
    	   读也要上锁吗
    container{
        lock lock
        condition notempty
        condition notfull
        
        take(){
        lock.lock()
        
        isEmpty()	
        notEmpty.await()
        take
        notfull.signall
        
    	lock.unlock()    
        }
        
        poll(){
        lock.lock()
        isfull notfull.await
        poll
        notEmpty.signall
    	lock.unlock()       
        }
    }	   
    
      LinkBlockingQueue
    如果,我自己设计,估计和Arraybq的想法一样,一个容器,一把锁,俩条件,控制操作,
    
    
    1.首先这是一个 producer-consumer模型 的容器 
    2.创建一个Link
    3.针对full,Empty的情况,建立条件锁.
    
    
    
    

    4.3 源码设计.

    ArrayBlockingList. 
    
    基本原理:同上面的设计想法一样.
    
    其他设计:
    
    1.添加了指针,方便操作数组.fifo(双指针)
    
    2.这个数组是如何存取的,(哪存,哪出.)依赖1,指针.
    
    ==>我理解他是一个★★双指针★★,起始都是头结点,两个指针都是从0->max,当到max之后,0继续.而队列的出入队,跟着指针走就行,这样就做到了,fifo队列.
    
    ==>这不就以前操作链表的一种方式吗,以前写线程池源码的时候,也这样弄过线程数组.
    
    ==>★★★简单来说,就是放入的顺序,和取出的顺序一致就ok.★★★
    
    3.他的迭代器是如何工作的.
    
    如果多个线程要迭代,那就创建多个iterator(),itr内部类,itrs就是这些迭代器的一个链表.
    ==>1.为什么要自己实现呢迭代器?
    因为对于blq,迭代发生的时候,可能数据已经被改变了,每一个迭代过程中,执行时间不同.所以很容易,获得的数据不是那么准.
    ==>2.itr是个啥?
    这里的迭代器,依赖指针来实现,因为指针是相当于数据而言,不是数据而言,不会出现nullpe的问题,其他迭代器不知道.
                        cursor = NONE;
                        nextIndex = NONE;
                        prevTakeIndex = DETACHED;
    比如这样,他在创建迭代器的时候,会记录当时起始指针的位置,然后,依靠这个便利.当然这部分数据里面,有修正的代码,比如,有数据删除,他就通知所有的迭代器,修正一些.具体的不想研究,因为这本身获得的就不是一个准的数据,在并发条件下不是特别准,并不能说明什么.
    ==>3.需要使用迭代器的时候,再去研究.或者等看看其他源码用到blq的时候,他是如何使用迭代器的(我看看那帮孩子咋玩的)
    
    LinkBlockQueue
    基本原理.
    
    1.两把锁(入列,出列各一把,NotFull_NotEmpty),
    ==>对于Array那种,全局一把锁,可能某些情况下,并发性不如两把锁.
    ==>应该是比较 出列 和 入列速度.不能这么说,生产,消费模型,不就为了让吞吐率更高吗?如果速度有差异,那就得口控制下两者的速度,或者制定啥规则,让生产速度合理.  比如,我先多开点线程,多生产一些,在控制消费和生产的效率在一个level,就等待少,还能消费的快.消费能力的提高,那么锁的竞争就会比较激烈,毕竟就一把锁
    ==>搞两把锁,在不考虑full,和empty的情况下(一直有资源),竞争减少了一半.加快了并发性.
    就是一些操作的难度增大了,比如迭代(需要获得两把锁),或者其他什么情况,需要获得两把锁,代码的严谨性需要提高,
    ==>尽量不要做一些,需要两把锁的操作★★
    
    2.迭代器
    ==>我看到了,使用了全局锁,而且迭代过程中,也用了全局锁(俩锁一起获得)
    ==>支持SplIterator,支持多核电脑呗
    ==>其他业务不想细看.
    

    4.4 部分代码

    ArrayBlockingQueue

    //有返回值的入队    
    public boolean offer(E e) {
            //不支持空元素
            checkNotNull(e);
            //获得全局锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
    //入股full 则返回false.            
                if (count == items.length)
                    return false;
                else {
                    //入队
                    enqueue(e);
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
    //为什么要单独提出来一个瞬时值.
    //因为cas 操作,需要一个期待值,和变更值,来完成一个cas操作. 如果这个期待值,是个变量就没有意义.
    //我拿出来一个瞬时值,获得期待值,然后进行cas操作,竞争,如果竞争成功,就代表我这段代码操作符合逻辑.是原子性的.
    //所以aqs代码,很多都是这种形式,我们自己写并发代码的时候,也会经常用.
    
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            //这里为什么要拿出来一部分瞬时值,未知..(这个时候已经获得锁了啊)????????
            final Object[] items = this.items;
            //通过put指针入队
            items[putIndex] = x;
            //如果指针到达尾部,再从头部放进去
            if (++putIndex == items.length)
                putIndex = 0;
            //计数
            count++;
            notEmpty.signal();
        }
    
    
    //阻塞的入队
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    
    //迭代器的方法
            Itr() {
                // assert lock.getHoldCount() == 0;
                lastRet = NONE;
                final ReentrantLock lock = ArrayBlockingQueue.this.lock;
                lock.lock();
                try {
                    //如果没数据,维护一下指针的数据
                    if (count == 0) {
                        // assert itrs == null;
                        cursor = NONE;
                        nextIndex = NONE;
                        prevTakeIndex = DETACHED;
                    } else {
                        //获得当前出队指针,记录下当前的迭代器需要的指针
                        final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                        prevTakeIndex = takeIndex;
                        nextItem = itemAt(nextIndex = takeIndex);
                        cursor = incCursor(takeIndex);
                        if (itrs == null) {
                            itrs = new Itrs(this);
                        } else {
                            itrs.register(this); // in this order
                            itrs.doSomeSweeping(false);
                        }
                        prevCycles = itrs.cycles;
                        // assert takeIndex >= 0;
                        // assert prevTakeIndex == takeIndex;
                        // assert nextIndex >= 0;
                        // assert nextItem != null;
                    }
                } finally {
                    lock.unlock();
                }
            }
    
    
     itrs.doSomeSweeping(false);  维护迭代器数据的方法 //懒的看了,看注释上面的意思是,尽力找到过期的数据?
      //   还是说,换一些数据,更新迭代器?   ==>删除一些没有废弃的迭代器.
    
         //扫描过期的迭代器,置位null,并且重新更新node节点,前后信息
         //需要了解迭代器和这个方法的关系,在新加入迭代器节点和迭代的时候,会进行这个方法.所以,迭代器进行到哪一个点,不知道,sweeper记录一个节点..
      void doSomeSweeping(boolean tryHarder) {
                // assert lock.getHoldCount() == 1;
                // assert head != null;
                int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
                Node o, p;
                final Node sweeper = this.sweeper;
                boolean passedGo;   // to limit search to one full sweep //完整的扫描
    			//上次扫描到的点
         		//给两个指针赋值
         		//根据上次的位置,判断是否需要完整扫描
                if (sweeper == null) {
                    o = null;
                    p = head;
                    passedGo = true;
                } else {
                    o = sweeper;
                    p = o.next;
                    passedGo = false;
                }
    
                for (; probes > 0; probes--) {
                    //下一个节点(第二个指针)为空
                    if (p == null) {
                        if (passedGo)
                            break;//说明当前没数据,退出
                        o = null;
                        p = head;
                        passedGo = true; //已经没数据了,需要重头扫了
                    }
                    final Itr it = p.get();//第2个指针的node
                    final Node next = p.next;.//第二个指针的下一个node,先把值拿出来,方便赋值
                        //判断节点是否还有效,无效,则清除状态,判断是否需要退出循环
                        //继续向下循环,不想看了.
                    if (it == null || it.isDetached()) {
                        // found a discarded/exhausted iterator
                        probes = LONG_SWEEP_PROBES; // "try harder"
                        // unlink p
                        p.clear();
                        p.next = null;
                        if (o == null) {
                            head = next;
                            if (next == null) {
                                // We've run out of iterators to track; retire
                                itrs = null;
                                return;
                            }
                        }
                        else
                            o.next = next;
                    } else {
                        o = p;
                    }
                    p = next;
                }
    
                this.sweeper = (p == null) ? null : o;
            }    
         
         
    

    LinkBlockQueue

    //阻塞入队
        public void put(E e) throws InterruptedException {
            //空
            if (e == null) throw new NullPointerException();
            //好习惯,所有的takeput 方法 都提前准备好变量
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            //put锁,上锁.
            putLock.lockInterruptibly();
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();
                }
                //入队
                enqueue(node);
                //加1
                c = count.getAndIncrement();
                //判断容量,如果没有put没满,唤醒自己的队友
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                //解锁
                putLock.unlock();
            }
            //这里有疑问,正常应该,直接唤醒notempty就ok了呗.
            //这里的意思是,当队列为空的时候,唤醒非空.(消耗队列),因为现在已经添加进去了一个.
            //在简易点说,就是,c>0,就没阻塞的.如果c=0 就说明,指定有阻塞的,现在添加成功了,改唤醒他们了.
            //这样增加执行效率(没那么多唤醒了,就没那么多take锁的竞争了),增加代码的执行效率.
            if (c == 0)
                signalNotEmpty();
        }
    //入队,就比较简单了.
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            last = last.next = node;
        }
    
    
    //非阻塞入队
     public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
    
            if (e == null) throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            int c = -1;
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();
            try {
                while (count.get() == capacity) {
                    if (nanos <= 0)
                        return false;
                    //模拟wait(3000) 的功能,这段时间内释放锁,时间到后,加入竞争
                    nanos = notFull.awaitNanos(nanos);
                }
                enqueue(new Node<E>(e));
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return true;
        }
    
    //aqs的等待 方法
    //利用Locksuppots 方法,控制队列
     public final long awaitNanos(long nanosTimeout)
                    throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
         //添加节点
                Node node = addConditionWaiter();
         //释放锁
                int savedState = fullyRelease(node);
         //计算超时时间
                final long deadline = System.nanoTime() + nanosTimeout;
                int interruptMode = 0;
         //判断是否在主队列中,signall方法,会把合适的waiter加入到队列中.
                while (!isOnSyncQueue(node)) {
                    //如果超时时间到了,就取消等待
                    if (nanosTimeout <= 0L) {
                        transferAfterCancelledWait(node);
                        break;
                    }
                    if (nanosTimeout >= spinForTimeoutThreshold)
                        //在规定时间内,不尝试获取锁,超时过后,开始主动获取锁
                        LockSupport.parkNanos(this, nanosTimeout);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                    nanosTimeout = deadline - System.nanoTime();
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
                return deadline - System.nanoTime();
            }
    
    //另外,关于,await(3000) 理解. 知识代表,3秒里,你不参加竞争,3秒过后加入竞争,等待signal唤醒,而signal唤//醒,只代表一个或者一群,被唤醒,这帮东西,重新加入竞争. aqs完全实现了synchronize那家伙提供的功能.挺厉害 
    
    
    //关于locksuport 中这个
        public static void parkNanos(Object blocker, long nanos) {
            if (nanos > 0) {
                Thread t = Thread.currentThread();
                //给 thread 对象 设置属性值,方便管理线程.
                setBlocker(t, blocker);
                //native方法,操作系统的一个啥,一个"许可"指令
                UNSAFE.park(false, nanos);
                //设置完事了,在清空一下呗.
                setBlocker(t, null);
            }
        }
    
    
    
    
    //出列的方法
    
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            //同上面一样,为了代码效率,第一,执行唤醒生产线程,需要获取对方的锁,第二,我只在队列满的时候,告诉他我,继续生产,如果队列没满,他自己也会一直很生产.
            //这样减少了锁竞争, 唤醒自己锁的线程的主要任务都给了自己. 只有特殊情况(临界值的时候,只有临界值的时候,他自己不能唤醒自己),对方才唤醒他一次.  
            //只有临界情况才有锁的竞争.
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
    
    
    
    
    
    
    
    

    5. 通过其他功能性队列了解aqs

    5.1 通过 CountDownLatch

    1.功能,countdown()一定次数,则唤醒await线程.
    2.从功能上,看比较简单.
    ==>实现上,用aqs队列,共享锁功能
    ==>构造器,给定初始state,值.
    ==>wait上锁.
    ==>每次countdown(),减少一次状态值,到达次数之后,唤醒await线程.
    

    5.2 通过 Semaphore

    和上面的countdownlatch实现原理差不多.都是利用共享锁,创建一个初始值,然后控制值变化就行了,至于队列的状态,竞争,全部交给了aqs.
    

    5.3 通过LinkedTransferQueue

    简单来说:
    功能上讲,他是一个超集,有很多api.
    效率上讲,他是无锁操作,入队非阻塞算法,操作头尾节点的算法很nb.
    无锁:cas操作.
    非阻塞算法,入队,不会阻塞.1.无边界    出队,不一定
    关于出入队的算法: 以前对于队列,我们每次出队入队,需要利用cas操作更新相应的头尾节点,如果实时更新,每次都需要进行大量的cas操作,而这一次,我们延迟更新,当head节点,距离未被匹配的节点,到达两个节点的时候,才会更新头节点.当然,如果未匹配节点长了的话,你不更新,我每次都需要从头去找匹配的的空节点,或者尾节点,效率也不高.所以,他们用2
    
    
    concurrentQueue,算法上和他相似,但是功能比他简单,所以在使用上比较方便.
    
    
    功能:
    算法设计:
    1.队列结构:
    一个队列.所有的生产队列,和请求队列,都在一个队列上,同为Node,只不过他们的状态值不同.
    例如,初始的生产数据,Node,的isdata为true,item为object,初始的消费request,isdata为false,item为null.
    数据结构:全部数据在一起,利用属性值,表达每一个节点的信息(生产node,还是消费node)
    
    
    2.head节点设计.
    我们在出队入队的时候,头尾结点的更新,以前的做法是利用cas操作,实时更新.
    我们以前的head节点是实时的,比如,以前的队列,如果head结点释放锁了之后,就会更新head的节点,现在不会了.
    而是一种延迟更新的感觉.
    当head节点,距离未被匹配的节点,到达两个节点的时候,才会更新头节点.
    这么做的目的是什么?  以前操作一次,就要进行一次cas操作,现在不一样了,相当于,我匹配两个进行一次.
    ==>这个延迟更新的东西,有将就,如果短了,就像以前的操作一次,一回合cas,导致自旋过多.
       						如果长了,意味着,我每次新加入的节点都需要从头匹配,而且前几个都不太好使,那么我循环的次数也就多了.不容易命中.也不太好.
    ==>这就是多次cas操作,和多次每个人的索引次数之间的一个抉择.==>这是人想设计的东西吗?   				==>而且在不更新头结点的情况下,还能保证fifo,以及线程安全,这特么是人写的代码吗??
    
    3.这里面没有用到aqs,不过是另一种队列同步队列的实现.也是依赖cas操作,来实现的.好牛逼.
    
    
    4.进行简单的,代码测试,测试了,linkedtransferqueue,arrayblockqueue,linkblockqueue,第一个的吞吐率,很强.
    
    5.有个特点,有点像接收生产者的意思,无限生产
    如果你是放数据的,永远成功.一直是true.就添加到队列中了,等待那边消费.
    (和request相反,别想歪了.)而这里面的request,就是take代码
        public boolean offer(E e, long timeout, TimeUnit unit) {
            xfer(e, true, ASYNC, 0);
            return true;
        }
    
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-A7qZQAOI-1619335449455)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1578903244295.png)]

    网上找的图.但是代码确实是好生涩…看了整整一个上午,才把一个方法看完.而且不打算看其他方法了.不是人脑写的代码.

    //tansferQueue,所有的take,put 方法,都是依赖这个核心方法写的,所有内容,全都揉在一起了.很是nb,而且没有锁的(没有依赖aqs,)
    //hava data,表示传进来的这个节点是否有数据,如果有数据则代表他是,生产的.否则为request,消费者,相对应的,E也为null.
    private E xfer(E e, boolean haveData, int how, long nanos) {
        //如果.是生产者,但是数据为空,则抛空,==>不让你产生空数据
        if (haveData && (e == null))
            throw new NullPointerException();
        Node s = null;                        // the node to append, if needed
        
    	//标记一个位置,下面continue retry,会返回到这里
        retry:
        for (;;) {                            // restart on append race
    //头结点不为空,才循环
            for (Node h = head, p = h; p != null;) { // find & match first node
                //这里两个赋值是有讲究的,参照上面
                boolean isData = p.isData;
                Object item = p.item;
                //item!=p 我这里没懂,item是数据项,p是node项,他们俩没有可比性啊???????????
                //右面判断有两种情况,这里是找到,未匹配的.
                //一种是,take,isdata为false,item为null
                //另一种是,put,isdata为true,item为object
                if (item != p && (item != null) == isData) { // unmatched
                    //isData代表的动作和havaData(方法带进来的本结点的动作).
                    //如果头结点动作相同,则不匹配,跳出循环.(这里只看头节点是否匹配,不匹配就走下面代码,然后加入队列),也有可能头节点匹配,但是cas失败,那就再找下一个节点在继续尝试,可能head就变了.
                    if (isData == haveData)   // can't match
                        break;
                    //如果可以匹配,那么就进行cas,操作
                    if (p.casItem(item, e)) { // match
                        //如果cas成功,则唤醒线程,并看情况更新头节点
                        //按照for循环体的判断, q=p=head, 如果q!=h,那么就是,进行了第二轮循环了呗.
                        //那么现在 q 和head 都已经被cas成功了,根据注释里面的思想,当head节点距离下一个未匹配的结点达到两个,就得更新头节点了
                        for (Node q = p; q != h;) {
                            Node n = q.next;  // update by 2 unless singleton
                            if (head == h && casHead(h, n == null ? q : n)) {
                                //让head节点.next=自己.  之后就被被gc标记住了,等待回收
                                h.forgetNext();
                                break;
                            }                 // advance and retry
                            //下面这个情况比较极端,就是cas失败,或者h节点已经被改变.原来的head节点可能已经被回收掉.head的一下个也被回收掉,因为别人cas成功了,head节点一更新,直接就是第三个结点为头节点.第三个方法,q没有被匹配,但是头节点也被回收掉了,就需要重新循环了(就是说,第4个结点进行cas操作,他成功了,更新了头节点,并且这个时候,就跳出循环,先把本次cas成功的数据解决掉,唤醒该唤醒的)
                            if ((h = head)   == null ||
                                (q = h.next) == null || !q.isMatched())
                                break;        // unless slack < 2
                            //注释上面说 slack 松弛度小于2 ,也就是head结点已经被第4个结点更新了, 第二位为null被回收了,可能为null.类似于这种情况,同时竞争一个节点,结果只有一个cas成功,然后另一个就cas下一个成功了,但是后一个cas成功的,又先执行cas,head,所有会出现这几个为null的情况.
                            //q.isatched,代表,上一个还没有cas成功,方法里面判断两种情况,如果是数据项,
                            //就判断他 ((x == null) == isData); isdata为true,并且item已经被匹配,为null
                            //另一种是非数据项,他判断 item==自己==>这特么在哪赋值了????首先,如果是take方法,他进来==>找到了,在下面的awaitMatch 方法里面,如果还没匹配,会将非数据节点的item,指向自己,里面有一个赋值方法.
                            //这个情况就是发生在被后面cas成功.
                        }
                        //解锁等待的线程.
                        LockSupport.unpark(p.waiter);
                        return LinkedTransferQueue.<E>cast(item);
                    }
                }
                Node n = p.next;
                //可能更换了头节点,后面,已经没有结点了,所以需要重新赋值一下
                p = (p != n) ? n : (h = head); // Use head if p offlist
            }
    
            if (how != NOW) {                 // No matches available
                if (s == null)
                    s = new Node(e, haveData);//添加结点,里面代码挺麻烦的,不看了
                Node pred = tryAppend(s, haveData);
                if (pred == null)
                    //竞争失败,就重来呗.
                    continue retry;           // lost race vs opposite mode
                if (how != ASYNC)
                    //非异步的就等待.
                    return awaitMatch(s, pred, e, (how == TIMED), nanos);
            }
            return e; // not waiting
        }
    }
    

    6.如何使用队列.

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eoDNRAK3-1619335449456)(D:\Users\cua\Desktop\ty\aqs 原理.assets\1583395038122.png)]

    linkedtransferQueue

    concurrentLinkedQueue 多用这个吧,api简单 算法同上.

    7.aqs 攻略

    从解决问题的思路来看待锁.

    独占锁 部分

    1.如何实现锁功能的.

    目的: 只有一方占有.

    2.如果自己设计

    自己实现一个锁,需要考虑什么问题?

    依赖什么来表达持有锁?   	
        ==>lock中的什么变量,当做标志.
    
    竞争的资源如何表达?     
        ==>当有n多个线程竞争的时候,未获得锁的资源,如何表达.
    
    竞争资源如何有效率的拿到锁?    
        ==>随机唤醒? sleep(time)? yield?
        
    竞争资源的操作,用什么控制?    并发入队,并发初始化,并发修改状态...
    
    状态值
    1.依赖状态值,表达锁的状态
    class  MyLock{
            int state =0 ;
       void lock(){
            state =1;
       }
       void unlock(){
            state =0;
       }
    }
    注:当然也可以用其他标志量,比如 true false .假设依赖true,false,如何设计可重入锁,如何设计共享锁???  ==>这里可以就把当做一个标志量,表达锁的状态.
    
    乐观锁->cas,解决资源竞争问题
    2.state为共享数据.
    
    class  MyLock{
            int state =0 ;
       void lock(){
           if( unsafe.compareAndSwapInt(this, stateOffset, 0, 1) );
           	//getLock
       }
       void unlock(){
      if( unsafe.compareAndSwapInt(this, stateOffset, 1, 0));
            //unlock
    }
    
    锁的思想
    悲观锁:锁住资源.
    	==>其他竞争者,会阻塞.
    	
    乐观锁:不锁住资源,检测资源变化.
        资源变化,代表这个操作非原子,所以本次操作失败.
        如果资源未变化,代表操作原子,本次操作成功
       	==>非阻塞算法
       	
       	
    sql:
    悲观锁:    
    select * from article_info where id=1 for update  //行锁
    update article_info set count=count+1;
        
    乐观锁:   
    select count from article_info where id=1   
    update  article_info set count=count+1 where count=count;  
    
    增加竞争条件后.分析二者的执行情况
        
    i++:
    见代码;
    
    
    
    
    
    
    
    队列与唤醒机制
    3.发生竞争,未获得锁的一方,如何操作? 
        // while(true)   sleep   yield   wait-notify  ?  
    
    
    class  MyLock{
      int state =0 ;
      
       void lock(){
         while(true){
     		 if(state==0){
         		 state =1;//cas 
         		 //上锁成功
      		 }
      //sleep  yield   wait-notify	等待获得锁
       }
     }
     
      void unlock(){
       if(state==1){
        state =0;//cas
       }
    }
    
    ==>为了不持续消耗资源,可以让线程先"休息"类似于 wait notify的方式.
    ==> UNSAFE.park();     依赖于信号量的 一种, 等待唤醒机制.(操作系统提供的一种原子性操作)
    ==>什么时候唤醒.
    ==>维护一个队列,lock维护一个队列,让所有竞争方休息,节省资源.当需要唤醒的时候唤醒.
    ==>Node->Node->Node->Node.    当然不能像Synchronize那样,全部唤醒,在一起竞争资源.
    前节点,唤醒后节点.
    

    开始讲源码

    3.aqs是什么
    1.aqs是一个用来实现锁功能的一个框架. 他内部维护了一个fifo队列. 从功能上讲对外提供操作state的api,对内准备维护队列逻辑的api.
     ==>把占有锁,释放锁,抽象成一件事(等价于操作状态值,然后aqs还提供api),一次竞争事件,然后自己代码里还能合理利用这个方法.(锁就提供一次操作的api,其他任务交给aqs)
    
    2.难点,在于基本所有地方的代码都带竞争性. 所以,得很严谨,敲的时候,一定很严谨.
    
    翻阅过程中: 重点在,
    如何表达锁.
    在并发条件下,如何保证线程安全.
    使用锁的时候流程.
    哪里高效解决问题.
    
    当这些都能理解了之后,再去提会代码的编程设计.
    
    4.condition 锁,是如何实现的呢.

    从功能上讲. 他想要像synchronize里面的wait,notify 功能一样. 提供线程的等待唤醒功能.
    当然等待唤醒都需要依赖锁,而condition,就是aqs配套的 wait,notify功能.

    ==>从aqs设计出来的效果上,以及逻辑上讲是这样的
    从功能上讲,waitnotify需要在同一把锁下,并且获得,锁.使用condition一样. 而且这个condition 设计出来之后,要比synchronize的要nb, 那个只有一种wait,这个可以有多个condition.
    从调用方式上来看.和synchronize的基本一样.
    从逻辑上讲. 每次await,会退出主队列,在维护一个队列,notify的时候,在重新加入主队列继续竞争.

    ==>那么是怎么设计出来的呢. 如果要你设计一个wait notify功能,你如何设计.

    直接从功能上分析

    wait方法的目的是 释放锁(出主队) 暂停线程==>park()? 等待唤醒
    notify的目的是 唤醒某个线程==>unpark() 继续加入竞争,就是要入主队.

    如果,有多个线程在同一个condition wait? ==>维护同一个 waiter 队列,

    wait(){
    Release()
    park()

    }

    notify(){
    unpark()
    addwaiter()==>竞争到锁,会执行.
    }

    共享锁部分

    直接从功能出发,去完成.

    读读读写写写读读

    在已知,独占模式的功能设计之后,我们这个共享锁,需要设计什么功能呢.怎么实现呢?
    1.读读 写读 读写他们之间的逻辑关系.
    2.是否支持锁的升降级.
    如何实现.
    1.直接去分析,比如读读功能. 如果已经拼好的队列里.读读允许运行. 那么就让读线程unpark() 下一个线程.
    写互斥,就和原来一样.
    2.那么插队问题,如何解决. 写的话,排队.比如现在正在执行读线程,读线程插队如何解决.

    源代码是如何实现的?
    针对已存在的队列,读线程会唤醒下一个读线程. 不允许太嚣张的读写插队. 非公平锁的实现方法差不多.

    从功能上讲,我们可以用普通的逻辑,完成代码.但是,在并发环境下,要保证,这个lock不出现线程安全问题,并且实现功能而且要精简.,确实有难度.

    }
    }

    ==>为了不持续消耗资源,可以让线程先"休息"类似于 wait notify的方式.
    ==> UNSAFE.park(); 依赖于信号量的 一种, 等待唤醒机制.(操作系统提供的一种原子性操作)
    ==>什么时候唤醒.
    ==>维护一个队列,lock维护一个队列,让所有竞争方休息,节省资源.当需要唤醒的时候唤醒.
    ==>Node->Node->Node->Node. 当然不能像Synchronize那样,全部唤醒,在一起竞争资源.
    前节点,唤醒后节点.

    
    
    
    
    
    **开始讲源码**  
    
    ##### 3.aqs是什么
    
    

    1.aqs是一个用来实现锁功能的一个框架. 他内部维护了一个fifo队列. 从功能上讲对外提供操作state的api,对内准备维护队列逻辑的api.
    ==>把占有锁,释放锁,抽象成一件事(等价于操作状态值,然后aqs还提供api),一次竞争事件,然后自己代码里还能合理利用这个方法.(锁就提供一次操作的api,其他任务交给aqs)

    2.难点,在于基本所有地方的代码都带竞争性. 所以,得很严谨,敲的时候,一定很严谨.

    翻阅过程中: 重点在,
    如何表达锁.
    在并发条件下,如何保证线程安全.
    使用锁的时候流程.
    哪里高效解决问题.

    当这些都能理解了之后,再去提会代码的编程设计.

    
    
    
    
    
    
    
    
    
    ##### 4.condition 锁,是如何实现的呢.
    
    从功能上讲. 他想要像synchronize里面的wait,notify 功能一样. 提供线程的等待唤醒功能.
    当然等待唤醒都需要依赖锁,而condition,就是aqs配套的 wait,notify功能.
    
    ==>从aqs设计出来的效果上,以及逻辑上讲是这样的
    从功能上讲,waitnotify需要在同一把锁下,并且获得,锁.使用condition一样.  而且这个condition 设计出来之后,要比synchronize的要nb, 那个只有一种wait,这个可以有多个condition.
    从调用方式上来看.和synchronize的基本一样.
    从逻辑上讲. 每次await,会退出主队列,在维护一个队列,notify的时候,在重新加入主队列继续竞争.
    
    ==>那么是怎么设计出来的呢.  如果要你设计一个wait notify功能,你如何设计.
    
      直接从功能上分析  
    
      wait方法的目的是     释放锁(出主队)   暂停线程==>park()?  等待唤醒
      notify的目的是       唤醒某个线程==>unpark()  继续加入竞争,就是要入主队.
    
      如果,有多个线程在同一个condition  wait? ==>维护同一个 waiter 队列,
    
      
    
    
    wait(){
    Release()
    park()
    
    }  
    
    notify(){
    unpark()
    addwaiter()==>竞争到锁,会执行.
    }
    
    ####  共享锁部分
    
     直接从功能出发,去完成.
    
    读读读写写写读读
    
    在已知,独占模式的功能设计之后,我们这个共享锁,需要设计什么功能呢.怎么实现呢?
    1.读读 写读  读写他们之间的逻辑关系.
    2.是否支持锁的升降级.
    如何实现.
    1.直接去分析,比如读读功能. 如果已经拼好的队列里.读读允许运行. 那么就让读线程unpark() 下一个线程.
    写互斥,就和原来一样.
    2.那么插队问题,如何解决. 写的话,排队.比如现在正在执行读线程,读线程插队如何解决.
    
    源代码是如何实现的?
    针对已存在的队列,读线程会唤醒下一个读线程. 不允许太嚣张的读写插队.  非公平锁的实现方法差不多.
    
    
    从功能上讲,我们可以用普通的逻辑,完成代码.但是,在并发环境下,要保证,这个lock不出现线程安全问题,并且实现功能而且要精简.,确实有难度.
    
    
    
    
    
    
    
    
    
    
    
    
    
    展开全文
  • JAVA 中AQS原理

    千次阅读 2020-12-22 20:52:53
    AQS 原理概览 AQS 对资源的共享方式 1)Exclusive(独占) 2)Share(共享) 前言: 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试...
  • 原创--深入AQS原理

    2021-05-12 19:24:23
    【深入AQS原理】我画了35张图就是为了让你深入 AQS 前言 谈到并发,我们不得不说AQS(AbstractQueuedSynchronizer),所谓的AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock、...
  • java aqs原理解析,AQS

    2021-03-08 04:35:57
    实现原理是任何一个技术最重要的核心,对于java中aqs原理小伙伴们了解多少呢?本篇文章小编就为大家讲解讲解。AQS全称AbstractQueuedSynchronizer是java里各种锁实现的基础,提供了对资源也就是state字段的各种获取...
  • AQS原理探究

    2021-06-16 11:46:25
    AQS原理初步探讨 如果int(state)是0则代表没有资源占用锁,如果是1则有占用; node节点封装thread,实现锁的分配,通过CAS完成去State锁占用情况的修改 注意Thread是volatile修饰的 3. 从ReentrantLook解读AQS (1) ...
  • 8.2.1 AQS 原理 概述:全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架 特点: 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁...
  • 基于ReentrantLock深入分析AQS原理 此篇文章基于JDK8来分析的,在JDK9及以后的版本源码实现略有不同,不过思路是一样的,只是在JDK9中推出了新的类型 VarHandle 变量句柄,替代Unsafe的大部分功能。 Java中大部分...
  • 这篇文章我们来说下AQS。 1、AQS简介 AQS全称为AbstractQueuedSynchronizer,是ReentrantLock、Semaphore、CountDownLatch等并发工具的基础类。 AQS与Synchronized有一个区别是: Synchronized中所有阻塞的线程都...
  • AQS原理解析

    2021-02-28 14:59:33
    AQS这个类定义了一套多线程访问共享资源的同步器框架,AQS是整个JUC包的基石,JUC包内几乎所有线程间同步的组件都依赖于AQSAQS建议实现类定义为非public的内部类。因为AQS其实是一个抽象的同步器,一个同步框架,...
  • 一、AQS是什么 AQS英文全称是AbstractQueuedSynchronizer(抽象队列同步器),这个类在java.util.concurrent.locks包下。AQS 是一个用来构建锁和同步器的框架,独占式同步器如ReentrantLock,共享式同步器如...
  • 在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于 AQS 原理的理解”。下面给大家一个示例供大家参考,面试不是背题,大家一定要加入自己的思想,即使加入不了自己的思想也要保证自己能够通俗的讲...
  • AQS原理 学习笔记

    2021-05-22 20:18:48
    一、什么是AQS AQS即AbstractQueuedSynchronizer,是一个构建锁和同步器的框架。 AQS支持独占锁(exclusive)和共享锁(share)两种模式: 独占锁:只能被一个线程获取到,如...二、AQS内部的数据结构及原理 ...
  • 今天我们来探讨一下AQS相关的面试技巧。毕竟此类问题回答的好坏会在很大程度上影响我们面试的得分。 1. AQS在java中的使用 给代码加锁,是java中处理并发问题的重要手段。 java中的很多锁都是基于抽象类AQS...
  • AQS原理简述

    2021-08-22 15:10:43
    本文主要介绍AQS对共享资源状态的管理、实现可重入锁的原理和实现阻塞队列的原理。 资源状态: AQS维护了一个变量state,使用volatile修饰保证其可见性,目的是为了让多线程可以知道此资源当前的访问状态。 /** ...
  • 多线程底层实现原理AQS
  • java并发AQS原理

    2021-12-02 11:38:42
    3.1主要原理 3.2数据结构 3.3不同模式加锁过程 3.4 自定义同步器中需要实现的重要方法 4 ReentrantLock中的AQS分析 4.1加锁过程 4.1.1第一步:尝试获取锁(利用cas更改state值) 4.1.2第二步:线程...
  • AQS原理及用法

    2021-09-12 16:42:58
    AQS原理及用法 1 AQS简介 AQS全称为AbstractQueuedSynchronizer,是Java中的一个抽象类。 AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS之后,更多的协作工具类都可以方便得被写出来。 ...
  • AQS原理 ReetrantLock原理 ReentrantReadWriteLock原理 Condition接口原理
  • JUC 基石之 AQS 原理

    2021-08-12 15:29:08
    JUC 基石之 AQS 原理 文章目录JUC 基石之 AQS 原理一、简介,什么是 AQS二、相关基础知识1. synchronized 原理2. CLH 队列3. CAS 原理三、AQS 解析1. AQS 分层架构图2. AQS 核心思想3. AQS 数据结构4. AQS 源码分析...
  • 从ReentrantLock的实现看AQS原理及应用 前言 Java中的大部分同步类(Lock、Semaphore、ReentrantLock等)都是基于AbstractQueuedSynchronizer(简称为AQS)实现的。AQS是一种提供了原子式管理同步状态、阻塞和...
  • AQS原理初探

    2021-05-11 12:31:28
    AQS原理初探 AQS全称为AbstractQueuedSynchronizer,如果直接按名字翻译的话就是抽象队列式同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类的实现都依赖于它,如ReentrantLock(可重入锁)、...
  • AQS 原理概览AQS的核心思想为:如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 24,274
精华内容 9,709
关键字:

aqs原理

友情链接: 新建文本文档.zip