Go语言之锁

Go语言之锁

Posted by Lerko on July 18, 2020

golang内置锁的分类

类型 实现类
互斥锁 sync.Mutex
读写锁 sync.RWMutex

其中读写锁是通过互斥锁进行实现的

互斥锁源码解析

信号量概念

首先应弄清PV操作的含义:

PV操作由P操作原语和V操作原语组成(原语是不可中断的过程),对信号量进行操作,具体定义如下:

P(S):①将信号量S的值减1,即S=S-1;
       ②如果S³0,则该进程继续执行;否则该进程置为等待状态,排入等待队列。

V(S):①将信号量S的值加1,即S=S+1;
       ②如果S>0,则该进程继续执行;否则释放队列中第一个等待信号量的进程。

PV操作的意义:我们用信号量及PV操作来实现进程的同步和互斥。PV操作属于进程的低级通信。

源码注释

//接口
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

Mutex实现中有两种模式

  1. 正常模式 协程会排队(FIFO[First In First Out 即先进先出])
  2. 饥饿模式 当某个goroutine等待时间太久了就会进入饥饿模式

前者指的是当一个协程获取到锁时,后面的协程会排队(FIFO[First In First Out 即先进先出]),释放锁时会唤醒最早排队的协程,这个协程会和正在CPU上运行的协程竞争锁,但是大概率会失败,为什么呢?因为你是刚被唤醒的,还没有获得CPU的使用权,而CPU正在执行的协程肯定比你有优势,如果这个被唤醒的协程竞争失败,并且超过了1ms,那么就会退回到后者(饥饿模式),这种模式下,该协程在下次获取锁时直接得到,不存在竞争关系,本质是为了防止协程等待锁的时间太长。

const (
   mutexLocked = 1 << iota //1, 0001 最后一位表示当前锁的状态,0未锁,1已锁 
   mutexWoken //2, 0010,倒数第二位表示当前锁是否会被唤醒,0唤醒,1未唤醒
   mutexStarving //4, 0100 倒数第三位表示当前对象是否为饥饿模式,0正常,1饥饿
   mutexWaiterShift = iota //3 从倒数第四位往前的bit表示排队的gorouting数量
   starvationThresholdNs = 1e6 // 饥饿的阈值:1ms

//实现类
type Mutex struct {
	state int32 // 0代表未获取到锁,1代表得到锁,2-2^31表示gorouting排队的数量的
	sema  uint32 // 非负数的信号量,阻塞协程的依据
}

func (m *Mutex) Lock() {
    // 快速的方式:获取解锁的互斥锁
    // 首先判断是否&m.state是否是0如果不是0的话讲mutexLocked赋值到&m.state(这个操作是原子操作,CompareAndSwapInt32提供)
    //其中如果m.state是0的话表示这个锁没有被占用
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// 比较慢的方式 (已概述,以便可以内联快速路径)
	m.lockSlow()
}


func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	// 接触锁定(讲m.state设置未0)
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

在不能使用快速通过state获取锁的状态就需要比较复杂的获取方式

func (m *Mutex) lockSlow() {
	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
    old := m.state
    // 这里是一个循环,直到获取到锁,不然会一直循环下去
    // 这意味着调用同一个互斥锁的lock会阻塞
	for {
		// 不能在饥饿模式下进行自旋,所有权会移交给服务员
        // 所以我们还是无法获取互斥体
        // 0001&&(0001|0100) => 0001  old:0001表示当前锁被持有 `old&(mutexLocked|mutexStarving) == mutexLocked`
        //runtime_canSpin 是否能进入自旋
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// 这个是判断:没有被唤醒 && 有排队等待的协程 && 尝试设置通知被唤醒
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                //明上个协程此时已经unlock了,唤醒当前协程
				awoke = true
            }
            //自旋一段时间 这里不太了解为啥要进行自旋,可能是为了等待
            runtime_doSpin()
            //自旋次数加1
            iter++
            //这里更新了old的statue,下一次进来就不会在进入当前判断
			old = m.state
			continue
		}
		new := old
        // 不要试图获取饥饿的互斥体,新的goroutine必须排队
        // 原协程已经unlock了,对new的修改为已锁
		if old&mutexStarving == 0 {
			new |= mutexLocked
        }
        // 这里是执行完自旋或者没执行自旋(原协程没有unlock)
		if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}
		// 如果是饥饿模式,并且已锁的状态
		if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}
		if awoke {
			//当前协程被唤醒了,肯定不为0
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
            }
            // 既然当前协程被唤醒了,重置唤醒标志为0
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // 这里代表的是正常模式获取锁成功
			}
			// 下面的代码是判断是否从饥饿模式恢复正常模式 
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
            }
            // 进入阻塞状态 
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
            // 如果当前锁是饥饿模式,但这个gorouting被唤醒
			if old&mutexStarving != 0 {
				// 减去当前锁的排队
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if !starving || old>>mutexWaiterShift == 1 {
					//退出饥饿模式
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

当不能通过直接快速判断是否可以解锁,这里就需要按照比较复杂的解锁方式

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
    }
    //判断当前锁是否饥饿模式,==0代表不是
	if new&mutexStarving == 0 {
		old := new
		for {
			//如果没有未排队的协程 或者 有已经被唤醒,得到锁或饥饿的协程,则直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			//唤醒其它协程
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else {
		//释放信号量
		runtime_Semrelease(&m.sema, true, 1)
	}
}