精华内容
下载资源
问答
  • golang sync.Mutex互斥锁的实现原理

    千次阅读 2020-05-23 14:18:30
    golang sync.Mutex互斥锁的实现原理数据结构与状态机Lock(1)正常模式(2) 饥饿模式Unlock sync.Mutex是一个不可重入的排他锁。 这点和Java不同,golang里面的排它锁是不可重入的。 当一个 goroutine 获得了这个锁的...

    sync.Mutex是一个不可重入的排他锁。 这点和Java不同,golang里面的排它锁是不可重入的。

    当一个 goroutine 获得了这个锁的拥有权后, 其它请求锁的 goroutine 就会阻塞在 Lock 方法的调用上,直到锁被释放。

    数据结构与状态机

    sync.Mutex 由两个字段 state 和 sema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

    type Mutex struct {
    	state int32
    	sema  uint32
    }
    

    需要强调的是Mutex一旦使用之后,一定不要做copy操作。

    Mutex的状态机比较复杂,使用一个int32来表示:

    const (
    	mutexLocked = 1 << iota // mutex is locked
    	mutexWoken  //2
    	mutexStarving //4
    	mutexWaiterShift = iota //3
    )
                                                                                                 
    32                                               3             2             1             0 
     |                                               |             |             |             | 
     |                                               |             |             |             | 
     v-----------------------------------------------v-------------v-------------v-------------+ 
     |                                               |             |             |             v 
     |                 waitersCount                  |mutexStarving| mutexWoken  | mutexLocked | 
     |                                               |             |             |             | 
     +-----------------------------------------------+-------------+-------------+-------------+                                                                                                              
    

    最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放:

    在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态:

    • mutexLocked — 表示互斥锁的锁定状态;
    • mutexWoken — 表示从正常模式被从唤醒;
    • mutexStarving — 当前的互斥锁进入饥饿状态;
    • waitersCount — 当前互斥锁上等待的 goroutine 个数;

    为了保证锁的公平性,设计上互斥锁有两种状态:正常状态和饥饿状态。

    正常模式下,所有等待锁的goroutine按照FIFO顺序等待。唤醒的goroutine不会直接拥有锁,而是会和新请求锁的goroutine竞争锁的拥有。新请求锁的goroutine具有优势:它正在CPU上执行,而且可能有好几个,所以刚刚唤醒的goroutine有很大可能在锁竞争中失败。在这种情况下,这个被唤醒的goroutine会加入到等待队列的前面。 如果一个等待的goroutine超过1ms没有获取锁,那么它将会把锁转变为饥饿模式

    饥饿模式下,锁的所有权将从unlock的gorutine直接交给交给等待队列中的第一个。新来的goroutine将不会尝试去获得锁,即使锁看起来是unlock状态, 也不会去尝试自旋操作,而是放在等待队列的尾部。

    如果一个等待的goroutine获取了锁,并且满足一以下其中的任何一个条件:(1)它是队列中的最后一个;(2)它等待的时候小于1ms。它会将锁的状态转换为正常状态。

    正常状态有很好的性能表现,饥饿模式也是非常重要的,因为它能阻止尾部延迟的现象。

    Lock

    func (m *Mutex) Lock() {
    	// 如果mutex的state没有被锁,也没有等待/唤醒的goroutine, 锁处于正常状态,那么获得锁,返回.
        // 比如锁第一次被goroutine请求时,就是这种状态。或者锁处于空闲的时候,也是这种状态。
    	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    		return
    	}
    	// Slow path (outlined so that the fast path can be inlined)
    	m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
    	// 标记本goroutine的等待时间
    	var waitStartTime int64
    	// 本goroutine是否已经处于饥饿状态
    	starving := false
    	// 本goroutine是否已唤醒
    	awoke := false
    	// 自旋次数
    	iter := 0
    	old := m.state
    	for {
    		// 第一个条件:1.mutex已经被锁了;2.不处于饥饿模式(如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。)
    		// 尝试自旋的条件:参考runtime_canSpin函数
    		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    			// 进入这里肯定是普通模式
    			// 自旋的过程中如果发现state还没有设置woken标识,则设置它的woken标识, 并标记自己为被唤醒。
    			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    				awoke = true
    			}
    			runtime_doSpin()
    			iter++
    			old = m.state
    			continue
    		}
    		
    		// 到了这一步, state的状态可能是:
            // 1. 锁还没有被释放,锁处于正常状态
            // 2. 锁还没有被释放, 锁处于饥饿状态
            // 3. 锁已经被释放, 锁处于正常状态
            // 4. 锁已经被释放, 锁处于饥饿状态
            // 并且本gorutine的 awoke可能是true, 也可能是false (其它goutine已经设置了state的woken标识)
            
    		// new 复制 state的当前状态, 用来设置新的状态
            // old 是锁当前的状态
    		new := old
    		
    		// 如果old state状态不是饥饿状态, new state 设置锁, 尝试通过CAS获取锁,
            // 如果old state状态是饥饿状态, 则不设置new state的锁,因为饥饿状态下锁直接转给等待队列的第一个.
    		if old&mutexStarving == 0 {
    			new |= mutexLocked
    		}
    		// 将等待队列的等待者的数量加1
    		if old&(mutexLocked|mutexStarving) != 0 {
    			new += 1 << mutexWaiterShift
    		}
    		
    		// 如果当前goroutine已经处于饥饿状态, 并且old state的已被加锁,
            // 将new state的状态标记为饥饿状态, 将锁转变为饥饿状态.
    		if starving && old&mutexLocked != 0 {
    			new |= mutexStarving
    		}
    		
     		// 如果本goroutine已经设置为唤醒状态, 需要清除new state的唤醒标记, 因为本goroutine要么获得了锁,要么进入休眠,
            // 总之state的新状态不再是woken状态.
    		if awoke {
    			// The goroutine has been woken from sleep,
    			// so we need to reset the flag in either case.
    			if new&mutexWoken == 0 {
    				throw("sync: inconsistent mutex state")
    			}
    			new &^= mutexWoken
    		}
    
    		// 通过CAS设置new state值.
            // 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
    		if atomic.CompareAndSwapInt32(&m.state, old, new) {
    			
    			// 如果old state的状态是未被锁状态,并且锁不处于饥饿状态,
                // 那么当前goroutine已经获取了锁的拥有权,返回
    			if old&(mutexLocked|mutexStarving) == 0 {
    				break // locked the mutex with CAS
    			}
    			// If we were already waiting before, queue at the front of the queue.
    			// 设置并计算本goroutine的等待时间
    			queueLifo := waitStartTime != 0
    			if waitStartTime == 0 {
    				waitStartTime = runtime_nanotime()
    			}
    			// 既然未能获取到锁, 那么就使用sleep原语阻塞本goroutine
                // 如果是新来的goroutine,queueLifo=false, 加入到等待队列的尾部,耐心等待
                // 如果是唤醒的goroutine, queueLifo=true, 加入到等待队列的头部
    			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
    
    			// sleep之后,此goroutine被唤醒
                // 计算当前goroutine是否已经处于饥饿状态.
    			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    			// 得到当前的锁状态
    			old = m.state
    
    			// 如果当前的state已经是饥饿状态
                // 那么锁应该处于Unlock状态,那么应该是锁被直接交给了本goroutine
    			if old&mutexStarving != 0 {
    				// If this goroutine was woken and mutex is in starvation mode,
    				// ownership was handed off to us but mutex is in somewhat
    				// inconsistent state: mutexLocked is not set and we are still
    				// accounted as waiter. Fix that.
    				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
    					throw("sync: inconsistent mutex state")
    				}
    				// 当前goroutine用来设置锁,并将等待的goroutine数减1.
    				delta := int32(mutexLocked - 1<<mutexWaiterShift)
    				// 如果本goroutine是最后一个等待者,或者它并不处于饥饿状态,
                    // 那么我们需要把锁的state状态设置为正常模式.
    				if !starving || old>>mutexWaiterShift == 1 {
    					 // 退出饥饿模式
    					delta -= mutexStarving
    				}
    				// 设置新state, 因为已经获得了锁,退出、返回
    				atomic.AddInt32(&m.state, delta)
    				break
    			}
    			awoke = true
    			iter = 0
    		} else {
    			old = m.state
    		}
    	}
    }
    

    整个过程比较复杂,这里总结一下一些重点:

    1. 如果锁处于初始状态(unlock, 正常模式),则通过CAS(0 -> Locked)获取锁;如果获取失败,那么就进入slowLock的流程:

    slowLock的获取锁流程有两种模式: 饥饿模式 和 正常模式。

    (1)正常模式

    1. mutex已经被locked了,处于正常模式下;
    2. 前 Goroutine 为了获取该锁进入自旋的次数小于四次;
    3. 当前机器CPU核数大于1;
    4. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空;

    满足上面四个条件的goroutine才可以做自旋。自旋就会调用sync.runtime_doSpin 和 runtime.procyield 并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间。

    处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态new。几个不同的条件分别会更新 state 字段中存储的不同信息 — mutexLocked、mutexStarving、mutexWoken 和 mutexWaiterShift:

    计算最新的new之后,CAS更新,如果更新成功且old状态是未被锁状态,并且锁不处于饥饿状态,就代表当前goroutine竞争成功并获取到了锁返回。(这也就是当前goroutine在正常模式下竞争时更容易获得锁的原因)

    如果当前goroutine竞争失败,会调用 sync.runtime_SemacquireMutex 使用信号量保证资源不会被两个 Goroutine 获取。sync.runtime_SemacquireMutex 会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,sync.Mutex.Lock 方法的剩余代码也会继续执行。

    (2) 饥饿模式

    饥饿模式本身是为了一定程度保证公平性而设计的模式。所以饥饿模式不会有自旋的操作,新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。

    不管是正常模式还是饥饿模式,获取信号量,它就会从阻塞中立刻返回,并执行剩下代码:

    1. 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环;
    2. 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出;

    Unlock

    func (m *Mutex) Unlock() {
    	// Fast path: drop lock bit.
    	new := atomic.AddInt32(&m.state, -mutexLocked)
    	if new != 0 {
    		// Outlined slow path to allow inlining the fast path.
    		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
    		m.unlockSlow(new)
    	}
    }
    
    func (m *Mutex) unlockSlow(new int32) {
    	if (new+mutexLocked)&mutexLocked == 0 {
    		throw("sync: unlock of unlocked mutex")
    	}
    	if new&mutexStarving == 0 {
    		old := new
    		for {
    			// If there are no waiters or a goroutine has already
    			// been woken or grabbed the lock, no need to wake anyone.
    			// In starvation mode ownership is directly handed off from unlocking
    			// goroutine to the next waiter. We are not part of this chain,
    			// since we did not observe mutexStarving when we unlocked the mutex above.
    			// So get off the way.
    			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    				return
    			}
    			// Grab the right to wake someone.
    			new = (old - 1<<mutexWaiterShift) | mutexWoken
    			if atomic.CompareAndSwapInt32(&m.state, old, new) {
    				runtime_Semrelease(&m.sema, false, 1)
    				return
    			}
    			old = m.state
    		}
    	} else {
    		// Starving mode: handoff mutex ownership to the next waiter, and yield
    		// our time slice so that the next waiter can start to run immediately.
    		// Note: mutexLocked is not set, the waiter will set it after wakeup.
    		// But mutex is still considered locked if mutexStarving is set,
    		// so new coming goroutines won't acquire it.
    		runtime_Semrelease(&m.sema, true, 1)
    	}
    }
    

    互斥锁的解锁过程 sync.Mutex.Unlock 与加锁过程相比就很简单,该过程会先使用 AddInt32 函数快速解锁,这时会发生下面的两种情况:

    1. 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁;
    2. 如果该函数返回的新状态不等于 0,这段代码会调用 sync.Mutex.unlockSlow 方法开始慢速解锁:

    sync.Mutex.unlockSlow 方法首先会校验锁状态的合法性 — 如果当前互斥锁已经被解锁过了就会直接抛出异常 sync: unlock of unlocked mutex 中止当前程序。

    在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁:

    • 在正常模式下,这段代码会分别处理以下两种情况处理;
    1. 如果互斥锁不存在等待者或者互斥锁的 mutexLocked、mutexStarving、mutexWoken 状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者;
    2. 如果互斥锁存在等待者,会通过 sync.runtime_Semrelease 唤醒等待者并移交锁的所有权;
    • 在饥饿模式下,上述代码会直接调用 sync.runtime_Semrelease 方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态;
    展开全文
  • 0 介绍 对多线程编程有过了解的朋友...理解锁的实现,首先要理解Cpu的原子操作。现代的cpu都提供一些特殊的原子操作,他们执行的时候将不能被中断,如CAS(compare and swap)、Test-And-Set。这两种操作均可以实现锁,

    0 介绍

    对多线程编程有过了解的朋友一定知道锁的概念,它的作用是为了保证临界区的代码在多线程下能够正常工作,也就是说,锁将保证共享资源在任意时刻只能有一个使用者。
    过去博主我也只是知道锁是如何使用的,对其内在原理并不了解。今天看看了看清华大学陈渝老师的网课,对锁的实现有了一定认识,特此记录。

    1 原子操作

    理解锁的实现,首先要理解Cpu的原子操作。现代的cpu都提供一些特殊的原子操作,他们执行的时候将不能被中断,如CAS(compare and swap)、Test-And-Set。这两种操作均可以实现锁,下面讲讲Test-And-Set是如何实现的。
    这个操作并不复杂,对于target,如果它是false,将其修改为true,但返回的值为rv;如果是true,什么也没有发生。这个简单的操作是原子的,下面看看如何由它来实现锁。

    boolbean TestAndSet(boolbead *target)
    {
    	boolbead *rv = *target;
    	*target = true;
    	return *rv;
    }
    

    3 自旋锁

    所谓自旋锁,百科定义如下:

    自选锁是为实现保护共享资源而提出一种锁机制。自旋锁不会引起调用者睡眠,如果自旋锁已经被别的执行单元保持,调用者就一直循环在那里看是否该自旋锁的保持者已经释放了锁,"自旋"一词就是因此而得名。

    通俗来说,如果当前锁被占用,调用者将阻塞在原地,这是与互斥锁不同的地方。对于锁来说,我们一般考虑它的上锁(lock)与解锁(unlock)操作。如下面的c语言伪代码所示,当需要在临界区前上锁时,调用锁的Acquire方法即可。如果当前已经上锁了,value为1,则后来的调用者将阻塞在while处;如果未上锁,将不会阻塞在while,同时test-and-set会将value置为1,防止未解锁前后来的调用者执行临界区代码。解锁只需要将value置为0即可。

    class Lock
    {
    int value = 0;
    
    void Acquire()
    {
    	while(test-and-set(&value))
    		;
    }
    
    void Release()
    {
    	value = 0;	
    }
    
    };
    

    自旋锁也有一些缺点,首先它是一个忙等待锁,在while处阻塞的时候会消耗cpu资源直到阻塞结束。第二个,自旋锁的使用可能会造成死锁。比如一个低优先级的进程在占用临界区资源时,来了一个高优先级进程夺走了cpu,然后高优先级进程也在acquire锁,但是锁却在低优先级进程手里,这时死锁就发生了。

    4 互斥锁

    互斥锁与自旋锁不同的地方在于,当调用者发现锁已经被使用的时候,会使当前进程睡眠,加入到等待队列,然后使用调度机制(schedule函数)执行其他进程,而不是原地等待。解锁时拿出队头睡眠的进程唤醒即可。其余和自旋锁一致。

    class Lock
    {
    int value = 0;
    WaitQueue q;
    
    void Acquire()
    {
    	while(test-and-set(&value))
    	{
    		add this TCB to the waitqueue q;
    		schedule();
    	}
    }
    
    void Release()
    {
    	value = 0;	
    	remove thread t from q;
    	wake(t);
    }
    
    };
    
    展开全文
  • 加锁过程很重要,请耐心从上往下,跟着我提示看完,会有不一样理解。当然也需要深思 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node也就是整个QAS核心也就是列表节点。 static final class ...

    一.Node

    不了解AQS的请先看这里
    加锁过程很重要,请耐心从上往下,跟着我的提示看完,会有不一样的理解。当然也需要深思

    java.util.concurrent.locks.AbstractQueuedSynchronizer.Node也就是整个QAS的核心也就是列表的节点。

    static final class Node {
            static final Node SHARED = new Node();
            
            static final Node EXCLUSIVE = null;
    
            static final int CANCELLED =  1;
            
            static final int SIGNAL    = -1;
            
            static final int CONDITION = -2;
    
            static final int PROPAGATE = -3;
    
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    
            /**
             * Returns true if node is waiting in shared mode.
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    上面也就是Node的源代码,熟悉数据结构的同学应该很容易理解大部分的属性,比如prev(前驱节点)、next(后继节点),这里也就不带着大家一起看了。
    – Thread:保存线程的引用。

    – nextWaiter:下一个阻塞队列,Condition.await()阻塞的队列。

    – waitStatus:状态值,也就是控制线程阻塞的核心属性。

    说明
    1(CANCELLED) 也就是当前的线程被中断(也可以说取消),就是线程无效
    -1(SIGNAL) 节点的后继节点被挂起,也就是说有别的线程等待当前的线程释放锁,请看下面源码回头看会更清楚
    -2(CONDITION) 通过Condition.await()阻塞的线程,当线程被释放时,状态变为-1,进入阻塞的队列,争抢锁的资源
    0 初始化的默认值
    -3(PROPAGATE) 共享的这里不涉及

    二.通过源码查看上面属性值的作用

    请大家打开源码阅读下面文章,我不能把所有的代码都说一遍,只说核心的东西,所以大家需要源代码跟上思路

    1.互斥锁的加锁实现(ReentrantLock-NonfairSync(非公平))

    NonfairSync也就是非公平队列,是AbstractQueuedSynchronizer的实现类,而AbstractQueuedSynchronizer中封装了Node内部类。就像是电脑的CPU是核心,但是需要显示器键盘操作系统等,才可以让他发挥作用。另外,jdk团队使用模版设计模式。

    code1.1
    final void lock() {
    	// 开始默认值为0,如果CAS成功设置为1,说明开始无线程占用,一个线程成功占用。
    	if (compareAndSetState(0, 1))
    		// 设置进入的线程为当前线程。
    		setExclusiveOwnerThread(Thread.currentThread());
    	else
    		// 已经被锁的逻辑,向下查看code1.2
    		acquire(1);
    	}
    
    code1.2
        public final void acquire(int arg) {
        	// tryAcquire:代码直接调用nonfairTryAcquire(),查看code1.3
            if (!tryAcquire(arg) &&
            	// 下面方法的逻辑就是把线程放入等待队列。
            	// code1.4---addWaiter()
            	// code1.5---acquireQueued()
            	// 查看code1.4后查看code1.5
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    code1.3

    该方法是实现重入锁的核心代码。查看完毕,返回code1.2

    该代码如果是同一个线程进入,state状态值加1。

    final boolean nonfairTryAcquire(int acquires) {
    			// 获取当前线程
                final Thread current = Thread.currentThread();
                // 获取状态值
                int c = getState();
                // 如果为0,说明,之前的线程退出代码块,可以尝试进入
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
       			// 不为0情况,并且当前线程就是已经拿到锁的线程。
                else if (current == getExclusiveOwnerThread()) {
                	// 状态值加1,标记线程进入的次数。
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                // 如果线程为退出,而且请求的线程不是进入的线程。
                return false;
            }
    
    code1.4

    当前的节点封装为node,并放入队列中。此方法的代码作为code1.5的参数。查看完毕,请看code1.5
    就算是阻塞的第一个节点,也有一个前驱节点,也就是head,代表已经进入代码块的线程

        private Node addWaiter(Node mode) {
        	// 声明一个node
            Node node = new Node(Thread.currentThread(), mode);
            // 拿到尾节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                // 通过cas设置成尾节点
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            /*查看code1.6*/
            enq(node);
            return node;
        }
    
    code1.5

    查看完毕

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                	// 获取前驱节点,通过code1.6的操作,前驱节点不可能为空,请大家自己查看node.predecessor().
                    final Node p = node.predecessor();
                    /*code1.4的解释已经提到,head代表当前的线程,因为p==head所以那么说明不存在前面的节点了,调用这个方法之前,这个要放入等待队列的线程p已经是可以进入加锁的代码快了,tryAcquire尝试进入代码,如果失败,说明这个线程已经执行结束,或者取消,返回false,这里回头看一下code1.3的selfInterrupt方法再回来。*/
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    /*shouldParkAfterFailedAcquire方法把当前线程节点的前驱节点waitstatus值设置为-1,再结合每一个等待线程都有一个前驱节点,更加证明了head节点代表当前正在执行的线程,这里需要结合后边的unlock解锁方法,大家才能懂,因为解锁释放线程的时候是释放head的后继节点*/
                    // 同时证明了-1的值是说后面有线程等待当前线程的结束
                    if (shouldParkAfterFailedAcquire(p, node) &&
                    	// 前面设置成功后,lockSupport阻塞当前线程。。
                    	//到这里加锁完毕,恭喜你。
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                	// 已经取消的线程状态设置为1,但jdk团队考虑的非常周到,大家自行阅读,我有点累了
                    cancelAcquire(node);
            }
        }
    
    code1.6

    查看完毕,返回code1.4
    队列为空的时候(该线程是第一个等待的线程),或者cas把新的节点设置为尾节点失败的时候进入该方法,该方法作用是当tail为空的时候初始化队列的头尾节点,并把当前节点设置为尾节点。

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                /*第一个线程进入,所以通过cas初始化尾节点和头节点是一个默认的node,也就是waitStatus的值为0,为死循环所以只有成功才能走到下面的t!=null的代码块,也就是必须成功。*/
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                	// 设置新的node为为节点,之前尾节点设置为新node的前驱节点。
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    2.互斥锁的解锁实现(ReentrantLock-NonfairSync(非公平))

    开始就是调用一个方法sync.release()。

    public void unlock() {
            sync.release(1);
        }
    
    code2.1
        public final boolean release(int arg) {
        	// 解锁,返回true证明可以让别的线程进入,false表示重入了多个,只解锁了部分。查看code1.2
            if (tryRelease(arg)) {
            	// 如果已经完成释放,可以让等待队列中的线程进入,结合之前所讲的,head表示当前线程,值为-1则表示有线程等待。
                Node h = head;
                if (h != null && h.waitStatus != 0)
                	// 释放等待的线程 查看code2.3
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    code2.2

    解锁过程,每次给state减1,当state为0,则全部退出,可以让别的线程进入,查看完毕,返回code2.1

           protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    code2.3
       private void unparkSuccessor(Node node) {
    
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    		// 释放的是后面的节点
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
            	// 如果后面的节点已经取消,从后面找一个状态为阻塞的释放
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
            	// 释放选中的线程
                LockSupport.unpark(s.thread);
        }
    

    总结

    这里介绍了等待队列,还有jdk实现的Condition可以完成挂起和释放的队列,有兴趣的大家自行查看。

    展开全文
  • 本文参考——... ... 引言 互斥锁大都会使用,但是要了解其原理就要花费一番功夫了。尽管我们说互斥锁是用来保护一个临界区,实际上保护是临界区中被操纵数据。 互斥锁还是分为三类:快速互斥锁...

    本文参考——http://www.bitscn.com/os/linux/201608/725217.html

    http://blog.csdn.net/jianchaolv/article/details/7544316

    引言

    互斥锁大都会使用,但是要了解其原理就要花费一番功夫了。尽管我们说互斥锁是用来保护一个临界区,实际上保护的是临界区中被操纵的数据。

    互斥锁还是分为三类:快速互斥锁/递归互斥锁/检测互斥锁

    futex

    要想了解互斥锁的内部实现,先来了解一下futex(fast Userspace mutexes)的作用。

    内核态和用户态的混合机制。

    还没有futex的时候,内核是如何维护同步与互斥的呢?系统内核维护一个对象,这个对象对所有进程可见,这个对象是用来管理互斥锁并且通知阻塞的进程。如果进程A要进入临界区,先去内核查看这个对象,有没有别的进程在占用这个临界区,出临界区的时候,也去内核查看这个对象,有没有别的进程在等待进入临界区。

    互斥锁

    1、互斥锁的结构?

    在futex的基础上用的内存共享变量来实现的。

    2、不能锁住的时候,是如何进入休眠,又如何等待被唤醒的呢?

    进入锁的时候就会区检查那个共享变量,如果不能获取锁,就会通过futex系统调用进入休眠。如果有人释放锁,就会通过futex来唤醒。

    3、互斥锁的属性?

    指定锁的适用范围。

    4、经典案例——生产者消费者

    转载于:https://www.cnblogs.com/sylz/p/6030201.html

    展开全文
  •   互斥锁是并发程序中对共享资源进行访问控制主要手段,对此Go语言提供了非常简单易用Mutex,Mutex为一结构体类型,对外暴露两个方法Lock()和Unlock()分别用于加锁和解锁。   Mutex使用起来非常方便,但其...
  • 看了看linux 2.6 kernel的源码,下面结合代码来分析一下在X86体系结构下,互斥锁的实现原理。 代码分析 1. 首先介绍一下互斥锁所使用的数据结构:struct mutex {&nbsp;引用计数器&nbsp;1: 所可以利用。 &...
  • 互斥锁基本原理 互斥锁是一个二元变量,其状态为开锁(允许0)和上锁(禁止1),将某个共享资源与某个特定互斥锁在逻辑上绑定(要申请该资源必须先获取锁)。 访问公共资源前,必须申请该互斥锁,若处于开锁状态,则...
  • 看了看linux 2.6 kernel的源码,下面结合代码来分析一下在X86体系结构下,互斥锁的实现原理。 代码分析 1. 首先介绍一下互斥锁所使用的数据结构:struct mutex {引用计数器1: 所可以利用。 小于等于0:该锁已被...
  • Linux 互斥锁、原子操作实现原理

    万次阅读 多人点赞 2016-09-18 14:42:26
    futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如...在Linux下,信号量和线程互斥锁的实现都是通过futex系统调用。 futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,048
精华内容 419
关键字:

互斥锁的实现原理