golang内置锁的分类
类型 | 实现类 |
---|---|
互斥锁 | sync.Mutex |
读写锁 | sync.RWMutex |
其中读写锁是通过互斥锁进行实现的
互斥锁源码解析
信号量概念
首先应弄清PV操作的含义:
PV操作由P操作原语和V操作原语组成(原语是不可中断的过程),对信号量进行操作,具体定义如下:
P(S):①将信号量S的值减1,即S=S-1;
②如果S³0,则该进程继续执行;否则该进程置为等待状态,排入等待队列。
V(S):①将信号量S的值加1,即S=S+1;
②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程。
PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信。
源码注释
//接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}
Mutex实现中有两种模式
- 正常模式 协程会排队(FIFO[First In First Out 即先进先出])
- 饥饿模式 当某个goroutine等待时间太久了就会进入饥饿模式
前者指的是当一个协程获取到锁时,后面的协程会排队(FIFO[First In First Out 即先进先出]),释放锁时会唤醒最早排队的协程,这个协程会和正在CPU上运行的协程竞争锁,但是大概率会失败,为什么呢?因为你是刚被唤醒的,还没有获得CPU的使用权,而CPU正在执行的协程肯定比你有优势,如果这个被唤醒的协程竞争失败,并且超过了1ms,那么就会退回到后者(饥饿模式),这种模式下,该协程在下次获取锁时直接得到,不存在竞争关系,本质是为了防止协程等待锁的时间太长。
const (
mutexLocked = 1 << iota //1, 0001 最后一位表示当前锁的状态,0未锁,1已锁
mutexWoken //2, 0010,倒数第二位表示当前锁是否会被唤醒,0唤醒,1未唤醒
mutexStarving //4, 0100 倒数第三位表示当前对象是否为饥饿模式,0正常,1饥饿
mutexWaiterShift = iota //3 从倒数第四位往前的bit表示排队的gorouting数量
starvationThresholdNs = 1e6 // 饥饿的阈值:1ms
)
//实现类
type Mutex struct {
state int32 // 0代表未获取到锁,1代表得到锁,2-2^31表示gorouting排队的数量的
sema uint32 // 非负数的信号量,阻塞协程的依据
}
func (m *Mutex) Lock() {
// 快速的方式:获取解锁的互斥锁
// 首先判断是否&m.state是否是0如果不是0的话讲mutexLocked赋值到&m.state(这个操作是原子操作,CompareAndSwapInt32提供)
//其中如果m.state是0的话表示这个锁没有被占用
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 比较慢的方式 (已概述,以便可以内联快速路径)
m.lockSlow()
}
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 接触锁定(讲m.state设置未0)
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}
在不能使用快速通过state获取锁的状态就需要比较复杂的获取方式
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
// 这里是一个循环,直到获取到锁,不然会一直循环下去
// 这意味着调用同一个互斥锁的lock会阻塞
for {
// 不能在饥饿模式下进行自旋,所有权会移交给服务员
// 所以我们还是无法获取互斥体
// 0001&&(0001|0100) => 0001 old:0001表示当前锁被持有 `old&(mutexLocked|mutexStarving) == mutexLocked`
//runtime_canSpin 是否能进入自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 这个是判断:没有被唤醒 && 有排队等待的协程 && 尝试设置通知被唤醒
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
//明上个协程此时已经unlock了,唤醒当前协程
awoke = true
}
//自旋一段时间 这里不太了解为啥要进行自旋,可能是为了等待
runtime_doSpin()
//自旋次数加1
iter++
//这里更新了old的statue,下一次进来就不会在进入当前判断
old = m.state
continue
}
new := old
// 不要试图获取饥饿的互斥体,新的goroutine必须排队
// 原协程已经unlock了,对new的修改为已锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 这里是执行完自旋或者没执行自旋(原协程没有unlock)
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果是饥饿模式,并且已锁的状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
//当前协程被唤醒了,肯定不为0
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 既然当前协程被唤醒了,重置唤醒标志为0
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // 这里代表的是正常模式获取锁成功
}
// 下面的代码是判断是否从饥饿模式恢复正常模式
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 进入阻塞状态
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果当前锁是饥饿模式,但这个gorouting被唤醒
if old&mutexStarving != 0 {
// 减去当前锁的排队
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
//退出饥饿模式
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
当不能通过直接快速判断是否可以解锁,这里就需要按照比较复杂的解锁方式
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
//判断当前锁是否饥饿模式,==0代表不是
if new&mutexStarving == 0 {
old := new
for {
//如果没有未排队的协程 或者 有已经被唤醒,得到锁或饥饿的协程,则直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
//唤醒其它协程
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
//释放信号量
runtime_Semrelease(&m.sema, true, 1)
}
}