# 单机锁实现原理
两种模式的转换条件:
- 默认为正常模式
- 正常模式 -> 饥饿模式:
- 饥饿模式 -> 正常模式:
# Sync.Mutex
# 数据结构 与 状态位
type Mutex struct { | |
state int32 | |
sema uint32 | |
} |
- state: 锁中最核心的状态字段,不同 bit 位分别存储了 mutexLocked (是否上锁)、mutexWoken (是否有等待的 goroutine 被唤醒)、mutexStarving (是否处于饥饿模式) 等信息。
- sema: 用于阻塞和唤醒 goroutine 的信号量。
# state 含义
state 是一个复合状态字段,通过位标志 (bit flag) 来表示多种锁的状态,其结构如下:
const ( | |
mutexLocked = 1 << iota // 表示锁已被持有(上锁状态) | |
mutexWoken // 标记是否有 goroutine 已经被唤醒 | |
mutexStarving // 标记锁是否处于饥饿模式 | |
mutexWaiterShift = iota // 等待者计数器的起始位偏移量 | |
starvationThresholdNs = 1e6 // 饥饿模式切换的等待时间阈值(单位:纳秒) | |
) |
- mutexLocked = 1: state 最低位的一个 bit 位标志是否上锁,0 - 未上锁,1 - 已上锁
- mutexWoken = 2: state 的第 2 个 bit 位标志是否有 goroutine 已经被唤醒,0 - 未唤醒,1 - 已唤醒
- mutexStarving = 4: state 的第 3 个 bit 位标志锁是否处于饥饿模式,0 - 非饥饿模式,1 - 饥饿模式
- mutexWaiterShift = 3: 其作用是表示,右侧存在 3bit 位标识特殊信息,并非是移位运算,没有额外的作用
- starvationThresholdNs = 1ms: sync.Mutex 进入饥饿模式的等待时间阈值
以二进制位分布来看 (低位在右):
[ Waiter Count (29 bits) ][ Starving ][ Woken ][ Locked ]
31 3 2 1 0
-
低 3 位是状态位
-
高 29 位是等待者数量
-
state & mutexLocked: 判断是否上锁
-
state | mutexLocked: 上锁
-
state & mutexWoken: 判断是否存在抢锁的协程
-
state | mutexWoken: 更新状态,标识存在抢锁的协程
-
state &^ mutexWoken: 更新状态,标识不存在抢锁的协程
-
state & mutexStarving: 判断是否处于饥饿模式
-
state | mutexStarving: 置为饥饿模式
-
state >> mutexWaiterShift: 获取等待队列中的等待者数量
-
state += 1 << mutexWaiterShift: 阻塞等待的协程数 + 1
# Mutex.Lock()
- 首先尝试通过原子操作将 state 字段的最低位设置为 1,表示上锁,如果成功则直接返回,否则进入慢速路径
func (m *Mutex) Lock() { | |
// Fast path: grab unlocked mutex. | |
// 当锁完全空闲(state == 0)时,直接 CAS 设置为 mutexLocked | |
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { | |
if race.Enabled { | |
race.Acquire(unsafe.Pointer(m)) | |
} | |
return | |
} | |
// Slow path (outlined so that the fast path can be inlined) | |
// 否则进入 lockSlow () | |
m.lockSlow() | |
} |
- Fast Path: 直接 CAS 抢锁
- Slow Path: 竞争失败后进入复杂逻辑(自旋、阻塞、饥饿判定等)
- Mutex.lockSlow()
func (m *Mutex) lockSlow() { | |
// 标识当前 goroutine 在抢锁过程中的等待时间,单位为 ns | |
var waitStartTime int64 | |
// 标识当前 goroutine 是否处于饥饿模式 | |
starving := false | |
// 标识当前是否已有协程在等待锁 | |
awoke := false | |
// 标识当前 goroutine 参与自旋的次数 | |
iter := 0 | |
// 临时存储锁的 state 值 | |
old := m.state | |
for { | |
// 进入该 if 分支,说明抢锁失败 | |
// 1. 当前锁已被持有(state & mutexLocked == 1 | |
// 2. 当前锁处于饥饿模式(state & mutexStarving == 1) | |
// 3. 满足自旋条件 (runtime_canSpin (iter)) | |
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { | |
// 进入该 if 分支,说明 | |
// 1. 当前 goroutine 还没设置过唤醒标志 (awoke == false) | |
// 2. 没有其他协程被唤醒 (old & mutexWoken == 0) | |
// 3. 阻塞队列中存在等待的协程 (old>> mutexWaiterShift != 0) | |
// 4. CAS 将 mutexWoken 置为 1,避免其他协程被唤醒和自己抢锁 | |
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && | |
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { | |
awoke = true | |
} | |
// 调用 runtime_doSpin () 告知调度器 P 当前处于自旋模式 | |
runtime_doSpin() | |
// 更新自旋 iter 和锁状态值 | |
iter++ | |
old = m.state | |
// 进行下一次尝试 | |
continue | |
} | |
// 自旋抢锁失败后处理...... | |
// 从自旋中走出来后,会存在两种分支,要么加锁成功,要么陷入自锁,不论是何种情形,都会先对 sync.Mutex 的状态新值 new 进行更新 | |
new := old | |
// Don't try to acquire starving mutex, new arriving goroutines must queue. | |
// 表示当前不是饥饿模式 | |
if old&mutexStarving == 0 { | |
// 在新的 new 中置为已加锁,即尝试抢锁 | |
new |= mutexLocked | |
} | |
// 如果 old 状态值,处于 locked 或者饥饿状态,则当前 goroutine 在这一轮注定无法抢锁成功 | |
if old&(mutexLocked|mutexStarving) != 0 { | |
// 将阻塞队列的 goroutine 数量 + 1 | |
new += 1 << mutexWaiterShift | |
} | |
// 如果是饥饿模式,并且锁已经被占有 | |
if starving && old&mutexLocked != 0 { | |
// 将新值置为饥饿模式 | |
new |= mutexStarving | |
} | |
// 如果当前 goroutine 被唤醒过,则将 mutexWoken 置为 0,表示当前 goroutine 已经处理完被唤醒后的逻辑 | |
if awoke { | |
// The goroutine has been woken from sleep, | |
// so we need to reset the flag in either case. | |
if new&mutexWoken == 0 { | |
throw("sync: inconsistent mutex state") | |
} | |
new &^= mutexWoken | |
} | |
// 尝试更新 state 值,如果更新成功,则表示抢锁成功,退出循环 | |
// 通过 CAS 操作,用构造的新值替换旧值 | |
if atomic.CompareAndSwapInt32(&m.state, old, new) { | |
// 验证是否加锁成功,如果旧值是未加锁状态且为正常模式,则意味着教唆标识位正是由当前 goroutine 完成的更新,说明加锁成功,返回即可。 | |
if old&(mutexLocked|mutexStarving) == 0 { | |
// 😈 加锁成功出口 | |
break // locked the mutex with CAS | |
} | |
// If we were already waiting before, queue at the front of the queue. | |
// 如果当前 goroutine 已经等待过,则采用 LIFO(后进先出)队列方式排队,否则采用 FIFO(先进先出)队列方式排队。这样可以优化唤醒顺序,减少饥饿。 | |
queueLifo := waitStartTime != 0 | |
//waitStartTime 是否初始化过 | |
// 如果是第一次进入等待,则记录当前时间,用于后续判断是否进入饥饿模式 | |
if waitStartTime == 0 { | |
waitStartTime = runtime_nanotime() | |
} | |
// 🏜阻塞挂起:当前 goroutine 进入阻塞模式,等待锁释放。queueLifo 决定排队顺序,1 表示只等待一个信号量 | |
runtime_SemacquireMutex(&m.sema, queueLifo, 1) | |
// 🤩从阻塞态被唤醒后 | |
// 计算当前等锁的时间是否超过 1ms,超过则开启饥饿模式。饥饿模式下,锁会优先分配给等待时间最长的 goroutine,防止饿死 | |
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs | |
// 获取最新锁状态 | |
old = m.state | |
// 唤醒前是否处于饥饿模式 | |
// 如果原本就是饥饿模式,而当前 goroutine 正好被唤醒,意味着一定加锁成功了 | |
if old&mutexStarving != 0 { | |
// 如果锁处于加锁或唤醒状态,或者没有等待者,说明状态异常,直接抛出错误。 | |
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { | |
throw("sync: inconsistent mutex state") | |
} | |
// 更新锁的状态,由 0 改为 1, 置为已加锁状态 | |
// 阻塞等待 goroutine 数量 -1(当前 goroutine 已经获得锁) | |
delta := int32(mutexLocked - 1<<mutexWaiterShift) | |
// 等锁的时间小于 1ms,或者 当前 goroutine 是阻塞队列中的最后一个 goroutine,则退出饥饿模式,恢复正常模式 | |
if !starving || old>>mutexWaiterShift == 1 { | |
// | |
delta -= mutexStarving | |
} | |
atomic.AddInt32(&m.state, delta) | |
// 加锁成功 出口 | |
break | |
} | |
// 如果不是饥饿模式 | |
// 当前 goroutine 正在抢锁 | |
awoke = true | |
// 出队后,迭代值恢复 0 值 | |
iter = 0 | |
} else { | |
// CAS 操作失败,重新获取锁的状态值 | |
old = m.state | |
} | |
} | |
if race.Enabled { | |
race.Acquire(unsafe.Pointer(m)) | |
} | |
} |
# Mutex.Unlock()
- Unlock()
func (m *Mutex) Unlock() { | |
if race.Enabled { | |
_ = m.state | |
race.Release(unsafe.Pointer(m)) | |
} | |
// 如果当前除了当前 goroutine 之外,没有其他协程在参与竞争 | |
// 通过原子操作解锁,将 state 的 mutexLocked 位清零,表示当前 goroutine 释放了锁 | |
new := atomic.AddInt32(&m.state, -mutexLocked) | |
// 如果不为 0,说明还有其他 goroutine 在参与竞争,此时进入慢路径 unlockSlow, 负责唤醒阻塞队列中写一个等待者。 | |
if new != 0 { | |
// Outlined slow path to allow inlining the fast path. | |
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. | |
m.unlockSlow(new) | |
} | |
} |
- unlockSlow()
func (m *Mutex) unlockSlow(new int32) { | |
// 解锁前是否已经加锁了,如果此前未加锁,则直接抛出 fatal,防止非法操作 | |
if (new+mutexLocked)&mutexLocked == 0 { | |
// 无法释放一把未被占用的锁 | |
fatal("sync: unlock of unlocked mutex") | |
} | |
// 如果此时不处于饥饿模式 | |
if new&mutexStarving == 0 { | |
// 非饥饿模式下唤醒逻辑 | |
old := new | |
for { | |
// 判断是否要有从阻塞队列唤醒 goroutine 的动作 | |
// 1. 如果没有等待的阻塞协程 (old>> mutexWaiterShift == 0) | |
// 2. 说明已经有其他协程加锁成功了,应该由其他的写成来唤醒 (old & mutexLocked != 0) | |
// 3. 有协程已被唤醒 (old & mutexWoken != 0) | |
// 4. 处于饥饿模式 (old & mutexStarving != 0) | |
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { | |
return | |
} | |
// 更新状态: 等待者数量减 1,设置 mutexWoken 为 1,表示当前 goroutine 已经完成唤醒操作 | |
new = (old - 1<<mutexWaiterShift) | mutexWoken | |
// CAS 更新状态成功后,唤醒一个阻塞的 goroutine | |
if atomic.CompareAndSwapInt32(&m.state, old, new) { | |
// 唤醒后通过信号量通知 | |
runtime_Semrelease(&m.sema, false, 1) | |
return | |
} | |
// CAS 更新状态失败,重新获取锁的状态值 | |
old = m.state | |
} | |
} else { | |
// 处于饥饿模式,直接唤醒阻塞队列的队首的 goroutine,保证公平性 | |
runtime_Semrelease(&m.sema, true, 1) | |
} | |
} |
# RWMutex
- 从逻辑上,可以把 RWMutex 理解为一把读锁加一把写锁
- 写锁具有严格的排他性,当其被占用,其他试图取写锁或者读锁的 goroutine 均被阻塞
- 读锁具有有限的共享性,当其被占用,试图取写锁的 goroutine 会阻塞,适度取读锁的 goroutine 与当前 goroutine 共享读锁
- 综上,RWMutex 适合于读多写少的场景,最理想的情况,当所有操作均使用读锁,则可实现无锁化;最悲观的情况,所有操作均使用写锁,则 RWMutex 退化为普通的 Mutex
# 数据结构
type RWMutex struct { | |
// 互斥锁 | |
w Mutex // held if there are pending writers | |
// 关联写锁阻塞队列的信号量 | |
writerSem uint32 // semaphore for writers to wait for completing readers | |
// 关联读锁阻塞队列的信号量 | |
readerSem uint32 // semaphore for readers to wait for completing writers | |
// 表示当前等待或者占用读锁的 goroutine 数量 | |
readerCount atomic.Int32 // number of pending readers | |
// 表示当前 goroutine 回去写锁前,还需要等待多少个读锁的 goroutine 释放读锁 | |
readerWait atomic.Int32 // number of departing readers | |
} | |
// 共享读锁的 goroutine 数量上限 | |
const rwmutexMaxReaders = 1 << 30 |
# RLock 读锁加锁
func (rw *RWMutex) RLock() { | |
if race.Enabled { | |
_ = rw.w.state | |
race.Disable() | |
} | |
// 将 RWMutex 的 readerCount +1,表示占用或等待锁的 goroutine +1 | |
// 如果返回值小于 0,说明有其他协程未释放写锁,此时需要将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起当前协程,直到写锁释放 (runtime_SemacquireRWMutexR) | |
if rw.readerCount.Add(1) < 0 { | |
// A writer is pending, wait for it. | |
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) | |
} | |
if race.Enabled { | |
race.Enable() | |
race.Acquire(unsafe.Pointer(&rw.readerSem)) | |
} | |
} |
# RUnlock && rUnlockSlow
func (rw *RWMutex) RUnlock() { | |
if race.Enabled { | |
_ = rw.w.state | |
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) | |
race.Disable() | |
} | |
// RWMutex 的 readerCount -1,表示占用或等待读锁的 goroutine 数量 -1 | |
// 如果返回值小于 0,说明有其他协程正在获取写锁,则走入慢路径 rUnlockSlow 流程中 | |
// 快路径下只做计数操作,慢路径才涉及唤醒写锁等待者,保证写锁优先 | |
if r := rw.readerCount.Add(-1); r < 0 { | |
// Outlined slow-path to allow the fast-path to be inlined | |
rw.rUnlockSlow(r) | |
} | |
if race.Enabled { | |
race.Enable() | |
} | |
} |
func (rw *RWMutex) rUnlockSlow(r int32) { | |
// 如果当前这把锁没有被加过读锁却调用 RUnLock,直接抛出致命错误,防止非法操作 | |
if r+1 == 0 || r+1 == -rwmutexMaxReaders { | |
race.Enable() | |
fatal("sync: RUnlock of unlocked RWMutex") | |
} | |
// A writer is pending. | |
// 如果有写锁正在等待(即 readerCount < 0 ),则每个读锁释放时都让 readerWait -1,表示一个读锁释放了 | |
// 当最后一个读锁释放(rw.readerWait.Add (-1) == 0),说明所有的读锁都已经释放,此时唤醒等待写锁的 goroutine | |
if rw.readerWait.Add(-1) == 0 { | |
// The last reader unblocks the writer. | |
此时唤醒等待写锁的goroutine | |
runtime_Semrelease(&rw.writerSem, false, 1) | |
} | |
} |
# Lock 写锁加锁
func (rw *RWMutex) Lock() { | |
if race.Enabled { | |
_ = rw.w.state | |
race.Disable() | |
} | |
// First, resolve competition with other writers. | |
// 1. 先竞争写锁,保证同一时刻只有一个写锁请求进入 | |
rw.w.Lock() | |
// Announce to readers there is a pending writer. | |
// 2. 通知所有读锁有写锁请求,将 readerCount 变为负数,阻止新的读锁进入 | |
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders | |
// Wait for active readers. | |
// 3. 等待所有活跃读锁释放,如果有活跃读锁,则将当前 goroutine 加入读锁阻塞队列,并阻塞当前 goroutine | |
if r != 0 && rw.readerWait.Add(r) != 0 { | |
runtime_SemacquireRWMutex(&rw.writerSem, false, 0) | |
} | |
...... | |
} |
# Unlock 写锁解锁
func (rw *RWMutex) Unlock() { | |
...... | |
// Announce to readers there is no active writer. | |
// 1. 通知读锁没有活跃写锁,将 readerCount 恢复为正数,允许新的读锁进入 | |
r := rw.readerCount.Add(rwmutexMaxReaders) | |
if r >= rwmutexMaxReaders { | |
race.Enable() | |
fatal("sync: Unlock of unlocked RWMutex") | |
} | |
// Unblock blocked readers, if any. | |
// 2. 唤醒所有被阻塞的读锁 | |
for i := 0; i < int(r); i++ { | |
runtime_Semrelease(&rw.readerSem, false, 0) | |
} | |
// Allow other writers to proceed. | |
// 3. 释放写锁,允许其他写锁请求进入 | |
rw.w.Unlock() | |
if race.Enabled { | |
race.Enable() | |
} | |
} |