Mutex fairness.
Mutex can be in 2 modes of operations: normal and starvation.
In normal mode waiters are queued in FIFO order, but a woken up waiter
does not own the mutex and competes with new arriving goroutines over
the ownership. New arriving goroutines have an advantage -- they are
already running on CPU and there can be lots of them, so a woken up
waiter has good chances of losing. In such case it is queued at front
of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
it switches mutex to the starvation mode.
In starvation mode ownership of the mutex is directly handed off from
the unlocking goroutine to the waiter at the front of the queue.
New arriving goroutines don't try to acquire the mutex even if it appears
to be unlocked, and don't try to spin. Instead they queue themselves at
the tail of the wait queue.
If a waiter receives ownership of the mutex and sees that either
(1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
it switches mutex back to normal operation mode.
Normal mode has considerably better performance as a goroutine can acquire
a mutex several times in a row even if there are blocked waiters.
Starvation mode is important to prevent pathological cases of tail latency.
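Both modes are recorded in the mutex's single state word, next to the lock bit and the waiter count. The following is a minimal sketch, assuming the bit layout declared in sync/mutex.go (mutexLocked, mutexWoken, mutexStarving, mutexWaiterShift); the mutexInfo type and decodeState function are hypothetical helpers written for this illustration and are not part of the sync package.

```go
package main

import "fmt"

// Bit layout of Mutex.state, as declared in sync/mutex.go.
const (
	mutexLocked      = 1 << iota // bit 0: mutex is locked
	mutexWoken                   // bit 1: a waiter has been woken
	mutexStarving                // bit 2: mutex is in starvation mode
	mutexWaiterShift = iota      // waiter count is stored from bit 3 upward
)

// mutexInfo is a hypothetical view of the state word, for illustration only.
type mutexInfo struct {
	Locked, Woken, Starving bool
	Waiters                 int32
}

// decodeState (hypothetical) splits a state word into its fields.
func decodeState(state int32) mutexInfo {
	return mutexInfo{
		Locked:   state&mutexLocked != 0,
		Woken:    state&mutexWoken != 0,
		Starving: state&mutexStarving != 0,
		Waiters:  state >> mutexWaiterShift,
	}
}

func main() {
	// Locked, in starvation mode, with two queued waiters.
	state := int32(mutexLocked | mutexStarving | 2<<mutexWaiterShift)
	fmt.Printf("%+v\n", decodeState(state))
}
```

Packing everything into one int32 is what lets the fast paths work with a single atomic operation.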
// sync/mutex.go, line 72
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
	if old&(mutexLocked|mutexStarving) == 0 {
		break // locked the mutex with CAS
	}
	queueLifo := waitStartTime != 0
	if waitStartTime == 0 {
		waitStartTime = runtime_nanotime()
	}
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	if old&mutexStarving != 0 {
		if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
			throw("sync: inconsistent mutex state")
		}
		delta := int32(mutexLocked - 1<<mutexWaiterShift)
		if !starving || old>>mutexWaiterShift == 1 {
			delta -= mutexStarving
		}
		atomic.AddInt32(&m.state, delta)
		break
	}
	awoke = true
	iter = 0
} else {
	old = m.state
}
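If the lock is not acquired through CAS, the goroutine calls runtime.sync_runtime_SemacquireMutex, which uses a semaphore to guarantee that the resource is never held by two goroutines at once.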
var waitStartTime int64
for { // the loop keeps trying to acquire the lock
	if waitStartTime == 0 { // waitStartTime is assigned only on the first pass
		waitStartTime = runtime_nanotime()
	}
	// switch to starvation mode once the wait exceeds 1ms
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
}
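The starvation check and the state fix-up after a hand-off are plain integer arithmetic, so they can be tried outside the runtime. The sketch below assumes the constants from sync/mutex.go, including starvationThresholdNs = 1e6; isStarving and handoffState are hypothetical names that mirror the corresponding expressions in lockSlow, and the real code applies the delta with atomic.AddInt32 rather than returning a value.

```go
package main

import "fmt"

// Constants as declared in sync/mutex.go.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift      = iota
	starvationThresholdNs = 1e6 // 1ms expressed in nanoseconds
)

// isStarving (hypothetical) mirrors the update of the starving flag in
// lockSlow: once set it stays set, otherwise it depends on the wait time.
func isStarving(starving bool, waitStart, now int64) bool {
	return starving || now-waitStart > starvationThresholdNs
}

// handoffState (hypothetical) mirrors the fix-up a woken waiter performs in
// starvation mode: set the lock bit, remove itself from the waiter count,
// and clear mutexStarving if it is not starving or is the last waiter.
func handoffState(old int32, starving bool) int32 {
	delta := int32(mutexLocked - 1<<mutexWaiterShift)
	if !starving || old>>mutexWaiterShift == 1 {
		delta -= mutexStarving // exit starvation mode
	}
	return old + delta
}

func main() {
	fmt.Println(isStarving(false, 0, 500_000))   // waited 0.5ms
	fmt.Println(isStarving(false, 0, 2_000_000)) // waited 2ms

	// Starvation mode with two waiters: 4 | 2<<3 = 20.
	old := int32(mutexStarving | 2<<mutexWaiterShift)
	fmt.Println(handoffState(old, true))  // still starving, one waiter left
	fmt.Println(handoffState(old, false)) // waited under 1ms: back to normal mode
}
```

With old = 20, a starving waiter leaves the state at 13 (locked, starving, one waiter), while a waiter that waited less than 1ms leaves it at 9 (locked, normal mode, one waiter).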
Unlock
By comparison, unlocking a mutex is fairly simple. It is likewise split into a fast path and a slow path.
func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
	if new&mutexStarving == 0 {
		old := new
		for {
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		runtime_Semrelease(&m.sema, true, 1)
	}
}
The slow path first validates the lock state: if the mutex has already been unlocked, it throws the fatal error "sync: unlock of unlocked mutex" and aborts the program.
if (new+mutexLocked)&mutexLocked == 0 {
	throw("sync: unlock of unlocked mutex")
}
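The check works because Unlock has already subtracted mutexLocked: adding it back recovers the previous state, whose lock bit must have been set. A minimal sketch of that arithmetic follows, again assuming the constants from sync/mutex.go; wasLocked and shouldWake are hypothetical helpers that restate the two conditions in unlockSlow, not functions from the standard library.

```go
package main

import "fmt"

// Constants as declared in sync/mutex.go.
const (
	mutexLocked = 1 << iota
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota
)

// wasLocked (hypothetical) takes the state after Unlock subtracted
// mutexLocked and reports whether the lock bit was set beforehand.
func wasLocked(newState int32) bool {
	return (newState+mutexLocked)&mutexLocked != 0
}

// shouldWake (hypothetical) restates the normal-mode condition in
// unlockSlow: wake a waiter only if one exists and no other goroutine
// has already locked the mutex, been woken, or switched to starvation mode.
func shouldWake(old int32) bool {
	if old>>mutexWaiterShift == 0 {
		return false
	}
	return old&(mutexLocked|mutexWoken|mutexStarving) == 0
}

func main() {
	locked := int32(mutexLocked)
	fmt.Println(wasLocked(locked - mutexLocked)) // state was 1, new state is 0

	unlocked := int32(0)
	fmt.Println(wasLocked(unlocked - mutexLocked)) // state was 0, new state is -1

	oneWaiter := int32(1 << mutexWaiterShift)
	fmt.Println(shouldWake(oneWaiter))            // one waiter, no flags set
	fmt.Println(shouldWake(oneWaiter | mutexWoken)) // a waiter is already awake
}
```

Unlocking an already unlocked mutex drives the state to -1, and (-1 + 1) & 1 is 0, which is exactly the case the runtime treats as fatal.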
// runtime/sema.go, line 65
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
	semrelease1(addr, handoff, skipframes)
}

func semrelease1(addr *uint32, handoff bool, skipframes int) {
	root := semroot(addr)
	atomic.Xadd(addr, 1)
	if atomic.Load(&root.nwait) == 0 {
		return
	}
	// Harder case: search for a waiter and wake it.
	lockWithRank(&root.lock, lockRankRoot)
	if atomic.Load(&root.nwait) == 0 {
		unlock(&root.lock)
		return
	}
	s, t0 := root.dequeue(addr)
	if s != nil {
		atomic.Xadd(&root.nwait, -1)
	}
	unlock(&root.lock)
	if s != nil { // May be slow or even yield, so unlock first
		acquiretime := s.acquiretime
		if acquiretime != 0 {
			mutexevent(t0-acquiretime, 3+skipframes)
		}
		if s.ticket != 0 {
			throw("corrupted semaphore ticket")
		}
		if handoff && cansemacquire(addr) {
			s.ticket = 1
		}
		readyWithTime(s, 5+skipframes)
		if s.ticket == 1 && getg().m.locks == 0 {
			goyield()
		}
	}
}
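3. Summary
Locking a mutex is fairly involved; it touches on spinning, semaphores, and scheduling:
If the mutex is in its initial state, it is locked by setting the mutexLocked bit;
If the mutex is in the mutexLocked state and operating in normal mode, the goroutine spins, executing 30 PAUSE instructions to burn CPU time while it waits for the lock to be released;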