go语言同步教程之条件变量【澳门新葡亰赌995577】

Go的标准库中有一个类型叫条件变量:sync.Cond。这种类型与互斥锁和读写锁不同,它不是开箱即用的,它需要与互斥锁组合使用:

前言:

在前面并发性能对比的文章中,我们可以看到Golang处理大并发的能力十分强劲,而且开发也特别方便,只需要用go关键字即可开启一个新的协程。

但当多个goroutine同时进行处理的时候,就会遇到同时抢占一个资源的情况(并发都会遇到的问题),所以我们希望某个goroutine等待另一个goroutine处理完某一个步骤之后才能继续。sync包就是为了让goroutine同步而出现的。当然还可以使用channel实现,这个后面会介绍到。

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
 Lock()
 Unlock()
}

锁:

锁有两种:互斥锁和读写锁

互斥锁:
当数据被加锁了之后,除次外的其他协程不能对数据进行读操作和写操作。
这个当然能解决并发程序对资源的操作。但是,效率上是个问题,因为当加锁后,其他协程只有等到解锁后才能对数据进行读写操作。

读写锁:
读数据的时候上读锁,写数据的时候上写锁。有写锁的时候,数据不可读不可写。有读锁的时候,数据可读,不可写。

两种锁的使用方式相同,这里就只列出互斥锁的代码:

package mainimport (  "sync"  "time"  "fmt")var num = 0func main ()  {  mu := &sync.Mutex{}  for i:=0;i<10000;i++ {    go func(){      mu.Lock()      defer mu.Unlock()      num += 1    }()  }  time.Sleep(time.Second)  fmt.Println("num:", num)  // 如果不加锁这里的num的值会是一个随机数而不是10000}

通过使用 NewCond 函数可以返回 *sync.Cond 类型的结果, *sync.Cond
我们主要操作其三个方法,分别是:

Once:

有的时候,我们启动多个相同goroutine,但是里面的某个操作我只希望被执行一次,这个时候Once就上场了。

package mainimport (  "fmt"  "sync"  "time")func main() {  var once sync.Once  one := func() {fmt.Println("just once")  }  for i := 0; i < 10; i++ {go func {  once.Do   // 只是被执行一次}  }  time.Sleep(time.Millisecond*200)}

wait():等待通知

Signal():单播通知

Broadcast():广播通知

WaitGroup:

当某个操作或是某个goroutine需要等待一批goroutine执行完毕以后才继续执行,那么这种多线程(go里面说的线程就是goroutine)等待的问题就可以使用WaitGroup了。

代码如下:

package mainimport ("sync""fmt""time")var waitGroup sync.WaitGroupfunc main () {for i := 0; i < 10; i++ {waitGroup.Add  // 添加需要等待goroutine的数量go func() {fmt.Printlntime.Sleep(time.Second)waitGroup.Done() // 减少需要等待goroutine的数量 相当于Add}waitGroup.Wait()  // 执行阻塞,直到所有的需要等待的goroutine数量变成0fmt.Println}

具体的函数说明如下:

Cond:

sync.Cond是用来控制某个条件下,goroutine进入等待时期,等待信号到来,然后重新启动。

代码如下:

package mainimport ("fmt""sync""time")var locker = new(sync.Mutex)var cond = sync.NewCondfunc test {cond.L.Lock() //获取锁cond.Wait()//等待通知 暂时阻塞fmt.Printlntime.Sleep(time.Second * 1)cond.L.Unlock()//释放锁}func main() {for i := 0; i < 40; i++ {go test}fmt.Println("start all")time.Sleep(time.Second * 3)fmt.Println("signal1")cond.Signal()   // 下发一个通知随机给已经获取锁的goroutinetime.Sleep(time.Second * 3)fmt.Println("signal2")cond.Signal()// 下发第二个通知随机给已经获取锁的goroutinetime.Sleep(time.Second * 1)  // 在广播之前要等一会,让所有线程都在wait状态fmt.Println("broadcast")cond.Broadcast()//下发广播给所有等待的goroutinetime.Sleep(time.Second * 60)}

上面代码有几个要点要特别说明一下:

1.每个Cond都必须有个与之关联的锁 //见第9行

2.协程方法里面一开始/结束都必须加/解锁 //见第12行和16行

3.
cond.Wait()时会自动解锁,当被唤醒时,又会加上锁。所以第2点提到必须加/解锁。

// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
//  c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}

Channel

channel不仅可以用来goroutine之间的通信,也可以使goroutine同步完成协作。这点主要基于从channel取数据的时候,会阻塞当前goroutine这个特性。示例代码如下:

package mainimport ("fmt""time")var chan1 = make(chan string, 512)var arr1 = []string{"qq","ww","ee","rr","tt"}func chanTest1() {for _, v := range arr1 {chan1 <- v}close // 关闭channel}func chanTest2() {for {getStr, ok := <- chan1  // 阻塞,直到chan1里面有数据if !ok {   // 判断channel是否关闭或者为空return}fmt.Println // 按数组顺序内容输出}}func main () {go chanTest1()go chanTest2()time.Sleep(time.Millisecond*200)}

条件变量sync.Cond本质上是一些正在等待某个条件的线程的同步机制。

sync.Cond 主要实现一个条件变量,假如 goroutine A
执行前需要等待另外的goroutine B 的通知,那边处于等待的goroutine A
会保存在一个通知列表,也就是说需要某种变量状态的goroutine A
将会等待/Wait在那里,当某个时刻状态改变时负责通知的goroutine B
通过对条件变量通知的方式(Broadcast,Signal)来通知处于等待条件变量的goroutine
A, 这样便可首先一种“消息通知”的同步机制。

以go的http处理为例,在Go的源码中http模块server部分源码中所示,当需要处理一个新的连接的时候,若连接conn是实现自*tls.Conn的情况下,会进行相关的客户端与服务端的“握手”处理Handshake(),
入口代码如下:

if tlsConn, ok := c.rwc.(*tls.Conn); ok {
  if d := c.server.ReadTimeout; d != 0 {
   c.rwc.SetReadDeadline(time.Now().Add(d))
  }
  if d := c.server.WriteTimeout; d != 0 {
   c.rwc.SetWriteDeadline(time.Now().Add(d))
  }
  if err := tlsConn.Handshake(); err != nil {
   c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
   return
  }
  c.tlsState = new(tls.ConnectionState)
  *c.tlsState = tlsConn.ConnectionState()
  if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
   if fn := c.server.TLSNextProto[proto]; fn != nil {
    h := initNPNRequest{tlsConn, serverHandler{c.server}}
    fn(c.server, tlsConn, h)
   }
   return
  }
 }

其中的Handshake函数代码通过使用条件变量的方式来处理新连接握手调用的同步问题:

func (c *Conn) Handshake() error {
 c.handshakeMutex.Lock()
 defer c.handshakeMutex.Unlock()

 for {
  if err := c.handshakeErr; err != nil {
   return err
  }
  if c.handshakeComplete {
   return nil
  }
  if c.handshakeCond == nil {
   break
  }

  c.handshakeCond.Wait()
 }

 c.handshakeCond = sync.NewCond(&c.handshakeMutex)
 c.handshakeMutex.Unlock()

 c.in.Lock()
 defer c.in.Unlock()

 c.handshakeMutex.Lock()

 if c.handshakeErr != nil || c.handshakeComplete {
  panic("handshake should not have been able to complete after handshakeCond was set")
 }

 if c.isClient {
  c.handshakeErr = c.clientHandshake()
 } else {
  c.handshakeErr = c.serverHandshake()
 }
 if c.handshakeErr == nil {
  c.handshakes++
 } else {
  c.flush()
 }

 if c.handshakeErr == nil && !c.handshakeComplete {
  panic("handshake should have had a result.")
 }

 c.handshakeCond.Broadcast()
 c.handshakeCond = nil

 return c.hand