除了《Kubernetes GO》系列之外,对于golang相关知识,同时准备了《Golang 漫谈》以增雅趣,不足之处,万望海涵,在此特别感谢雨痕的Golang 源码剖析。
Cond在client-go等库中广泛使用,该文对此做知识铺垫。
Cond在Locker的基础上增加的一个消息通知的功能。但是它只能按照顺序去使一个goroutine解除阻塞。
条件变量是构建在一个基础锁上的同步原语,Golang Crondition位于sync包中,用于goroutine需要关注特定的条件的场景。该文章主要介绍,以下内容:
Cond同步机制
Cond模块定义:
1 2 3 4 5 6 7 8 9 10
| type Cond struct { noCopy noCopy
L Locker
notify notifyList checker copyChecker } func NewCond(l Locker) *Cond
|
Cond需要指定一个Locker,通常是一个Mutex或RWMutex。另外,Cond还定义了一下几个核心方法:
1 2 3
| func (c *Cond) Broadcast() func (c *Cond) Signal() func (c *Cond) Wait()
|
那外部传入的Locker,是对wait,Signal,Broadcast进行保护。防止发送信号的时候,不会有新的goroutine进入wait。在wait逻辑完成前,不会有新的事件发生。
注意:在调用Signal,Broadcast之前,应确保目标进入Wait阻塞状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main
import ( "fmt" "sync" "time" )
func main() { wait := sync.WaitGroup{} locker := new(sync.Mutex) cond := sync.NewCond(locker)
for i := 0; i < 3; i++ { go func(i int) { defer wait.Done() wait.Add(1) cond.L.Lock() fmt.Println("Waiting start...") cond.Wait() fmt.Println("Waiting end...") cond.L.Unlock() fmt.Println("Goroutine run. Number:", i) }(i) } time.Sleep(2e9) cond.L.Lock() cond.Signal() cond.L.Unlock()
time.Sleep(2e9) cond.L.Lock() cond.Signal() cond.L.Unlock()
time.Sleep(2e9) cond.L.Lock() cond.Signal() cond.L.Unlock() wait.Wait() }
|
输出结果:
1 2 3 4 5 6 7 8 9
| Waiting start... Waiting start... Waiting start... Waiting end... Goroutine run. Number: 0 Waiting end... Goroutine run. Number: 1 Waiting end... Goroutine run. Number: 2
|
可以看出来,每执行一次Signal就会执行一个goroutine。如果想让所有的goroutine执行,那么将所有的Signal换成一个Broadcast方法可以。
1.1 Broadcase、Signal
唤醒因wait condition而挂起goroutine,区别是Signal只唤醒一个,而Broadcast唤醒所有。允许调用者获取基础锁Locker之后再调用唤醒,但非必需。
1.2 Wait
必须获取该锁之后才能调用Wait()方法,Wait方法在调用时会释放底层锁Locker,并且将当前goroutine挂起,直到另一个goroutine执行Signal或者Broadcase,该goroutine才有机会重新唤醒,并尝试获取Locker,完成后续逻辑。
1.3 实例
生产者消费者问题是条件原语的一个典型例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package main
import ( "fmt" "math/rand" "sync" "time" )
var locker = new(sync.Mutex) var cond = sync.NewCond(locker)
var capacity = 10 var consumerNum = 3 var producerNum = 5
func producer(out chan<- int) { for i := 0; i < producerNum; i++ { go func(nu int) { for { cond.L.Lock() for len(out) == capacity { fmt.Println("Capacity Full, stop Produce") cond.Wait() } num := rand.Intn(100) out <- num fmt.Printf("Produce %d produce: num %d\n", nu, num) cond.L.Unlock() cond.Signal()
time.Sleep(time.Second) } }(i) } }
func consumer(in <-chan int) { for i := 0; i < consumerNum; i++ { go func(nu int) {
for { cond.L.Lock() for len(in) == 0 { fmt.Println("Capacity Empty, stop Consume") cond.Wait() } num := <-in fmt.Printf("Goroutine %d: consume num %d\n", nu, num) cond.L.Unlock() time.Sleep(time.Millisecond * 500) cond.Signal() } }(i) } }
func main() {
rand.Seed(time.Now().UnixNano())
quit := make(chan bool) product := make(chan int, capacity)
producer(product) consumer(product)
<-quit }
|
参考资料