精华内容
下载资源
问答
  • java并发编程锁机制

    2019-12-14 12:55:23
    之前系列文章都在叙述java线程池的设计以及实现机制,没有涉及java并发编程的锁机制,这是因为锁机制与线程池是相对独立的内容,自成体系,可以把锁机制当做线程池的一个基础组件,想黑盒一样使用它。 可我们如何去...

    之前系列文章都在叙述java线程池的设计以及实现机制,没有涉及java并发编程的锁机制,这是因为锁机制与线程池是相对独立的内容,自成体系,可以把锁机制当做线程池的一个基础组件,想黑盒一样使用它。

    可我们如何去设计这样的一个黑盒,这样的一把锁?首先我们先了解清楚我们对锁的基本需求。

    程序需要用到锁,说明程序中有多个线程(进程)存在共同竞争的资源,这样的资源可以包括一个共享变量,共享文件等。就线程池来看,这样的资源就包括线程池的代表状态和工作线程数的ctl,工作队列,任务队列等,这些资源都被工作线程和调用线程所共享,为保证程序正常可控的运行,就需要一套机制对这些资源进行协调保护。

    这样的机制就是我们要讲的锁机制,在java得世界里,锁机制就包括语言级别的synchonized关键字提供的锁以及java并发包里提供的改造的CLH锁。

    先说synchonized的锁,原理是使用了monitor模式,内置一个条件队列,加锁和释放锁都提供指令级别的操作,而且做到自动化,无需手动操作,后期版本还加入锁的优化,提供轻量级锁,可重入锁,重量级锁的相互的升级与降级的转化机制,提高了并发的性能。

    而CLH锁则是并发包里提供的锁实现方式,通过自我编程方式实现的一把锁,这把锁实现的原理简单来讲就是一个state共享变量加一个同步队列,附带有多个条件队列,基于此,提供共享锁与独占锁的实现,公平锁与非公平锁的实现以及可重入锁的实现,后期其他锁像信号量,闭锁等都是扩展于state字段来满足不同的锁需求。而各种CLH锁实现依赖于原子操作CAS和基本同步器AQS.

    这一篇先大体理清楚锁机制的分类和基本功能实现,以下篇章则会详细介绍CLH锁的实现细节。

    展开全文
  • 该文章属于《Java并发编程》系列文章,如果想了解更多,请点击《Java并发编程之总目录》前言在上篇文章《Java并发编程之锁机制之Lock接口》中,我们已经了解了,Java下整个Lock接口下实现的锁机制是通过AQS(这里我们...

    该文章属于《Java并发编程》系列文章,如果想了解更多,请点击《Java并发编程之总目录》

    前言

    在上篇文章《Java并发编程之锁机制之Lock接口》中,我们已经了解了,Java下整个Lock接口下实现的锁机制是通过AQS(这里我们将AbstractQueuedSynchronizer 或AbstractQueuedLongSynchronizer统称为AQS)与Condition来实现的。那下面我们就来具体了解AQS的内部细节与实现原理。

    PS:该篇文章会以AbstractQueuedSynchronizer来进行讲解,对AbstractQueuedLongSynchronizer有兴趣的小伙伴,可以自行查看相关资料。

    AQS简介

    抽象队列同步器AbstractQueuedSynchronizer (以下都简称AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO(first-in-first-out)同步队列来控制获取共享资源的线程。

    该类被设计为大多数同步组件的基类,这些同步组件都依赖于单个原子值(int)来控制同步状态,子类必须要定义获取获取同步与释放状态的方法,在AQS中提供了三种方法getState()、setState(int newState)及compareAndSetState(int expect, int update)来进行操作。同时子类应该为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

    AQS类方法简介

    AQS的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

    修改同步状态方法

    在子类实现自定义同步组件的时候,需要通过AQS提供的以下三个方法,来获取与释放同步状态。

    int getState() :获取当前同步状态

    void setState(int newState) :设置当前同步状态

    boolean compareAndSetState(int expect, int update) 使用CAS设置当前状态。

    子类中可以重写的方法

    boolean isHeldExclusively():当前线程是否独占锁

    boolean tryAcquire(int arg):独占式尝试获取同步状态,通过CAS操作设置同步状态,如果成功返回true,反之返回false

    boolean tryRelease(int arg):独占式释放同步状态。

    int tryAcquireShared(int arg):共享式的获取同步状态,返回大于等于0的值,表示获取成功,反之失败。

    boolean tryReleaseShared(int arg):共享式释放同步状态。

    获取同步状态与释放同步状态方法

    当我们实现自定义同步组件时,将会调用AQS对外提供的方法同步状态与释放的方法,当然这些方法内部会调用其子类的模板方法。这里将对外提供的方法分为了两类,具体如下所示:

    独占式获取与释放同步状态

    void acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则返回,否则进入同步队列等待,该方法会调用tryAcquire(int arg)方法。

    void acquireInterruptibly(int arg):与 void acquire(int arg)基本逻辑相同,但是该方法响应中断,如果当前没有获取到同步状态,那么就会进入等待队列,如果当前线程被中断(Thread().interrupt()),那么该方法将会抛出InterruptedException。并返回

    boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly(int arg)的基础上,增加了超时限制,如果当前线程没有获取到同步状态,那么将返回fase,反之返回true。

    boolean release(int arg) :独占式的释放同步状态

    共享式获取与释放同步状态

    void acquireShared(int arg):共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。

    void acquireSharedInterruptibly(int arg):在acquireShared(int arg)的基本逻辑相同,增加了响应中断。

    boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly的基础上,增加了超时限制。

    boolean releaseShared(int arg) :共享式的释放同步状态

    AQS具体实现及内部原理

    在了解了AQS中的针对不同方式获取与释放同步状态(独占式与共享式)与修改同步状态的方法后,现在我们来了解AQS中具体的实现及其内部原理。

    AQS中FIFO队列

    在上文中我们提到AQS中主要通过一个FIFO(first-in-first-out)来控制线程的同步。那么在实际程序中,AQS会将获取同步状态的线程构造成一个Node节点,并将该节点加入到队列中。如果该线程获取同步状态失败会阻塞该线程,当同步状态释放时,会把头节点中的线程唤醒,使其尝试获取同步状态。

    Node节点结构

    下面我们就通过实际代码来了解Node节点中存储的信息。Node节点具体实现如下:

    static final class Node{

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

    }

    复制代码

    Node节点是AQS中的静态内部类,下面分别对其中的属性(注意其属性都用volatile 关键字进行修饰)进行介绍。

    int waitStatus:等待状态主要包含以下状态

    SIGNAL = -1:当前节点的线程如果释放了或取消了同步状态,将会将当前节点的状态标志位SINGAL,用于通知当前节点的下一节点,准备获取同步状态。

    CANCELLED = 1:被中断或获取同步状态超时的线程将会被置为当前状态,且该状态下的线程不会再阻塞。

    CONDITION = -2:当前节点在Condition中的等待队列上,(关于Condition会在下篇文章进行介绍),其他线程调用了Condition的singal()方法后,该节点会从等待队列转移到AQS的同步队列中,等待获取同步锁。

    PROPAGATE = -3:与共享式获取同步状态有关,该状态标识的节点对应线程处于可运行的状态。

    0:初始化状态。

    Node prev:当前节点在同步队列中的上一个节点。

    Node next:当前节点在同步队列中的下一个节点。

    Thread thread:当前转换为Node节点的线程。

    Node nextWaiter:当前节点在Condition中等待队列上的下一个节点,(关于Condition会在下篇文章进行介绍)。

    AQS同步队列具体实现结构

    通过上文的描述我们大概了解了Node节点中存储的数据与信息,现在我们来看看整个AQS下同步队列的结构。具体如下图所示:dd587c801828dd5b0ef5e5195c3b9130.png

    在AQS中的同步队列中,分别有两个指针(你也可以叫做对象的引用),一个head指针指向队列中的头节点,一个tail指针指向队列中的尾节点。

    AQS添加尾节点

    当一个线程成功获取了同步状态(或者锁),其他线程无法获取到同步状态,这个时候会将该线程构造成Node节点,并加入到同步队列中,而这个加入队列的过程必须要确保线程安全,所以在AQS中提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。具体过程如下图所示:

    09c406f79dd07e9dfac7152948feb8d1.png

    上图中,虚线部分为之前tail指向的节点。

    AQS添加头节点

    在AQS中的同步队列中,头节点是获取同步状态成功的节点,头节点的线程会在释放同步状态时,将会唤醒其下一个节点,而下一个节点会在获取同步状态成功时将自己设置为头节点,具体过程如下图所示:d531d587ee39763fd548decb19b15ac7.png

    上图中,虚线部分为之前head指向的节点。因为设置头节点是获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要CAS来进行保证,只需要将原头节点的next指向断开就行了。

    现在我们已经了解了AQS中同步队列的头节点与尾节点的设置过程。现在我们根据实际代码进行分析,因为涉及到不同状态对同步状态的获取(独占式与共享式),所以下面会分别对这两种状态进行讲解。

    独占式同步状态获取与释放

    独占式同步状态获取

    通过acquire(int arg)方法我们可以获取到同步状态,但是需要注意的是该方法并不会响应线程的中断与获取同步状态的超时机制。同时即使当前线程已经中断了,通过该方法放入的同步队列的Node节点(该线程构造的Node),也不会从同步队列中移除。具体代码如下所示:

    public final void acquire(int arg){

    if (!tryAcquire(arg) &&

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    selfInterrupt();

    }

    复制代码

    在该方法中,主要通过子类重写的方法tryAcquire(arg)来获取同步状态,如果获取同步状态失败,则会将请求线程构造独占式Node节点(Node.EXCLUSIVE),同时将该线程加入同步队列的尾部(因为AQS中的队列是FIFO类型)。接着我们查看addWaiter(Node mode)方法具体细节:

    private Node addWaiter(Node mode){

    Node node = new Node(Thread.currentThread(), mode);//将该线程构造成Node节点

    Node pred = tail;

    if (pred != null) {//尝试将尾指针 tail 指向当前线程构造的Node节点

    node.prev = pred;

    if (compareAndSetTail(pred, node)) {

    //如果成功,那么将尾指针之前指向的节点的next指向 当前线程构造的Node节点

    pred.next = node;

    return node;

    }

    }

    enq(node);//如果当前尾指针为null,则调用enq(final Node node)方法

    return node;

    }

    复制代码

    在该方法中,主要分为两个步骤:

    如果当前尾指针(tail)不为null,那么尝试将尾指针 tail 指向当前线程构造的Node节点,如果成功,那么将尾指针之前指向的节点的next指向当前线程构造的Node节点,并返回当前节点。

    反之调用enq(final Node node)方法,将当前线程构造的节点加入同步队列中。

    接下来我们继续查看enq(final Node node)方法。

    private Node enq(final Node node){

    for (;;) {

    Node t = tail;

    if (t == null) {//如果当前尾指针为null,那么尝试将头指针 head指向当前线程构造的Node节点

    if (compareAndSetHead(new Node()))

    tail = head;

    } else {//如果当前尾指针(tail)不为null,那么尝试将尾指针 tail 指向当前线程构造的Node节点

    node.prev = t;

    if (compareAndSetTail(t, node)) {

    t.next = node;

    return t;

    }

    }

    }

    }

    复制代码

    在enq(final Node node)方法中,通过死循环(你也可以叫做自旋)的方式来保证节点的正确的添加。接下来,我们继续查看acquireQueued(final Node node, int arg)方法的处理。该方法才是整个多线程竞争同步状态的关键,大家一定要注意看!!!

    final boolean acquireQueued(final Node node, int arg){

    boolean failed = true;

    try {

    boolean interrupted = false;

    for (;;) {

    final Node p = node.predecessor();//获取该节点的上一节点

    //如果上一节点是head锁指向的节点,且该节点获取同步状态成功

    if (p == head && tryAcquire(arg)) {

    //设置head指向该节点,

    setHead(node);

    p.next = null; // 将上一节点的next指向断开

    failed = false;

    return interrupted;

    }

    //判断获取同步状态失败的线程是否需要阻塞

    if (shouldParkAfterFailedAcquire(p, node) &&

    parkAndCheckInterrupt())//阻塞并判断当前线程是否已经中断了

    interrupted = true;

    }

    } finally {

    if (failed)

    //如果线程中断了,那么就将该线程从同步队列中移除,同时唤醒下一节点

    cancelAcquire(node);

    }

    }

    复制代码

    在该方法中主要分为三个步骤:

    通过死循环(你也可以叫做自旋)的方式来获取同步状态,如果当前节点的上一节点是head指向的节点且该节点获取同步状态成功,那么会设置head指向该节点 ,同时将上一节点的next指向断开。

    a415301cb8a185eabcd66394a16a5292.png

    015d31a448db2f07986e493672c9f7b9.png

    如果当前节点的上一节点不是head指向的节点,或者获取当前节点同步状态失败,那么会先调用shouldParkAfterFailedAcquire(Node pred, Node node)方法来判断是需要否阻塞当前线程,如果该方法返回true,则调用parkAndCheckInterrupt()方法来阻塞线程。如果该方法返回false,那么该方法内部会把当前节点的上一节点的状态修改为Node.SINGAL。

    在finally语句块中,判断当前线程是否已经中断。如果中断,则通过那么cancelAcquire(Node node)方法将该线程(对应的Node节点)从同步队列中移除,同时唤醒下一节点。

    下面我们接着来看shouldParkAfterFailedAcquire(Node pred, Node node)方法,看看具体的阻塞具体逻辑,代码如下所示:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){

    int ws = pred.waitStatus;

    if (ws == Node.SIGNAL)

    //上一节点已经设置状态请求释放信号,因此当前节点可以安全地阻塞

    return true;

    if (ws > 0) {

    //上一节点,已经被中断或者超时,那么接跳过所有状态为Node.CANCELLED

    do {

    node.prev = pred = pred.prev;

    } while (pred.waitStatus > 0);

    pred.next = node;

    } else {

    //其他状态,则调用cas操作设置状态为Node.SINGAL

    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

    }

    return false;

    }

    复制代码

    在该方法中会获取上一节点的状态(waitStatus),然后进行下面的三个步骤的判断。

    如果上一节点状态为Node.SIGNAL,那么会阻塞接下来的线程(函数 return true)。

    如果上一节点的状态大于0(从上文描述的waitStatus所有状态中,我们可以得知只有Node.CANCELLED大于0)那么会跳过整个同步列表中所有状态为Node.CANCELLED的Node节点。(函数 return false)。

    如果上一节点是其他状态,则调用CAS操作设置其状态为Node.SINGAL。(函数 return false)。

    阻塞实现

    当shouldParkAfterFailedAcquire(Node pred, Node node)方法返回true时,接着会调用parkAndCheckInterrupt()方法来阻塞当前线程。该方法的返回值为当前线程是否中断。

    private final boolean parkAndCheckInterrupt(){

    LockSupport.park(this);

    return Thread.interrupted();

    }

    复制代码

    在该方法中,主要阻塞线程的方法是通过LockSupport(在后面的文章中会具体介绍)的park来阻塞当前线程。

    取消状态获取,同时唤醒下一节点

    通过对独占式获取同步状态的理解,我们知道 acquireQueued(final Node node, int arg)方法中最终会执行finally语句块中的代码,来判断当前线程是否已经中断。如果中断,则通过 cancelAcquire(Node node) 方法将取消该线程的状态获取并唤醒下一个线程节点。那么接下来我们来看看该方法的具体实现。具体代码如下:

    private void cancelAcquire(Node node){

    //如果当前节点已经不存在直接返回

    if (node == null)

    return;

    //(1)将该节点对应的线程置为null

    node.thread = null;

    //(2)跳过当前节点之前已经取消的节点

    Node pred = node.prev;

    while (pred.waitStatus > 0)

    node.prev = pred = pred.prev;

    //获取在(2)操作之后,节点的下一个节点

    Node predNext = pred.next;

    //(3)将当前中断的线程对应节点状态设置为CANCELLED

    node.waitStatus = Node.CANCELLED;

    //(4)如果当前中断的节点是尾节点,那么则将尾节点重新指向

    if (node == tail && compareAndSetTail(node, pred)) {

    compareAndSetNext(pred, predNext, null);

    } else {

    //(5)如果中断的节点的上一个节点的状态,为SINGAL或者即将为SINGAL,

    //那么将该当前中断节点移除

    int ws;

    if (pred != head &&

    ((ws = pred.waitStatus) == Node.SIGNAL ||

    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

    pred.thread != null) {

    Node next = node.next;

    if (next != null && next.waitStatus <= 0)

    compareAndSetNext(pred, predNext, next);

    } else {

    unparkSuccessor(node);//(6)将该节点移除,同时唤醒下一个节点

    }

    node.next = node; // help GC

    }

    }

    复制代码

    观察上诉代码,我们可以知道该方法干了以下这几件事

    (1)将中断线程对应的节点对应的线程置为null

    (2)跳过当前节点之前已经取消的节点(我们已经知道在Node.waitStatus的枚举中,只有CANCELLED 大于0 )bc1e4895dfb522645b9a7277592a96a2.png

    (3)将当前中断的线程对应节点状态设置为CANCELLED

    (4)在(2)的前提下,如果当前中断的节点是尾节点,那么通过CAS操作将尾节点指向(2)操作后的的节点。

    de0fc69d7f0b711bc74ce02c02027154.png

    (5)如果当前中断节点不是尾节点,且当前中断的节点的上一个节点的状态,为SINGAL或者即将为SINGAL,那么将该当前中断节点移除。

    (6)如果(5)条件不满足,那么会调用unparkSuccessor(Node node) 方法唤醒下一个节点。具体代码如下:

    private void unparkSuccessor(Node node){

    //重置该节点为初始状态

    int ws = node.waitStatus;

    if (ws < 0)

    compareAndSetWaitStatus(node, ws, 0);

    //获取中断节点的下一节点

    Node s = node.next;

    //判断下一节点的状态,如果为Node.CANCELED状态

    if (s == null || s.waitStatus > 0) {

    s = null;

    //则通过尾节点向前遍历,获取最近的waitStatus<=0的节点

    for (Node t = tail; t != null && t != node; t = t.prev)

    if (t.waitStatus <= 0)

    s = t;

    }

    //如果该节点不会null,则唤醒该节点中的线程。

    if (s != null)

    LockSupport.unpark(s.thread);

    }

    复制代码

    这里为了方便大家理解,我还是将图补充了出来,(图片有可能不是很清晰,建议大家点击浏览大图),3aca44853e4502a1a295a2f8578b67ea.png

    整体来说,unparkSuccessor(Node node)方法主要是获取中断节点后的可用节点(Node.waitStatus<=0),然后将该节点对应的线程唤醒。

    独占式同步状态释放

    当线程获取同步状态成功并执行相应逻辑后,需要释放同步状态,使得后继线程节点能够继续获取同步状态,通过调用 AQS 的 relase(int arg)方法,可以释放同步状态。具体代码如下:

    public final boolean release(int arg){

    if (tryRelease(arg)) {

    Node h = head;

    if (h != null && h.waitStatus != 0)

    unparkSuccessor(h);

    return true;

    }

    return false;

    }

    复制代码

    在该方法中,会调用模板方法tryRelease(int arg),也就是说同步状态的释放逻辑,是需要用户来自己定义的。当tryRelease(int arg)方法返回true后,如果当前头节点不为null且头节点waitStatus!=0,接着会调用unparkSuccessor(Node node)方法来唤醒下一节点(使其尝试获取同步状态)。关于unparkSuccessor(Node node)方法,上文已经分析过了,这里就不再进行描述了。

    共享式同步状态获取与释放

    共享式获取与独占式获取最主要的区别在于同一时刻是否能有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于文件的写操作均会被阻塞。而其他读操作能够同时进行。如果对文件进行写操作,那么这一时刻其他的读写操作都会被阻塞,写操作要求对资源的独占式访问,而读操作可以是共享访问的。

    共享式同步状态获取

    在了解了共享式同步状态获取与独占式获取同步状态的区别后,现在我们来看一看共享式获取的相关方法。在AQS中通过 acquireShared(int arg)方法来实现的。具体代码如下:

    public final void acquireShared(int arg){

    if (tryAcquireShared(arg) < 0)

    doAcquireShared(arg);

    }

    复制代码

    在该方法内部会调用模板方法tryAcquireShared(int arg),同独占式获取获取同步同步状态一样,也是需要用户自定义的。当tryAcquireShared(int arg)方法返回值小于0时,表示没有获取到同步状态,则调用doAcquireShared(int arg)方法获取同步状态。反之,已经获取同步状态成功,则不进行任何的操作。关于doAcquireShared(int arg)方法具体实现如下所示:

    private void doAcquireShared(int arg){

    //(1)添加共享式节点在AQS中FIFO队列中

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

    boolean interrupted = false;

    //(2)自旋获取同步状态

    for (;;) {

    final Node p = node.predecessor();

    if (p == head) {

    int r = tryAcquireShared(arg);

    if (r >= 0) {

    //当获取同步状态成功后,设置head指针

    setHeadAndPropagate(node, r);

    p.next = null; // help GC

    if (interrupted)

    selfInterrupt();

    failed = false;

    return;

    }

    }

    //(3)判断线程是否需要阻塞

    if (shouldParkAfterFailedAcquire(p, node) &&

    parkAndCheckInterrupt())

    interrupted = true;

    }

    } finally {

    //(4)如果线程已经中断,则唤醒下一节点

    if (failed)

    cancelAcquire(node);

    }

    }

    复制代码

    整体来看,共享式获取的逻辑与独占式获取的逻辑几乎一样,还是以下几个步骤:

    (1)添加共享式节点在AQS中FIFO队列中,这里需要注意节点的构造为 addWaiter(Node.SHARED),其中 Node.SHARED为Node类中的静态常量(static final Node SHARED = new Node()),且通过addWaiter(Node.SHARED)方法构造的节点状态为初始状态,也就是waitStatus= 0。

    (2)自旋获取同步状态,如果当前节点的上一节点为head节点,其获取同步状态成功,那么将调用setHeadAndPropagate(node, r);,重新设置head指向当前节点。同时重新设置该节点状态waitStutas = Node.PROPAGATE(共享状态),然后直接退出doAcquireShared(int arg)方法。具体情况如下图所示:

    5656597f9762942a6a6890ecb9651de8.png

    (3)如果不满足条件(2),那么会判断当前节点的上一节点不是head指向的节点,或者获取当前节点同步状态失败,那么会先调用shouldParkAfterFailedAcquire(Node pred, Node node)方法来判断是需要否阻塞当前线程,如果该方法返回true,则调用parkAndCheckInterrupt()方法来阻塞线程。如果该方法返回false,那么该方法内部会把当前节点的上一节点的状态修改为Node.SINGAL。具体情况如下图所示:

    8a76f55a39e5084b4ec6341790602bbc.png

    (4)如果线程已经中断,则唤醒下一节点

    前面我们提到了,共享式与独占式获取同步状态的主要不同在于其设置head指针的方式不同,下面我们就来看看共享式设置head指针的方法setHeadAndPropagate(Node node, int propagate)。具体代码如下:

    private void setHeadAndPropagate(Node node, int propagate){

    //(1)设置head 指针,指向该节点

    Node h = head; // Record old head for check below

    setHead(node);

    //(2)判断是否执行doReleaseShared();

    if (propagate > 0 || h == null || h.waitStatus < 0 ||

    (h = head) == null || h.waitStatus < 0) {

    Node s = node.next;

    //如果当前节点的下一节点是共享式获取同步状态节点,则调用doReleaseShared()方法

    if (s == null || s.isShared())

    doReleaseShared();

    }

    }

    复制代码

    在setHeadAndPropagate(Node node, int propagate)方法中有两个参数。

    第一个参数node是当前共享式获取同步状态的线程节点。

    第二个参数propagate(中文意思,繁殖、传播)是共享式获取同步状态线程节点的个数。

    其主要逻辑步骤分为以下两个步骤:

    (1)设置head 指针,指向该节点。从中我们可以看出在共享式获取中,Head节点总是指向最进获取成功的线程节点!!!

    (2)判断是否执行doReleaseShared(),从代码中我们可以得出,主要通过该条件if (s == null || s.isShared()),其中 s为当前节点的下一节点(也就是说同一时刻有可能会有多个线程同时访问)。当该条件为true时,会调用doReleaseShared()方法。关于怎么判断下一节点是否是否共享式线程节点,具体逻辑如下:

    //在共享式访问中,当前节点为SHARED类型

    final Node node = addWaiter(Node.SHARED);

    //在调用addWaiter 内部会调用Node构造方法,其中会将nextWaiter设置为Node.SHARED。

    Node(Thread thread, Node mode) { // Used by addWaiter

    this.nextWaiter = mode;

    this.thread = thread;

    }

    //SHARED为Node类静态类

    final boolean isShared(){

    return nextWaiter == SHARED;

    }

    复制代码

    下面我们继续查看doReleaseShared()方法的具体实现,具体代码如下所示:

    private void doReleaseShared(){

    for (;;) {

    Node h = head;

    if (h != null && h != tail) {

    int ws = h.waitStatus;

    if (ws == Node.SIGNAL) {

    //(1)从上图中,我们可以得知在共享式的同步队列中,如果存在堵塞节点,

    //那么head所指向的节点状态肯定为Node.SINGAL,

    //通过CAS操作将head所指向的节点状态设置为初始状态,如果成功就唤醒head下一个阻塞的线程

    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

    continue; // loop to recheck cases

    unparkSuccessor(h);//唤醒下一节点线程,上文分析过该方法,这里就不在讲了

    }

    //(2)表示该节点线程已经获取共享状态成功,则通过CAS操作将该线程节点状态设置为Node.PROPAGATE

    //从上图中,我们可以得知在共享式的同步队列中,

    else if (ws == 0 &&

    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

    continue; // loop on failed CAS

    }

    if (h == head) //如果head指针发生改变一直循环,否则跳出循环

    break;

    }

    }

    复制代码

    从代码中我们可以看出该方法主要分为两个步骤:

    (1)从上图中,我们可以得知在共享式的同步队列中,如果存在堵塞节点,那么head所指向的节点状态肯定为Node.SINGAL,通过CAS操作将head所指向的节点状态设置为初始状态,如果成功就唤醒head下一个阻塞的线程节点,反之继续循环。

    (2)如果(1)条件不满足,那么说明该节点已经获取成功的获取同步状态,那么通过CAS操作将该线程节点的状态设置为waitStatus = Node.PROPAGATE,如果CAS操作失败,就一直循环。

    共享式同步状态释放

    当线程获取同步状态成功并执行相应逻辑后,需要释放同步状态,使得后继线程节点能够继续获取同步状态,通过调用AQS的releaseShared(int arg)方法,可以释放同步状态。具体代码如下:

    public final boolean releaseShared(int arg){

    if (tryReleaseShared(arg)) {

    doReleaseShared();

    return true;

    }

    return false;

    }

    复制代码

    独占式与共享式超时获取同步状态

    因为独占式与共享式超时获取同步状态,与其本身的非超时获取同步状态逻辑几乎一样。所以下面就以独占式超时获取同步状态的相应逻辑进行讲解。

    在独占式超时获取同步状态中,会调用tryAcquireNanos(int arg, long nanosTimeout)方法,其中具体nanosTimeout参数为你传入的超时时间(单位纳秒),具体代码如下所示:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)

    throws InterruptedException{

    if (Thread.interrupted())

    throw new InterruptedException();

    return tryAcquire(arg) ||

    doAcquireNanos(arg, nanosTimeout);

    }

    复制代码

    观察代码,我们可以得知如果当前线程已经中断,会直接抛出InterruptedException,如果当前线程能够获取同步状态( 调用tryAcquire(arg)),那么就会直接返回,如果当前线程获取同步状态失败,则调用doAcquireNanos(int arg, long nanosTimeout)方法来超时获取同步状态。那下面我们接着来看该方法具体代码实现,代码如下图所示:

    private boolean doAcquireNanos(int arg, long nanosTimeout)

    throws InterruptedException{

    if (nanosTimeout <= 0L)

    return false;

    //(1)计算超时等待的结束时间

    final long deadline = System.nanoTime() + nanosTimeout;

    final Node node = addWaiter(Node.EXCLUSIVE);

    boolean failed = true;

    try {

    for (;;) {

    final Node p = node.predecessor();

    //(2)如果获取同步状态成功,直接返回

    if (p == head && tryAcquire(arg)) {

    setHead(node);

    p.next = null; // help GC

    failed = false;

    return true;

    }

    //如果获取同步状态失败,计算的剩下的时间

    nanosTimeout = deadline - System.nanoTime();

    //(3)如果超时直接退出

    if (nanosTimeout <= 0L)

    return false;

    //(4)如果没有超时,且nanosTimeout大于spinForTimeoutThreshold(1000纳秒)时,

    //则让线程等待nanosTimeout (剩下的时间,单位:纳秒。)

    if (shouldParkAfterFailedAcquire(p, node) &&

    nanosTimeout > spinForTimeoutThreshold)

    LockSupport.parkNanos(this, nanosTimeout);

    //(5)如果当前线程被中断,直接抛出异常

    if (Thread.interrupted())

    throw new InterruptedException();

    }

    } finally {

    if (failed)

    cancelAcquire(node);

    }

    }

    复制代码

    整个方法为以下几个步骤:

    (1)在线程获取同步状态之前,先计算出超时等待的结束时间。(单位精确到纳秒)

    (2)通过自旋操作获取同步状态,如果成功,则直接返回

    (3)如果获取同步失败,则计算剩下的时间。如果已经超时了就直接退出。

    (4)如果没有超时,则判断当前剩余时间nanosTimeout是否大于spinForTimeoutThreshold(1000纳秒),如果大于,则通过 LockSupport.parkNanos(this, nanosTimeout)方法让线程等待相应时间。(该方法会在根据传入的nanosTimeout时间,等待相应时间后返回。),如果nanosTimeout小于等于spinForTimeoutThreshold时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让nanosTimeout的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,线程会进入无条件的快速自旋。

    (5)在没有走(4)步骤的情况下,表示当前线程已经被中断了,则直接抛出InterruptedException。

    最后

    到现在我们基本了解了整个AQS的内部结构与其独占式与共享式获取同步状态的实现,但是其中涉及到的线程的阻塞、等待、唤醒(与LockSupport工具类相关)相关知识点我们都没有具体介绍,后续的文章会对LockSupport工具以及后期关于锁相关的等待/通知模式相关的Condition接口进行介绍。希望大家继续保持着学习的动力~~。

    总结

    整个AQS是基于其内部的FIFO队列实现同步控制。请求的线程会封装为Node节点。

    AQS分为整体分为独占式与共享式获取同步状态。其支持线程的中断,与超时获取。

    展开全文
  • Java并发锁机制

    2020-12-14 19:52:03
    目录一、Lock接口1.1 什么是锁?1.2 Lock对比synchronized关键字及常用API1.3 Lock接口源码二、队列同步器2.1 什么是...锁机制Java并发的核心之一,凡是谈及多并发多线程问题,锁机制是在所难免的,本文将对Java


    本文参考Java并发编程艺术
    锁机制是Java多并发的核心之一,凡是谈及多并发多线程问题,锁机制是在所难免的,本文将对Java并发包中与锁相关的API和组件,以及这些API和组件的使用方式和实现细节。

    一、Lock接口

    1.1 什么是锁?

    锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。
    锁的一般使用形式如下:

    Lock lock = new ReentrantLock();
    lock.lock();
    try {
    //....
    } finally {
    	lock.unlock();
    }
    

    在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放,在try外抛出异常,则停止执行,不会执行finally块。

    1.2 Lock对比synchronized关键字及常用API

    Lock接口提供的synchronized关键字不具备的主要特性如下图:
    在这里插入图片描述

    Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的API如下图:
    在这里插入图片描述

    1.3 Lock接口源码

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    

    从源码可以看出,Lock接口仅仅是给出了一些锁获取判断操作的基本方法。

    二、队列同步器

    Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程访问控制的。

    2.1 什么是队列同步器

    队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。
    同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。

    2.2 队列同步器的接口与示例

    同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
    重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态:
    · getState():获取当前同步状态。
    · setState(int newState):设置当前同步状态。
    · compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
    同步器可重写的方法与描述如下图所示:
    在这里插入图片描述
    实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述如下所示:
    在这里插入图片描述
    同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。
    只有掌握了同步器的工作原理才能更加深入地理解并发包中其他的并发组件,所以下面通过一个独占锁的示例来深入了解一下同步器的工作原理。
    顾名思义,独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁,如代码如下所示:

    class Mutex implements Lock {
    	// 静态内部类,自定义同步器
    	private static class Sync extends AbstractQueuedSynchronizer {
    		// 是否处于占用状态
    		protected boolean isHeldExclusively() {
    			return getState() == 1;
    		}
    		// 当状态为0的时候获取锁
    		public boolean tryAcquire(int acquires) {
    			if (compareAndSetState(0, 1)) {
    				setExclusiveOwnerThread(Thread.currentThread());
    				return true;	
    			}
    			return false;
    		}
    		// 释放锁,将状态设置为0
    		protected boolean tryRelease(int releases) {
    			if (getState() == 0) 
    				throw new IllegalMonitorStateException();
    			setExclusiveOwnerThread(null);
    			setState(0);
    			return true;
    		}
    		// 返回一个Condition,每个condition都包含了一个condition队列
    		Condition newCondition() { return new ConditionObject(); }
    	}
    	// 仅需要将操作代理到Sync上即可
    	private final Sync sync = new Sync();
    	public void lock() { 
    		sync.acquire(1); 
    	}
    	public boolean tryLock() { 
    		return sync.tryAcquire(1); 
    	}
    	public void unlock() { 
    		sync.release(1); 
    	}
    	public Condition newCondition() { 
    		return sync.newCondition(); 
    	}
    	public boolean isLocked() { 
    		return sync.isHeldExclusively(); 
    	}
    	public boolean hasQueuedThreads() { 
    		return sync.hasQueuedThreads(); 
    	}
    	public void lockInterruptibly() throws InterruptedException {
    		sync.acquireInterruptibly(1);
    	}
    	public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    		return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    	}
    }
    

    Mutex是一个自定义的同步组件,其内部自定义了一个静态内部类Sync ,该内部类继承了同步器,重写了AQS类的方法,在重写的方法中,调用getState()、setState(int state)、compareAndSetState(int expect, int update)来访问或者更新同步状态,此外Mutex类中其他方法,通过静态内部类Sync属性变量来调用AQS中的模板方法,实现Mutex自定义同步组件的相关操作。

    三、重入锁

    3.1 什么是重入锁

    重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
    什么意思呢?简单理解就是一个线程对已获得的锁的情况下,还可以继续对该锁进行获取,典型的例子就是synchronized关键字,我们考虑这种情况,如果synchronized用来修饰方法,而该方法是递归方法,那么每次递归则都会对资源进行synchronized隐式加锁,但是在进入递归之前,前一级方法并没有结束,也就是前一层递归没有释放synchronized锁,子级递归就又加锁了,这并不会阻塞当前线程,这就是可重入锁。
    如果不是重入锁,在没有释放锁的同时还获取锁,就会阻塞线程;比如前面的代码例子,当一个线程调用Mutex的lock()方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己所阻塞,原因是Mutex在实现tryAcquire(int acquires)方法时没有考虑占有锁的线程再次获取锁的场景,而在调用tryAcquire(int acquires)方法时返回了false,导致该线程被阻塞。简单地说,Mutex是一个不支持重进入的锁。
    另外,锁的公平性, 就是根据时间顺序分配锁,先请求者先获得,那么这个锁就是公平的,反之则是不公平的。

    3.2 实现重进入

    重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。

    1. 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
    2. 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
      我们以ReentrantLock锁为例分析锁的重入性。ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例,获取同步状态的代码如下所示(获取过程)。
    final boolean nonfairTryAcquire(int acquires) {
    	// 当前线程获取
    	final Thread current = Thread.currentThread();
    	// 获取锁状态
    	int c = getState();
    	// 如果没有线程占用锁
    	if (c == 0) {
    		if (compareAndSetState(0, acquires)) {
    			setExclusiveOwnerThread(current);
    			return true;
    		}
    	}
    	// 如果锁已经被占用,判断是否是当前线程占用 
    	else if (current == getExclusiveOwnerThread()) {
    		int nextc = c + acquires;
    		if (nextc < 0)
    			throw new Error("Maximum lock count exceeded");
    		setState(nextc);
    		return true;
    	}
    	return false;
    }
    

    该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态功。
    释放过程代码如下:

    protected final boolean tryRelease(int releases) {
    	// 获取释放计数器数值
    	int c = getState() - releases;
    	if (Thread.currentThread() != getExclusiveOwnerThread())
    		throw new IllegalMonitorStateException();
    	boolean free = false;
    	// 计数器计算后发现为0,则最后一次释放锁,其他线程可以获取该锁
    	if (c == 0) {
    		free = true;
    		setExclusiveOwnerThread(null);
    	}
    	setState(c);
    	return free;
    }
    

    如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为时,将占有线程设置为null,并返回true,表示释放成功。
    总之,锁的冲进入,就是根据一个计数器的值来进行判断该锁的加锁/释锁的次数,并且多次加锁/释锁的前提条件是判断是否为同一线程,在此基础上,实现锁的重入。

    四、读写锁

    4.1 什么是读写锁

    之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。也就是说 当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。
    Java并发包提供读写锁的实现是ReentrantReadWriteLock,它提供的特性如下:
    在这里插入图片描述

    4.2 读写锁的接口与示例

    首先看看读写锁ReadWriteLock接口的源码:

    public interface ReadWriteLock {
        /**
         * Returns the lock used for reading.
         *
         * @return the lock used for reading
         */
        Lock readLock();
    
        /**
         * Returns the lock used for writing.
         *
         * @return the lock used for writing
         */
        Lock writeLock();
    }
    

    从源码中,我们可以看出ReadWriteLock仅定义了获取读锁和写锁的两个方法,即readLock()方法和writeLock()方法,比较简单,我们一般使用的是它的一个实现类ReentrantReadWriteLock。
    我们来看看ReentrantReadWriteLock内部工作状态的一些方法:
    在这里插入图片描述
    对读写锁有一些基本的了解以后,现在我们通过一个缓存示例说明读写锁的使用方式:

    public class Cache {
    	static Map<String, Object> map = new HashMap<String, Object>();
    	static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    	// 读/写锁的获取
    	static Lock r = rwl.readLock();
    	static Lock w = rwl.writeLock();
    	// 获取一个key对应的value
    	public static final Object get(String key) {
    		r.lock();
    		try {
    			return map.get(key);
    		} finally {
    			r.unlock();
    		}
    	}
    	// 设置key对应的value,并返回旧的value
    	public static final Object put(String key, Object value) {
    		w.lock();
    		try {
    			return map.put(key, value);
    		} finally {
    			w.unlock();
    		}
    	}
    	// 清空所有的内容
    	public static final void clear() {
    		w.lock();
    		try {
    			map.clear();
    		} finally {
    			w.unlock();
    		}
    	}
    }
    

    上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作get(String key)方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作put(String key,Object value)方法和clear()方法,在更HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。Cache使用读写锁提升读操作的并发性,也保证每次写操作对所有的读写操作的可见性,同时简化了编程方式。
    这里简单说下笔者在学习这一段遇到的一个小疑惑:在我们的印象中,锁一般都是互斥的,具有排他性,读写锁也不例外(笔者认为写锁具有排他性,因此读写锁也具有),现在分别考虑读锁和写锁,如果多线程读锁可以同步进行,那么为什么还需要读锁呢?是不是就可以不需要读锁,直接并发获取读操作?反正不修改变量不会导致一致性问题,经过一番思考,笔者认为,这个读书实际上是为写锁的排他性服务的,试想,如果当前有线程在进行读操作,然后有一个线程此时要进行写操作也就是获取写锁,那么我们就要阻塞其他正在读的线程或者写的线程,我们是根据什么来判断正在读的线程是哪些呢?如果没有读锁,我们就很难判断,但是有读锁,相当于给读线程打上了标记,我们就可以知道这些线程是哪些了,然后阻塞相应的线程即可,实现写锁的互斥。

    4.3 锁降级

    锁降级指的是写锁降级成为读锁(写锁—>读锁,反之不是)。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程(简而言之就是写锁获取和释放中间插入一个读锁获取的过程)。
    接下来我们看一段代码:

    public void processData() {
    	readLock.lock();
    	if (!update) {
    		// 必须先释放读锁
    		readLock.unlock();
    		// 锁降级从获取到写锁开始
    		writeLock.lock();
    		try {
    			if (!update) {
    				// 准备数据的流程(略)
    				update = true;
    			}	
    			readLock.lock();
    		} finally {
    			writeLock.unlock();
    		}
    		// 锁降级完成,写锁降级为读锁
    	}
    	try {
    		// 使用数据的流程(略)
    	} finally {
    		readLock.unlock();
    	}
    }
    

    五、Condition接口

    5.1 什么是Condition

    任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。通过Condition我们可以唤醒特定的线程对象。
    在这里插入图片描述

    5.2 代码举例

    我们编写代码,让三个线程A、B、C按照A->B->C->A的方式轮流执行:

    public class MutilaTion {
        public static void main(String[] args) throws InterruptedException {
            Data data = new Data();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                    data.outA();
                }
            }
            ).start();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                    data.outB();
                }
            }
            ).start();
            new Thread(()->{
                for (int i = 0; i < 5; i++) {
                data.outC();
            	}
            }
            ).start();
        }
    }
    class Data{
        private int flag = 1;
        private Lock lock = new ReentrantLock();
        private Condition conditionA = lock.newCondition();
        private Condition conditionB = lock.newCondition();
        private Condition conditionC = lock.newCondition();
    
    
        public void outA(){
            lock.lock();
            try {
                while (flag != 1){
                    try {
                        conditionA.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                flag = 2;
                System.out.println("线程A执行===========》》》》》AAAAA");
                conditionB.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void outB(){
            lock.lock();
            try {
                while (flag != 2){
                    try {
                        conditionB.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                flag = 3;
                conditionC.signal();
                System.out.println("线程B执行===========》》》》》BBBBB");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void outC(){
            lock.lock();
            try {
                while (flag != 3){
                    try {
                        conditionC.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                flag = 1;
                conditionA.signal();
                System.out.println("线程C执行===========》》》》》CCCCC");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
    

    输出结果:
    在这里插入图片描述

    展开全文
  • 第一部分:synchronized和volatile锁机制用来保护对象一致性以及操作原子性,是实现线程安全重要手段。线程安全涉及到对象两个重要状态:共享性和可变性。如果对象是不可变、线程私有那么它一定是线程...

    第一部分:synchronized和volatile

    锁机制用来保护对象的一致性以及操作的原子性,是实现线程安全的重要手段。线程安全涉及到对象两个重要的状态:共享性和可变性。如果对象是不可变的、线程私有的那么它一定是线程安全的。所以说,只有在共享的、可变的对象上面进行操作时才需要加锁,以保障线程安全。volatile和synchronized是java 5.0之前最早协调对象共享的机制。下面我们将分别介绍他们:

    synchronized

    synchronized用来形容方法或者代码块,是java提供的最早的锁机制,支持重入锁。关于synchronized的详细解析文章有很多,这里列举几个注意事项:

    第一,使用synchronized关键字时一定要尽可能的缩小范围,尽可能的在方法块里需要锁的地方使用,而不是直接用来修饰整个方法。这需要我们对方法里面的操作进行分析,哪些需要加锁,哪些不需要加锁,只在需要锁的地方加锁,这样即可以提高程序的效率,同时开放调用方法也减少了线程安全的隐患。

    第二,synchronized提供的锁机制是粗粒度的,当有线程访问当前对象的synchronized方法或代码块时,其他线程只能等待当前操作结束才可以访问。这听上去似乎存在一定的性能问题,但java 6.0以后synchronized在并发环境下性能得到了大幅提升,因此建议尽可能的使用synchronized,除非synchronized满足不了业务需求。而且synchronized使用时无需释放锁,而且JVM还提供了专门的优化支持,因此即使synchronized是古老的锁,但是它依然适用于绝大多数场景。

    volatile

    volatile用来修饰变量保证其可见性。可见性是一种复杂的属性,volatile变量不会被缓存在寄存器或者其他处理器不可见的地方,在读取volatile变量时返回的一定是最新写入的值。volatile不是线程安全的,他不能替代锁,它只在特定的场景下使用,使用时要非常小心。以下场景可以使用volatile:

    第一,对变量的写入不依懒于变量当前的值,或者你能确保只有单个线程更新变量的值;

    第二,该变量不会与其它状态变量一起纳入不变性条件中;

    第三,在访问变量时不需要加锁。

    第二部分:ReentrantLock

    ReentrantLock是一个可重入的互斥锁,继承自Lock接口。如果说synchronized是隐式锁的话,那么ReentrentLock就是显式锁。锁的申请、使用、释放都必须显式的申明。Lock接口提供了以下方法:

    public interface Lock{

    void lock();

    void lockInterruptibly() throws InterruptedException;

    boolean tryLock();

    boolean tryLock(long time,TimeUnit unit) throws InterruptedException;

    void unLock();

    Condition newCondition();

    }

    ReentrantLock实现了Lock接口,并提供了与synchronized相同的互斥性和内存可见性。那既然如此,为什么还要创建一个类似的锁机制呢?内置锁虽然好用,但是缺乏一些灵活性,而ReentrantLock则可以弥补这些不足。

    第一,轮询锁与定时锁。tryLock方法实现了可定时的与可轮询的锁实现。与synchronized相比它有更完善的错误恢复机制。内置锁中死锁是一类严重的错误,只能重启程序。而ReentrantLock可以使用可定时或者轮询的锁,它会释放已获得的锁,然后再尝试获得所有的锁。在实现具有时间限制的操作时,定时锁也非常也用。如果操作在给定时间内不能给出结果那么就会使程序提前结束。

    第二,可中断锁获取操作。lockInterruptibly方法能够在获得锁的同时保持对中断的响应。而且由于它包含在Lock中,因此无需创建其他类型的不可中断阻塞机制。

    第三部分:ReadWriteLock

    ReentrantReadWriteLock实现了一种标准的互斥读写锁,继承自ReadWriteLock。ReadWriteLock接口包含以下方法:

    public interface ReadWriteLock{

    Lock readLock();

    Lock WriteLock();

    }

    ReentrantReadWriteLock我们可以这样理解:当执行读操作的时候可以多个线程并发访问;当执行写操作的时候,只可以同时被一个线程访问。所以它使用的场景是读操作多而写操作少的并发场景。此外,ReentrantReadWriteLock还可以设置是否为公平锁,是公平锁的话则可以按照排队的顺序获取锁,非公平锁的话则是随机获得。

    第四部分:锁的公平性

    上一节讲到了ReentrantReadWriteLock实现了公平锁和非公平锁两种模式。在公平锁模式下线程按照请求的顺序来获得锁,而非公平模式下则可以插队。我们的期望是所有的锁都是公平的,毕竟插队是一种不好的行为。但实际上非公平锁比公平锁有着更高的并发效率。假设线程A持有一个锁,并且线程B也请求这个锁,由于该锁被线程A占有,所以B线程挂起,当A使用结束时释放锁,此时唤醒B,B需要重新申请获得锁。如果同时线程C也请求这个锁,并且C很可能在B获得锁之前已经获得、使用并释放了锁。这样就实现了双赢,B线程获得的锁没有延迟同时线程C也得到了执行,这提高了程序的吞吐率。

    总结:与内置锁相比,显式锁提供了一些扩展功能,在处理锁的不可用方面有着更高的灵活性,并且对队列有着更好的控制。但是显式锁无法替换synchronized,只有在synchronized无法满足需求时才会使用它。读写锁允许多个读线程并发的访问被保护对象,当访问以读取操作为主的数据结构时,能提高程序的伸缩性。

    展开全文
  • 本文着重介绍了在java并发中常见几种锁机制。1.偏向锁偏向锁是JDK1.6提出来一种锁优化机制。其核心思想是,如果程序没有竞争,则取消之前已经取得锁线程同步操作。也就是说,若某一锁被线程获取后,便进入...
  • Java并发机制锁的实现原理

    万次阅读 多人点赞 2016-07-18 20:04:21
    Java并发机制实现原理
  • synchronized 和 volatile,是最基础两个!volatile是轻量级,它在多核处理器开发中保证了共享变量可见性。...java语言规范第3版中对volatile定义如下:volatile会添加一条lock# 前缀...
  • 同步基本思想为了保证共享数据在同一时刻只被一个线程使用,我们有一种很简单实现思想,就是在共享数据里保存一个,当没有线程访问时,是空。当有第一个线程访问时,就在里保存这个线程标识并允许这个...
  • 在上篇文章《Java并发编程之锁机制之Lock接口》中,我们已经了解了,Java下整个Lock接口下实现的锁机制是通过AQS(这里我们将AbstractQueuedSynchronizer 或AbstractQueuedLongSynchronizer统称为AQS)与Condition来...
  • java并发中,加锁可以保证线程安全性,解决竞态条件问题。 竞态条件 某个计算正确性取决于多个线程交替执行时序。 常见类型:1. 先检查后执行(包括延时初始化)  2. 读取 - 修改 - 写入 操作 几种...
  • 主要为大家详细介绍了Java并发编程之显式锁机制的相关资料,具有一定参考价值,感兴趣小伙伴们可以参考一下
  • synchronized 和 volatile,是最基础两个!  volatile是轻量级,它在多核处理器开发中保证了共享变量可见性。即当一个线程修改一个共享变量时,...java语言规范第3版中对volatile定义如下:  volati...
  • Java并发编程:Concurrent锁机制解析前面,我们讲了Java自带的对象锁机制。因为我们的方法必然是在一个对象中的,所以,通过对象的锁,可以很好的控制对方法的调用。当对象的锁被一个线程持有后,其他线程想要调用该...
  • Java并发编程之锁机制之引导篇 该文章属于《Java并发编程》系列文章,如果想了解更多,请点击《Java并发编程之总目录》 前言 在前面文章中。我们了解了volatile、了解了synchronized。现在我们来了解一下Java...
  • 最近在忙公司项目,现在终于有时间来写博客啦~开心开心 前言 通过前面文章,我们已经了解了AQS(AbstractQueuedSynchronizer)内部实现与基本原理。现在我们来了解一下,Java中为...Java并发编程之锁机制之AQ...
  • 锁机制用来保护对象一致性以及操作原子性,是实现线程安全重要手段。线程安全涉及到对象两个重要状态:共享性和可变性。如果对象是不可变、线程私有那么它一定是线程安全。所以说,只有在共享、可变...
  • 一、资源和加锁1、场景描述多线程并发访问同一个资源问题,假如线程A获取变量之后修改变量值,线程C在此时也获取变量值并且修改,两个线程同时并发处理一个变量,就会导致并发问题。这种并行处理数据库情况在实际...
  • 在上篇文章《Java并发编程之锁机制之引导篇》中,我们大致了解了Lock接口(以及相关实现类)在并发编程重要作用。接下来我们就来具体了解Lock接口中声明方法以及使用优势。 Lock简介 Lock 接口实现类提供了比使用 ...
  • 1.偏向偏向是JDK1.6提出来一种优化的机制。其核心思想是,如果程序没有竞争,则取消之前已经取得锁的线程同步操作。也就是说,若某一被线程获取后,便进入偏向模式,当线程再次请求这个时,就无需再...
  • 爆炸.png ...在上篇文章 《Java并发编程之锁机制之Lock接口》中,我们已经了解了,Java下整个Lock接口下实现的锁机制是通过AQS(这里我们将AbstractQueuedSynchronizer 或AbstractQueuedLong...
  • 在前面的文章中,我曾提到过,整个Lock接口下实现的锁机制中AQS(AbstractQueuedSynchronizer,下文都称之为AQS)与Condition才是真正的实现者。也就说Condition在整个同步组件的基础框架中也起着非常重要的作用,既然...
  • 越来越多的互联网企业面临着用户量膨胀而带来的并发安全问题。接下来通过本文给大家介绍Java线程并发中常见的锁机制,感兴趣的朋友一起看看吧
  • 从这次开始接触Java1.5推出并发包中东东,先看一下jdk中并发包:接下来咱们则会集中对这些并发包中核心进行深入了解,不光要学会怎么用这些并发包中类,而且还得知道这些功能背后运行原理, 所以手踏...
  • 对于我们开发网站,如果网站访问量非常大话,那么我们就需要考虑相关的并发访问问题了。而并发问题是绝大部分程序员头疼问题,但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一起来研究...
  • Redis并发锁1、 单线程redis为什么需要分布式虽然一个redis是单进程单线程模式,但请求并不是一定按先后顺序处理,多个请求会被redis交叉着执行,(就像单个cpu,在一个时间点只能执行一个命令,为什么多个线程...
  • 在上篇文章《Java并发编程之锁机制之AQS(AbstractQueuedSynchronizer)》中我们了解了整个AQS内部结构,与其独占式与共享式获取同步状态实现。但是并没有详细描述线程是如何进行阻塞与唤醒。我也提到了...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,831
精华内容 1,532
关键字:

java并发的锁机制

java 订阅