# 单机锁实现原理

两种模式的转换条件:

  • 默认为正常模式
  • 正常模式 -> 饥饿模式:
  • 饥饿模式 -> 正常模式:

# Sync.Mutex

# 数据结构 与 状态位

type Mutex struct {
	state int32
	sema  uint32
}
  • state: 锁中最核心的状态字段,不同 bit 位分别存储了 mutexLocked (是否上锁)、mutexWoken (是否有等待的 goroutine 被唤醒)、mutexStarving (是否处于饥饿模式) 等信息。
  • sema: 用于阻塞和唤醒 goroutine 的信号量。

# state 含义

state 是一个复合状态字段,通过位标志 (bit flag) 来表示多种锁的状态,其结构如下:

const (
	mutexLocked        = 1 << iota // 表示锁已被持有(上锁状态)
	mutexWoken                     // 标记是否有 goroutine 已经被唤醒
	mutexStarving                  // 标记锁是否处于饥饿模式
	mutexWaiterShift     = iota          // 等待者计数器的起始位偏移量
	starvationThresholdNs = 1e6 // 饥饿模式切换的等待时间阈值(单位:纳秒)
)
  • mutexLocked = 1: state 最低位的一个 bit 位标志是否上锁,0 - 未上锁,1 - 已上锁
  • mutexWoken = 2: state 的第 2 个 bit 位标志是否有 goroutine 已经被唤醒,0 - 未唤醒,1 - 已唤醒
  • mutexStarving = 4: state 的第 3 个 bit 位标志锁是否处于饥饿模式,0 - 非饥饿模式,1 - 饥饿模式
  • mutexWaiterShift = 3: 其作用是表示,右侧存在 3bit 位标识特殊信息,并非是移位运算,没有额外的作用
  • starvationThresholdNs = 1ms: sync.Mutex 进入饥饿模式的等待时间阈值

以二进制位分布来看 (低位在右):

  [ Waiter Count (29 bits) ][ Starving ][ Woken ][ Locked ]
  31                    3      2          1         0
  • 低 3 位是状态位

  • 高 29 位是等待者数量

  • state & mutexLocked: 判断是否上锁

  • state | mutexLocked: 上锁

  • state & mutexWoken: 判断是否存在抢锁的协程

  • state | mutexWoken: 更新状态,标识存在抢锁的协程

  • state &^ mutexWoken: 更新状态,标识不存在抢锁的协程

  • state & mutexStarving: 判断是否处于饥饿模式

  • state | mutexStarving: 置为饥饿模式

  • state >> mutexWaiterShift: 获取等待队列中的等待者数量

  • state += 1 << mutexWaiterShift: 阻塞等待的协程数 + 1

# Mutex.Lock()

  1. 首先尝试通过原子操作将 state 字段的最低位设置为 1,表示上锁,如果成功则直接返回,否则进入慢速路径
func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
    // 当锁完全空闲(state == 0)时,直接 CAS 设置为 mutexLocked
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
    // 否则进入 lockSlow ()
	m.lockSlow()
}
  • Fast Path: 直接 CAS 抢锁
  • Slow Path: 竞争失败后进入复杂逻辑(自旋、阻塞、饥饿判定等)
  1. Mutex.lockSlow()
func (m *Mutex) lockSlow() {
	// 标识当前 goroutine 在抢锁过程中的等待时间,单位为 ns
	var waitStartTime int64
	// 标识当前 goroutine 是否处于饥饿模式
	starving := false
	// 标识当前是否已有协程在等待锁
	awoke := false
	// 标识当前 goroutine 参与自旋的次数
	iter := 0
	// 临时存储锁的 state 值
	old := m.state
	for {
		// 进入该 if 分支,说明抢锁失败
        // 1. 当前锁已被持有(state & mutexLocked == 1
        // 2. 当前锁处于饥饿模式(state & mutexStarving == 1)
        // 3. 满足自旋条件 (runtime_canSpin (iter))
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 进入该 if 分支,说明
            // 1. 当前 goroutine 还没设置过唤醒标志 (awoke == false)
            // 2. 没有其他协程被唤醒 (old & mutexWoken == 0)
            // 3. 阻塞队列中存在等待的协程 (old>> mutexWaiterShift != 0)
            // 4. CAS 将 mutexWoken 置为 1,避免其他协程被唤醒和自己抢锁
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
            // 调用 runtime_doSpin () 告知调度器 P 当前处于自旋模式
			runtime_doSpin()
            // 更新自旋 iter 和锁状态值
			iter++
			old = m.state
            // 进行下一次尝试
			continue
		}
		
		// 自旋抢锁失败后处理......
        // 从自旋中走出来后,会存在两种分支,要么加锁成功,要么陷入自锁,不论是何种情形,都会先对 sync.Mutex 的状态新值 new 进行更新
		
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		// 表示当前不是饥饿模式
		if old&mutexStarving == 0 {
            // 在新的 new 中置为已加锁,即尝试抢锁
			new |= mutexLocked
		}
		// 如果 old 状态值,处于 locked 或者饥饿状态,则当前 goroutine 在这一轮注定无法抢锁成功
		if old&(mutexLocked|mutexStarving) != 0 {
            // 将阻塞队列的 goroutine 数量 + 1
			new += 1 << mutexWaiterShift
		}
		// 如果是饥饿模式,并且锁已经被占有
		if starving && old&mutexLocked != 0 {
            // 将新值置为饥饿模式
			new |= mutexStarving
		}
		// 如果当前 goroutine 被唤醒过,则将 mutexWoken 置为 0,表示当前 goroutine 已经处理完被唤醒后的逻辑
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken
		}
		
		
		// 尝试更新 state 值,如果更新成功,则表示抢锁成功,退出循环
        // 通过 CAS 操作,用构造的新值替换旧值
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// 验证是否加锁成功,如果旧值是未加锁状态且为正常模式,则意味着教唆标识位正是由当前 goroutine 完成的更新,说明加锁成功,返回即可。
			if old&(mutexLocked|mutexStarving) == 0 {
                // 😈 加锁成功出口
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			//  如果当前 goroutine 已经等待过,则采用 LIFO(后进先出)队列方式排队,否则采用 FIFO(先进先出)队列方式排队。这样可以优化唤醒顺序,减少饥饿。
			queueLifo := waitStartTime != 0
			//waitStartTime 是否初始化过
			// 如果是第一次进入等待,则记录当前时间,用于后续判断是否进入饥饿模式
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			// 🏜阻塞挂起:当前 goroutine 进入阻塞模式,等待锁释放。queueLifo 决定排队顺序,1 表示只等待一个信号量
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// 🤩从阻塞态被唤醒后
            // 计算当前等锁的时间是否超过 1ms,超过则开启饥饿模式。饥饿模式下,锁会优先分配给等待时间最长的 goroutine,防止饿死
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			// 获取最新锁状态
			old = m.state
			// 唤醒前是否处于饥饿模式
			// 如果原本就是饥饿模式,而当前 goroutine 正好被唤醒,意味着一定加锁成功了
			if old&mutexStarving != 0 {
				// 如果锁处于加锁或唤醒状态,或者没有等待者,说明状态异常,直接抛出错误。
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// 更新锁的状态,由 0 改为 1, 置为已加锁状态
				// 阻塞等待 goroutine 数量 -1(当前 goroutine 已经获得锁)
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
                // 等锁的时间小于 1ms,或者 当前 goroutine 是阻塞队列中的最后一个 goroutine,则退出饥饿模式,恢复正常模式
				if !starving || old>>mutexWaiterShift == 1 {
					// 
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				// 加锁成功 出口
				break
			}
			// 如果不是饥饿模式
			// 当前 goroutine 正在抢锁
			awoke = true
			// 出队后,迭代值恢复 0 值
			iter = 0
		} else {
			// CAS 操作失败,重新获取锁的状态值
			old = m.state
		}
	}
	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

# Mutex.Unlock()

  1. Unlock()
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}
    // 如果当前除了当前 goroutine 之外,没有其他协程在参与竞争
	// 通过原子操作解锁,将 state 的 mutexLocked 位清零,表示当前 goroutine 释放了锁   
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// 如果不为 0,说明还有其他 goroutine 在参与竞争,此时进入慢路径 unlockSlow, 负责唤醒阻塞队列中写一个等待者。
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
  1. unlockSlow()
func (m *Mutex) unlockSlow(new int32) {
	// 解锁前是否已经加锁了,如果此前未加锁,则直接抛出 fatal,防止非法操作
	if (new+mutexLocked)&mutexLocked == 0 {
		// 无法释放一把未被占用的锁
		fatal("sync: unlock of unlocked mutex")
	}
	// 如果此时不处于饥饿模式
	if new&mutexStarving == 0 {
		// 非饥饿模式下唤醒逻辑
		old := new
		for {
			// 判断是否要有从阻塞队列唤醒 goroutine 的动作
			// 1. 如果没有等待的阻塞协程 (old>> mutexWaiterShift == 0) 
			// 2. 说明已经有其他协程加锁成功了,应该由其他的写成来唤醒 (old & mutexLocked != 0)
			// 3. 有协程已被唤醒 (old & mutexWoken != 0)
			// 4. 处于饥饿模式 (old & mutexStarving != 0)
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 更新状态: 等待者数量减 1,设置 mutexWoken 为 1,表示当前 goroutine 已经完成唤醒操作
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			// CAS 更新状态成功后,唤醒一个阻塞的 goroutine
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 唤醒后通过信号量通知
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			// CAS 更新状态失败,重新获取锁的状态值
			old = m.state
		}
	} else {
		// 处于饥饿模式,直接唤醒阻塞队列的队首的 goroutine,保证公平性
		runtime_Semrelease(&m.sema, true, 1)
	}
}

# RWMutex

  • 从逻辑上,可以把 RWMutex 理解为一把读锁加一把写锁
  • 写锁具有严格的排他性,当其被占用,其他试图取写锁或者读锁的 goroutine 均被阻塞
  • 读锁具有有限的共享性,当其被占用,试图取写锁的 goroutine 会阻塞,适度取读锁的 goroutine 与当前 goroutine 共享读锁
  • 综上,RWMutex 适合于读多写少的场景,最理想的情况,当所有操作均使用读锁,则可实现无锁化;最悲观的情况,所有操作均使用写锁,则 RWMutex 退化为普通的 Mutex

# 数据结构

type RWMutex struct {
	// 互斥锁
	w           Mutex        // held if there are pending writers
    // 关联写锁阻塞队列的信号量
	writerSem   uint32       // semaphore for writers to wait for completing readers
    // 关联读锁阻塞队列的信号量
	readerSem   uint32       // semaphore for readers to wait for completing writers
    // 表示当前等待或者占用读锁的 goroutine 数量
	readerCount atomic.Int32 // number of pending readers
    // 表示当前 goroutine 回去写锁前,还需要等待多少个读锁的 goroutine 释放读锁
	readerWait  atomic.Int32 // number of departing readers
}
// 共享读锁的 goroutine 数量上限
const rwmutexMaxReaders = 1 << 30

# RLock 读锁加锁

func (rw *RWMutex) RLock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// 将 RWMutex 的 readerCount +1,表示占用或等待锁的 goroutine +1
	// 如果返回值小于 0,说明有其他协程未释放写锁,此时需要将当前 goroutine 添加到读锁的阻塞队列中并阻塞挂起当前协程,直到写锁释放 (runtime_SemacquireRWMutexR)
	if rw.readerCount.Add(1) < 0 {
		// A writer is pending, wait for it.
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

# RUnlock && rUnlockSlow

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
    // RWMutex 的 readerCount -1,表示占用或等待读锁的 goroutine 数量 -1
    // 如果返回值小于 0,说明有其他协程正在获取写锁,则走入慢路径 rUnlockSlow 流程中
	// 快路径下只做计数操作,慢路径才涉及唤醒写锁等待者,保证写锁优先
	if r := rw.readerCount.Add(-1); r < 0 {
		// Outlined slow-path to allow the fast-path to be inlined
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
	//  如果当前这把锁没有被加过读锁却调用 RUnLock,直接抛出致命错误,防止非法操作
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	// A writer is pending.
	// 如果有写锁正在等待(即 readerCount < 0 ),则每个读锁释放时都让 readerWait -1,表示一个读锁释放了
	// 当最后一个读锁释放(rw.readerWait.Add (-1) == 0),说明所有的读锁都已经释放,此时唤醒等待写锁的 goroutine
	if rw.readerWait.Add(-1) == 0 {
		// The last reader unblocks the writer.
		此时唤醒等待写锁的goroutine
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

# Lock 写锁加锁

func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// First, resolve competition with other writers.
	// 1. 先竞争写锁,保证同一时刻只有一个写锁请求进入
	rw.w.Lock()
	// Announce to readers there is a pending writer.
	// 2. 通知所有读锁有写锁请求,将 readerCount 变为负数,阻止新的读锁进入
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	// Wait for active readers.
	// 3. 等待所有活跃读锁释放,如果有活跃读锁,则将当前 goroutine 加入读锁阻塞队列,并阻塞当前 goroutine
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
	......
}

# Unlock 写锁解锁

func (rw *RWMutex) Unlock() {
	......
	// Announce to readers there is no active writer.
	// 1. 通知读锁没有活跃写锁,将 readerCount 恢复为正数,允许新的读锁进入
	r := rw.readerCount.Add(rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		fatal("sync: Unlock of unlocked RWMutex")
	}
	// Unblock blocked readers, if any.
	// 2. 唤醒所有被阻塞的读锁
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// Allow other writers to proceed.
	// 3. 释放写锁,允许其他写锁请求进入
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}
更新于 阅读次数

请我喝[茶]~( ̄▽ ̄)~*

ZJM 微信支付

微信支付

ZJM 支付宝

支付宝