sync.Mutex #
Looking at the implementation in sync/mutex.go, there is this comment:
Mutex can be in 2 modes of operations: normal and starvation. In normal mode waiters are queued in FIFO order, but a woken up waiter does not own the mutex and competes with new arriving goroutines over the ownership. New arriving goroutines have an advantage – they are already running on CPU and there can be lots of them, so a woken up waiter has good chances of losing. In such case it is queued at front of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, it switches mutex to the starvation mode.
In starvation mode ownership of the mutex is directly handed off from the unlocking goroutine to the waiter at the front of the queue. New arriving goroutines don’t try to acquire the mutex even if it appears to be unlocked, and don’t try to spin. Instead they queue themselves at the tail of the wait queue.
If a waiter receives ownership of the mutex and sees that either (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, it switches mutex back to normal operation mode.
Normal mode has considerably better performance as a goroutine can acquire a mutex several times in a row even if there are blocked waiters. Starvation mode is important to prevent pathological cases of tail latency.
Go's mutex implementation has two modes of operation: normal mode and starvation mode.
Normal mode #
Waiters queue in FIFO order. When the lock is released, the waiter at the head of the queue is woken, but it does not own the mutex directly; instead it competes for ownership with newly arriving goroutines.
In most cases a new goroutine wins this race, because it is already running on a CPU, so the woken waiter is likely to lose. When that happens, the waiter is put back at the front of the wait queue.
If a waiter fails to acquire the mutex for more than 1ms, the mutex switches to starvation mode.
Starvation mode #
In starvation mode, ownership of the mutex is handed off directly from the unlocking goroutine to the waiter at the front of the queue. Newly arriving goroutines do not try to acquire the lock or spin; they simply append themselves to the tail of the wait queue.
The mutex switches back to normal mode when the waiter that receives ownership sees either of the following:
- it is the last waiter in the queue
- it has waited for less than 1ms
Comparison #
Normal mode has considerably better performance, because a goroutine can acquire the mutex several times in a row even while there are blocked waiters, but this can leave the waiter at the tail of the queue unable to get the lock for a long time.
Starvation mode avoids this pathological tail latency.
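Regardless of which mode the mutex is in internally, the external API is just Lock and Unlock. A minimal usage sketch (hypothetical example, not from the standard library): eight goroutines contend for one mutex to protect a shared counter, and mutual exclusion guarantees the final count:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu      sync.Mutex
		counter int
		wg      sync.WaitGroup
	)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				mu.Lock()
				counter++ // critical section protected by mu
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 8000
}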
Source code walkthrough #
Data structures:
type Mutex struct {
	state int32  // lock state, shared by many goroutines and updated with CAS; see the constants below
	sema  uint32 // semaphore used to park and wake goroutines
}

const (
	mutexLocked = 1 << iota // 0b001: the mutex is locked, held by some goroutine
	mutexWoken              // 0b010: some goroutine has been woken and is trying to acquire the lock
	mutexStarving           // 0b100: the mutex is in starvation mode
	mutexWaiterShift = iota // shift for the waiter count; mutex.state >> mutexWaiterShift gives the number of waiters
	starvationThresholdNs = 1e6 // threshold for entering starvation mode: 1e6 ns, i.e. 1ms
)
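To make the bit layout concrete, here is a small hypothetical helper (not part of the sync package; it assumes the constants above and the fmt package are in scope) that decodes a state value:

// describeState decodes a hypothetical state value using the same bit layout
// as sync.Mutex.state. This is only an illustration: the real field is
// unexported and cannot be inspected from outside the package.
func describeState(state int32) string {
	return fmt.Sprintf("locked=%v woken=%v starving=%v waiters=%d",
		state&mutexLocked != 0,   // bit 0
		state&mutexWoken != 0,    // bit 1
		state&mutexStarving != 0, // bit 2
		state>>mutexWaiterShift)  // remaining bits: waiter count
}

// For example, describeState(0b1001) returns
// "locked=true woken=false starving=false waiters=1".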
Internal runtime calls:
// Reports whether the current goroutine may spin (spinning burns CPU, so the number of spins is bounded).
func runtime_canSpin(i int) bool
// Performs one round of spinning.
func runtime_doSpin()
// Returns the current time in nanoseconds.
func runtime_nanotime() int64
// Semaphore implementation, the P operation.
// s is the semaphore; it carries a FIFO wait queue.
// If lifo is true, the caller is queued at the front of the queue.
// skipframes only matters when tracing is enabled.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semaphore implementation, the V operation.
// If handoff is true, ownership is handed directly to the waiter at the front of the queue.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
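These functions are runtime internals wired up via go:linkname and cannot be called from user code. As a rough mental model only, and not the runtime's actual implementation (which parks goroutines on a per-address wait queue and supports LIFO queueing and handoff), P/V semantics can be sketched with a buffered channel:

// semaphore is a toy model of P/V semantics built on a buffered channel.
type semaphore chan struct{}

func newSemaphore(n int) semaphore {
	s := make(semaphore, n)
	for i := 0; i < n; i++ {
		s <- struct{}{} // preload n units
	}
	return s
}

// acquire is the P operation: block until a unit is available.
func (s semaphore) acquire() { <-s }

// release is the V operation: return a unit, unblocking one waiter if any.
func (s semaphore) release() { s <- struct{}{} }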
Locking: Lock()
func (m *Mutex) Lock() {
	// Fast path: if state is 0, the mutex is unlocked and has no waiters,
	// so a single atomic CAS acquires it.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m)) // happens-before guarantee for the race detector
		}
		return
	}
	// Slow path: the mutex is already locked or has waiters.
	m.lockSlow()
}
func (m *Mutex) lockSlow() {
	var waitStartTime int64 // when this goroutine started waiting
	starving := false       // whether this goroutine is starving
	awoke := false          // whether this goroutine has been woken
	iter := 0               // number of spin iterations so far
	old := m.state          // snapshot of the current lock state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		// Spin only if: locked && not starving && spinning is still allowed.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
			// If this goroutine is not marked awoke, no goroutine is marked woken on the mutex,
			// and there are waiters, try to set mutexWoken so Unlock does not wake another waiter.
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin() // spin
			iter++
			old = m.state // refresh the state; other goroutines may have changed it while we were spinning
			continue
		}
		// At this point either the mutex is unlocked, it is in starvation mode,
		// or we have used up our spin budget.
		// old is the observed state, new is the state we will try to install, derived from old.
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		// If the mutex is not starving, try to acquire it by setting the locked bit.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		// If the mutex is locked or starving, we will have to wait, so bump the waiter count
		// (i.e. add 1<<mutexWaiterShift, which is 8).
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not be true in this case.
		// If this goroutine is starving and someone else holds the lock, request starvation mode;
		// if the mutex is unlocked, leave it alone.
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		// If this goroutine is marked awoke, the flag must be cleared,
		// because we are about to either go to sleep or acquire the lock.
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken // &^ is AND NOT: clear the mutexWoken bit in new
		}
		// Try to install new with CAS; on failure, re-read the state and loop again.
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			// old was neither locked nor starving, so the CAS above acquired
			// the lock (the state went from free to locked).
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.
			// A non-zero waitStartTime means this goroutine has waited (and been woken) before,
			// so it goes to the front of the queue.
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 { // first time waiting: record the start timestamp
				waitStartTime = runtime_nanotime()
			}
			// Go to sleep on the semaphore until the lock becomes available.
			// Goroutines that have waited before go to the front, others to the back.
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			// Woken up from sleep.
			// If we have been waiting for more than 1ms, mark this goroutine as starving.
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state // re-read the current state
			// If the mutex is in starvation mode, ownership was handed to us directly
			// when the previous owner unlocked, i.e. we now own the lock.
			if old&mutexStarving != 0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				// The state must not have the locked or woken bits set, and the wait
				// queue must not be empty; anything else is inconsistent.
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				// Set the locked bit and remove ourselves from the waiter count.
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				// If this goroutine is no longer starving, or it is the last waiter
				// in the queue, leave starvation mode.
				if !starving || old>>mutexWaiterShift == 1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				// Apply the state change atomically.
				atomic.AddInt32(&m.state, delta)
				break
			}
			// Not in starvation mode: mark this goroutine as awoke and reset the spin counter.
			awoke = true
			iter = 0
		} else {
			// CAS failed: refresh old and retry the loop.
			old = m.state
		}
	}
	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}
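As a quick, timing-dependent experiment, the state bits can be observed by peeking at the unexported state field with unsafe. This is a hypothetical sketch for learning only, assuming the field layout shown above (state int32 as the first field of Mutex); do not use it in real code:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
	"unsafe"
)

// state peeks at sync.Mutex's unexported state word. It assumes the layout
// shown above and is only for experiments.
func state(m *sync.Mutex) int32 {
	return atomic.LoadInt32((*int32)(unsafe.Pointer(m)))
}

func main() {
	var mu sync.Mutex
	fmt.Printf("initial:       %04b\n", state(&mu)) // 0000
	mu.Lock()
	fmt.Printf("after Lock:    %04b\n", state(&mu)) // 0001: mutexLocked set

	go func() { mu.Lock(); mu.Unlock() }() // a second goroutine blocks in lockSlow
	time.Sleep(100 * time.Millisecond)     // timing dependent: give it time to queue
	fmt.Printf("with a waiter: %04b\n", state(&mu)) // typically 1001: locked, one waiter

	mu.Unlock()
}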
Unlocking: Unlock()
A Mutex is not bound to a particular goroutine: any goroutine is allowed to unlock a mutex that was locked by another goroutine.
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}
	// Fast path: drop lock bit.
	// Atomically clear the locked bit, i.e. subtract 1 from the state.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	// If new is 0 there are no waiters and no flags set: the mutex is in normal mode and now free, so we are done.
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}
func (m *Mutex) unlockSlow(new int32) {
	// By the time we get here, mutexLocked has already been subtracted from the state.
	if (new+mutexLocked)&mutexLocked == 0 {
		// The locked bit was not set, so this is an unlock of an unlocked mutex: fatal error.
		// Subtracting first and adding back here keeps the check correct under concurrent Unlock calls.
		fatal("sync: unlock of unlocked mutex")
	}
	// Normal mode.
	if new&mutexStarving == 0 {
		old := new
		for {
			// If there are no waiters or a goroutine has already
			// been woken or grabbed the lock, no need to wake anyone.
			// In starvation mode ownership is directly handed off from unlocking
			// goroutine to the next waiter. We are not part of this chain,
			// since we did not observe mutexStarving when we unlocked the mutex above.
			// So get off the way.
			// Return directly in four cases:
			// - there are no waiters
			// - the lock has already been grabbed by another goroutine
			// - some goroutine has been woken and is trying to acquire the lock
			// - the mutex has switched to starvation mode
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// Grab the right to wake someone.
			// The mutex is free, in normal mode, and there are waiters asleep in the queue:
			// set the woken flag and decrement the waiter count.
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1) // wake one waiter via the semaphore
				return
			}
			old = m.state // refresh the state on every iteration
		}
	} else {
		// Starving mode: handoff mutex ownership to the next waiter, and yield
		// our time slice so that the next waiter can start to run immediately.
		// Note: mutexLocked is not set, the waiter will set it after wakeup.
		// But mutex is still considered locked if mutexStarving is set,
		// so new coming goroutines won't acquire it.
		// Starvation mode: hand the lock directly to the waiter at the front of the queue.
		runtime_Semrelease(&m.sema, true, 1)
	}
}
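As the comment above notes, a locked Mutex is not associated with a particular goroutine. A small sketch (hypothetical example) of locking in one goroutine and unlocking in another, which is legal even though a channel is usually the clearer tool for this kind of handoff:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	mu.Lock() // locked by the main goroutine

	done := make(chan struct{})
	go func() {
		// A different goroutine is allowed to release the mutex.
		mu.Unlock()
		close(done)
	}()
	<-done

	mu.Lock() // succeeds: the worker goroutine released it
	fmt.Println("re-acquired")
	mu.Unlock()
}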
sync.RWMutex #
The sync/rwmutex.go implementation in the Go standard library is write-preferring: once a writer is waiting, it acquires the write lock ahead of any read operations that arrive after it, no matter how many there are. However, if the write lock is already held and new readers and writers queue up behind it, the readers that were waiting are woken first when the write lock is released.
Note (a usage sketch follows this list):
- a RWMutex must not be copied
- acquires and releases must come in matched pairs, and a goroutine that holds the read lock must not try to take the write lock
- the underlying mutex is not reentrant, so write-lock acquisition is not reentrant either
- a goroutine that already holds the read lock should not recursively acquire the read lock again
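A minimal usage sketch (hypothetical example, not from the standard library): readers share the lock via RLock/RUnlock, the writer takes it exclusively via Lock/Unlock, and no goroutine re-enters a lock it already holds:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		rw   sync.RWMutex
		data = map[string]int{}
		wg   sync.WaitGroup
	)

	// Writer: takes the write lock exclusively.
	wg.Add(1)
	go func() {
		defer wg.Done()
		rw.Lock()
		data["x"] = 42
		rw.Unlock()
	}()

	// Readers: may hold the read lock concurrently with each other, but never
	// together with the writer. Do not call RLock (or Lock) again from a
	// goroutine that already holds the read lock: if a writer queues up in
	// between, that re-entrant acquisition can deadlock.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rw.RLock()
			_ = data["x"]
			rw.RUnlock()
		}()
	}

	wg.Wait()
	fmt.Println(data["x"]) // 42
}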
type RWMutex struct {
	// mutex used by writers to compete for the write lock
	w Mutex // held if there are pending writers
	// semaphore on which writers queue and are woken
	writerSem uint32 // semaphore for writers to wait for completing readers
	// semaphore on which readers queue and are woken
	readerSem uint32 // semaphore for readers to wait for completing writers
	// number of pending read operations
	// Every call to RLock() is counted here, whether or not it actually obtained the read lock.
	// A negative value signals that a writer is present (it has acquired the mutex w and is either
	// still waiting for readers to finish or already holds the write lock).
	readerCount int32 // number of pending readers
	// number of readers that already held the read lock when the writer arrived,
	// i.e. how many read-lock releases the writer still has to wait for
	readerWait int32 // number of departing readers
}
// Maximum number of concurrent readers supported.
// When a writer acquires the write lock, this value is subtracted from readerCount,
// making readerCount negative; that negative value marks that a writer has arrived
// and blocks subsequent new readers from obtaining the read lock.
const rwmutexMaxReaders = 1 << 30
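A small worked sketch (plain arithmetic, not the actual rwmutex code) of how subtracting rwmutexMaxReaders turns readerCount negative while still preserving how many readers the writer has to wait for:

package main

import "fmt"

const rwmutexMaxReaders = 1 << 30

func main() {
	readerCount := int32(3) // three readers currently hold the read lock

	// A writer arrives: Lock() atomically adds -rwmutexMaxReaders.
	r := readerCount - rwmutexMaxReaders
	fmt.Println(r < 0)                 // true: new RLock callers see a pending writer and block
	fmt.Println(r + rwmutexMaxReaders) // 3: the writer still waits for these readers to call RUnlock
}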