go channel

模型

三部分组成:

发送等待队列
接收等待队列
管道 缓存区
发送等待队列
接收等待队列
管道 缓存区
发送等待队列 接收等待队列 管道 缓存区

定义

runtimechan.go

type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
elemtype *_type // element type
// 上面的这几个组成了一个环形缓存区
closed uint32 // 关闭状态
// 下面4个组成了2个队列 g的
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
lock mutex // 锁,用于在操作 channel的 元素时候,保证并发安全
}
type hchan struct {
  qcount   uint          
  dataqsiz uint          
  buf      unsafe.Pointer 
  elemsize uint16
  elemtype *_type // element type
    // 上面的这几个组成了一个环形缓存区

  closed   uint32 // 关闭状态

    // 下面4个组成了2个队列 g的
  sendx    uint   // send index
  recvx    uint   // receive index
  recvq    waitq  // list of recv waiters
  sendq    waitq  // list of send waiters

  // lock protects all fields in hchan, as well as several
  lock mutex  // 锁,用于在操作 channel的 元素时候,保证并发安全 
}
type hchan struct { qcount uint dataqsiz uint buf unsafe.Pointer elemsize uint16 elemtype *_type // element type // 上面的这几个组成了一个环形缓存区 closed uint32 // 关闭状态 // 下面4个组成了2个队列 g的 sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several lock mutex // 锁,用于在操作 channel的 元素时候,保证并发安全 }

特殊缓存区

qcount 已经存储的个数
dataqsiz 环形队列缓存的容量,即允许缓存的消息最大个数
buf 指向这个缓存区的地址
elemsize 元素的大小
elemtype 元素的类型
qcount  已经存储的个数
dataqsiz  环形队列缓存的容量,即允许缓存的消息最大个数
buf   指向这个缓存区的地址
elemsize 元素的大小
elemtype  元素的类型
qcount 已经存储的个数 dataqsiz 环形队列缓存的容量,即允许缓存的消息最大个数 buf 指向这个缓存区的地址 elemsize 元素的大小 elemtype 元素的类型

设计成这样,主要目的不需要gc来清理,因为环形机构,会自动把删除数据内存给占了。

环形缓存可以大幅降低GC的开销

两个队列

这里还要看下 waitq 的定义

type waitq struct {
first *sudog
last *sudog
}
// 在sema中也有用到,就g结构体的封装
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // 接受参数的地址
// 当是接收g时候,如果有缓存中有数据,直接把数据拷贝到这个地址
 type waitq struct {
    first *sudog
    last  *sudog
 }

// 在sema中也有用到,就g结构体的封装
type sudog struct {
        g *g 
  next *sudog
  prev *sudog
  elem unsafe.Pointer // 接受参数的地址 
    // 当是接收g时候,如果有缓存中有数据,直接把数据拷贝到这个地址
type waitq struct { first *sudog last *sudog } // 在sema中也有用到,就g结构体的封装 type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer // 接受参数的地址 // 当是接收g时候,如果有缓存中有数据,直接把数据拷贝到这个地址

}

注意:这里的sendx 并不是指向发送队列中的g,而且发送队列应该写入环形缓存区的index,
同理,recvx也是,指向接受数据的g,应该从缓冲区的那个index取数据

互斥锁

lock mutex
lock mutex  
lock mutex

互斥锁并不是排队发送/接收数据

不是让发送和接收队列来排队的,这些发送和接收数据的队列,休眠也不是在锁的sema里

互斥锁保护的hchan结构体本身

所以, Channel并不是无锁的

状态值

closed uint32 // 关闭状态
  closed   uint32 // 关闭状态
closed uint32 // 关闭状态

0为开启、1为关闭

当一个关闭的channel,再往里写或者重复关闭、就会panic。但是可以读。
当一个关闭的channel,再往里写或者重复关闭、就会panic。但是可以读。
当一个关闭的channel,再往里写或者重复关闭、就会panic。但是可以读。

发送数据

c<- 关键字是一个语法糖
编译阶段,会把C<- 转化为 ruintime.chansend1
chansend1 会调用 chansend0 方法
c<- 关键字是一个语法糖

编译阶段,会把C<- 转化为 ruintime.chansend1

chansend1 会调用 chansend0 方法
c<- 关键字是一个语法糖 编译阶段,会把C<- 转化为 ruintime.chansend1 chansend1 会调用 chansend0 方法

直接发送 (已经有接收队列在等)

发送数据前,己经有G在休眠等待接收
此时缓存肯定是空的,不用考虑缓存
将数据直接拷贝给G的接收变量,唤醒G
发送数据前,己经有G在休眠等待接收

此时缓存肯定是空的,不用考虑缓存

将数据直接拷贝给G的接收变量,唤醒G
发送数据前,己经有G在休眠等待接收 此时缓存肯定是空的,不用考虑缓存 将数据直接拷贝给G的接收变量,唤醒G

实现

// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
// 部分源码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//有加锁,如果这时候,再来一个g也发送,就会休眠去 sema队列了
lock(&c.lock)
if c.closed != 0 { // 如果已经关闭,就会报错,上面讲过给一个关闭的chan发送会panic
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 从接收队列里面拿一个 g
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
// 上面的注释讲的很清楚,直接把值给接收者,绕过 channel的buffer
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 和之前讲的 sudog的elem对上了
if sg.elem != nil {
// 直接把数据拷贝到 接收者的 elem中
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒这个协程 接收者的g
goready(gp, skip+1)
}
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}
// 部分源码
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    //有加锁,如果这时候,再来一个g也发送,就会休眠去 sema队列了
  lock(&c.lock)
  if c.closed != 0 { // 如果已经关闭,就会报错,上面讲过给一个关闭的chan发送会panic
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }
    // 从接收队列里面拿一个 g
  if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
        // 上面的注释讲的很清楚,直接把值给接收者,绕过 channel的buffer
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
  }
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      //  和之前讲的 sudog的elem对上了
      if sg.elem != nil {
        // 直接把数据拷贝到 接收者的 elem中
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
    // 唤醒这个协程 接收者的g
  goready(gp, skip+1)
}
// entry point for c <- x from compiled code. func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) } // 部分源码 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { //有加锁,如果这时候,再来一个g也发送,就会休眠去 sema队列了 lock(&c.lock) if c.closed != 0 { // 如果已经关闭,就会报错,上面讲过给一个关闭的chan发送会panic unlock(&c.lock) panic(plainError("send on closed channel")) } // 从接收队列里面拿一个 g if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). // 上面的注释讲的很清楚,直接把值给接收者,绕过 channel的buffer send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } } func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 和之前讲的 sudog的elem对上了 if sg.elem != nil { // 直接把数据拷贝到 接收者的 elem中 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } // 唤醒这个协程 接收者的g goready(gp, skip+1) }

步骤:

从队列里取出一个等待接收的G
将数据直接拷贝到接收变量中
唤醒G,接收者的g
从队列里取出一个等待接收的G
将数据直接拷贝到接收变量中
唤醒G,接收者的g
从队列里取出一个等待接收的G 将数据直接拷贝到接收变量中 唤醒G,接收者的g

放入缓存

没有G在休眠等待,但是有缓存空间
将数据放入缓存
没有G在休眠等待,但是有缓存空间
将数据放入缓存
没有G在休眠等待,但是有缓存空间 将数据放入缓存

实现

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 上面是直接发送的源码,被截除
// 当缓存队列存入的数量 小于 缓存的容量,就是还有缓存空间
if c.qcount < c.dataqsiz {
// 缓存区接受数据的地址
qp := chanbuf(c, c.sendx)
// 将数据拷贝过去
typedmemmove(c.elemtype, qp, ep)
// 指示下一个发送数据,存在那个缓冲区,这里有个逻辑下面讲
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素个数加1,释放锁并返回
c.qcount++
unlock(&c.lock)
return true
}
}
qp := chanbuf(c, c.sendx)
// chanbuf(c, i) is pointer to the i'th slot in the buffer
// 返回缓冲区的 第几个槽
// 写入缓存区
c.sendx++
意味着这里使用 sendx 来指引下一个发送的数据,写到几个槽,
所以才有下面,如果满了,就从0开始,形成环形
if c.sendx == c.dataqsiz {
c.sendx = 0
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 上面是直接发送的源码,被截除
    // 当缓存队列存入的数量 小于 缓存的容量,就是还有缓存空间
  if c.qcount < c.dataqsiz {
        // 缓存区接受数据的地址
    qp := chanbuf(c, c.sendx)
    // 将数据拷贝过去
    typedmemmove(c.elemtype, qp, ep)
        
        // 指示下一个发送数据,存在那个缓冲区,这里有个逻辑下面讲
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
        // 元素个数加1,释放锁并返回
    c.qcount++
    unlock(&c.lock)
    return true
  }
 }

    qp := chanbuf(c, c.sendx)
    // chanbuf(c, i) is pointer to the i'th slot in the buffer
    // 返回缓冲区的 第几个槽
    // 写入缓存区
    c.sendx++
    意味着这里使用  sendx 来指引下一个发送的数据,写到几个槽,
    所以才有下面,如果满了,就从0开始,形成环形
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 上面是直接发送的源码,被截除 // 当缓存队列存入的数量 小于 缓存的容量,就是还有缓存空间 if c.qcount < c.dataqsiz { // 缓存区接受数据的地址 qp := chanbuf(c, c.sendx) // 将数据拷贝过去 typedmemmove(c.elemtype, qp, ep) // 指示下一个发送数据,存在那个缓冲区,这里有个逻辑下面讲 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素个数加1,释放锁并返回 c.qcount++ unlock(&c.lock) return true } } qp := chanbuf(c, c.sendx) // chanbuf(c, i) is pointer to the i'th slot in the buffer // 返回缓冲区的 第几个槽 // 写入缓存区 c.sendx++ 意味着这里使用 sendx 来指引下一个发送的数据,写到几个槽, 所以才有下面,如果满了,就从0开始,形成环形 if c.sendx == c.dataqsiz { c.sendx = 0 }

整个逻辑比较清晰:

获取可存入的缓存地址
存入数据
维护索引
  获取可存入的缓存地址
  存入数据
  维护索引
获取可存入的缓存地址 存入数据 维护索引

休眠等待

没有G在休眠等待,而旦没有缓存或满了
自己进入发送队列,休眠等待
没有G在休眠等待,而旦没有缓存或满了

自己进入发送队列,休眠等待
没有G在休眠等待,而旦没有缓存或满了 自己进入发送队列,休眠等待

实现

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 接着 直接放入缓存的代码
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog() // 把自己组装成一个 sudog结构体
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 要发送的数据 也是放到这个 elem中的
mysg.waitlink = nil
mysg.g = gp // sudug 的g等于自己的g结构体
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 进入waitReasonChanSend 休眠
// 然后去 gopark 休眠了,这个方法在将 协程切换的时候,讲过
// 协程到这里就 休眠了,不继续执行了,直到被唤醒。
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  
  // 接着 直接放入缓存的代码

  // Block on the channel. Some receiver will complete our operation for us.
  gp := getg() 
  mysg := acquireSudog() // 把自己组装成一个 sudog结构体
  mysg.releasetime = 0
  if t0 != 0 {
    mysg.releasetime = -1
  }
  mysg.elem = ep  // 要发送的数据 也是放到这个  elem中的
  mysg.waitlink = nil
  mysg.g = gp   // sudug 的g等于自己的g结构体
  mysg.isSelect = false
  mysg.c = c
  gp.waiting = mysg
  gp.param = nil
  c.sendq.enqueue(mysg)
  
  gp.parkingOnChan.Store(true)
  gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 进入waitReasonChanSend 休眠
  // 然后去 gopark 休眠了,这个方法在将 协程切换的时候,讲过
 // 协程到这里就 休眠了,不继续执行了,直到被唤醒。
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 接着 直接放入缓存的代码 // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() // 把自己组装成一个 sudog结构体 mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep // 要发送的数据 也是放到这个 elem中的 mysg.waitlink = nil mysg.g = gp // sudug 的g等于自己的g结构体 mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) // 进入waitReasonChanSend 休眠 // 然后去 gopark 休眠了,这个方法在将 协程切换的时候,讲过 // 协程到这里就 休眠了,不继续执行了,直到被唤醒。 }

步骤:

把自己包装成sudog
sudog放入sendq队列
休眠并解锁
被唤醒后,数据已经被取走,维护其他数据 (下面讲 接收时候讲)
    把自己包装成sudog

    sudog放入sendq队列

    休眠并解锁

    被唤醒后,数据已经被取走,维护其他数据 (下面讲 接收时候讲)
把自己包装成sudog sudog放入sendq队列 休眠并解锁 被唤醒后,数据已经被取走,维护其他数据 (下面讲 接收时候讲)

发送小结

  1. 编译阶段,会把<-转化为 runtime.chansend10

  2. 直接发送时,将数据直接拷贝到目标变量

  3. 放入缓存时,将数据放入环形缓存,成功返回

  4. 休眠等待时,将自己包装后放入sendp, 休眠

接收

<-c 关键字

<-c 关键字是一个语法糖
编译阶段,i<-C转化为 runtime.chanrecv()
编译阶段,i, ok<-c转化为 runtime.chanrecv()
最后会调用 chanrecv() 方法
<-c 关键字是一个语法糖

编译阶段,i<-C转化为 runtime.chanrecv()

编译阶段,i, ok<-c转化为 runtime.chanrecv()

最后会调用 chanrecv() 方法
<-c 关键字是一个语法糖 编译阶段,i<-C转化为 runtime.chanrecv() 编译阶段,i, ok<-c转化为 runtime.chanrecv() 最后会调用 chanrecv() 方法

无缓存区、有发送协程在等待

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil { // 如果读一个 block 为true的 channel ,协程会直接休眠,
// 正常读channel这个 block都是 true
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// 中间删了一段 block 为 false的情况
lock(&c.lock)
if c.closed != 0 { // 如果channel已经关闭
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 有数据 放回数据
}
return true, false // 没数据,返回一个 false
}
} else {
if sg := c.sendq.dequeue(); sg != nil { // 如果发送队列中有g
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 调用这个 recv方法
return true, true
}
}
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 { // 无缓存区
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep) // 直接把 传过来的g 的数据取走
}
}
// 给g跟新下参数
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 唤醒g,这时候,发送的数据已经被取走了
}
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
src := sg.elem
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
memmove(dst, src, t.Size_)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  
  if c == nil {  // 如果读一个 block 为true的 channel ,协程会直接休眠,
                      // 正常读channel这个 block都是 true
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  // 中间删了一段  block 为 false的情况

  lock(&c.lock)

  if c.closed != 0 {  // 如果channel已经关闭
    if c.qcount == 0 {
      if raceenabled {
        raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      if ep != nil {
        typedmemclr(c.elemtype, ep)  // 有数据 放回数据
      }
      return true, false  // 没数据,返回一个 false
    }
  } else {
    if sg := c.sendq.dequeue(); sg != nil { // 如果发送队列中有g
      
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)  // 调用这个 recv方法
      return true, true
    }
  }
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 { // 无缓存区
    if ep != nil {  
      // copy data from sender
      recvDirect(c.elemtype, sg, ep) // 直接把 传过来的g 的数据取走
    }
  }
    
    // 给g跟新下参数
        sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    goready(gp, skip+1) // 唤醒g,这时候,发送的数据已经被取走了
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
  src := sg.elem
  typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
  memmove(dst, src, t.Size_)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c == nil { // 如果读一个 block 为true的 channel ,协程会直接休眠, // 正常读channel这个 block都是 true if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) throw("unreachable") } // 中间删了一段 block 为 false的情况 lock(&c.lock) if c.closed != 0 { // 如果channel已经关闭 if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) // 有数据 放回数据 } return true, false // 没数据,返回一个 false } } else { if sg := c.sendq.dequeue(); sg != nil { // 如果发送队列中有g recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 调用这个 recv方法 return true, true } } } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 无缓存区 if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) // 直接把 传过来的g 的数据取走 } } // 给g跟新下参数 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) // 唤醒g,这时候,发送的数据已经被取走了 } func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_) memmove(dst, src, t.Size_) }

步骤:

判断有G 在发送队列等待,进入recv()
判断此 Channel 无缓存
直接从等待的 G 中取走数据,唤醒G
  判断有G 在发送队列等待,进入recv()
  判断此 Channel 无缓存
  直接从等待的 G 中取走数据,唤醒G
判断有G 在发送队列等待,进入recv() 判断此 Channel 无缓存 直接从等待的 G 中取走数据,唤醒G

这里有两个地方要注意,代码中也标记了:

如果channel为nil,再去读会直接休眠阻塞。这里只的是 block为 true的读,block为false的情况后面讲,正常channe都是true

如果channel close了, 去读有值返回值,没值返回 false

有等待的g,缓存区满了

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 上面是缓存区为0的情况
qp := chanbuf(c, c.recvx) // 取缓存区的数据,用的recvx 标记,和开头的总结对应上了
// ep 就是 a <-c ,这个a的地址,如果a是nil,说明传递的值,没有用,只是要个时机
// ep may be nil, in which case received data is ignored.
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送队列中休眠的这个g的数据 拷贝到了 缓存去
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++ // 取数据的地址增加
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
// 唤醒了这个 发送数据的g,因为这个g的数据已经放到了缓存区,不用休眠等待了
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 上面是缓存区为0的情况
        qp := chanbuf(c, c.recvx) // 取缓存区的数据,用的recvx 标记,和开头的总结对应上了
        
        // ep 就是 a <-c ,这个a的地址,如果a是nil,说明传递的值,没有用,只是要个时机
    // ep may be nil, in which case received data is ignored.
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
        // 将发送队列中休眠的这个g的数据 拷贝到了 缓存去
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++ // 取数据的地址增加
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz  
        
        // 唤醒了这个 发送数据的g,因为这个g的数据已经放到了缓存区,不用休眠等待了
        sg.elem = nil
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      sg.success = true
      if sg.releasetime != 0 {
        sg.releasetime = cputicks()
      }
      goready(gp, skip+1)  

}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 上面是缓存区为0的情况 qp := chanbuf(c, c.recvx) // 取缓存区的数据,用的recvx 标记,和开头的总结对应上了 // ep 就是 a <-c ,这个a的地址,如果a是nil,说明传递的值,没有用,只是要个时机 // ep may be nil, in which case received data is ignored. if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 将发送队列中休眠的这个g的数据 拷贝到了 缓存去 // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ // 取数据的地址增加 if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz // 唤醒了这个 发送数据的g,因为这个g的数据已经放到了缓存区,不用休眠等待了 sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }

步骤:

接收数据前,已经有G 在休眠等待发送
而且这个 Channel 有缓存
从缓存取走一个数据
将休眠G 的数据放进缓存,唤醒G
  接收数据前,已经有G 在休眠等待发送

  而且这个 Channel 有缓存

  从缓存取走一个数据

  将休眠G 的数据放进缓存,唤醒G
接收数据前,已经有G 在休眠等待发送 而且这个 Channel 有缓存 从缓存取走一个数据 将休眠G 的数据放进缓存,唤醒G

接收缓存,没有发送g在等待

直接从 缓存区拿数据走就行

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx) // 取出数据
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 上面讲过 也许接收的变量为空
if ep != nil {
typedmemmove(c.elemtype, ep, qp) // 数据拷贝过去
}
typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理缓存区已经取走的这个数据的空间
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
 func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

    if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx) // 取出数据
    if raceenabled {
      racenotify(c, c.recvx, nil)
    }
        // 上面讲过 也许接收的变量为空
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp) // 数据拷贝过去
    }
    typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理缓存区已经取走的这个数据的空间
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) // 取出数据 if raceenabled { racenotify(c, c.recvx, nil) } // 上面讲过 也许接收的变量为空 if ep != nil { typedmemmove(c.elemtype, ep, qp) // 数据拷贝过去 } typedmemclr(c.elemtype, qp)//clr 是 clear的意思 清理缓存区已经取走的这个数据的空间 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } }

步骤:

判断没有 G 在发送队列等待
判断此 Channel 有缓存
从缓存中取走一个数据
  判断没有 G 在发送队列等待

  判断此 Channel 有缓存

  从缓存中取走一个数据
判断没有 G 在发送队列等待 判断此 Channel 有缓存 从缓存中取走一个数据

接收阻塞

比较多用在不让协程退出,除非收到 context的cancel消息等。
比较多用在不让协程退出,除非收到 context的cancel消息等。
比较多用在不让协程退出,除非收到 context的cancel消息等。

没有 G 在休眠等待,而旦没有缓存或缓存空

自己进入接收队列,休眠等待

代码实现

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog() //把自己包装成 sudog
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep // 接收数据的地址,拷贝到了elem
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 加入了等待接收队列
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠
}
  func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

  // no sender available: block on this channel.
    gp := getg()  
    mysg := acquireSudog() //把自己包装成 sudog

    mysg.releasetime = 0
    if t0 != 0 {
      mysg.releasetime = -1
    }
    mysg.elem = ep // 接收数据的地址,拷贝到了elem
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg) // 加入了等待接收队列

    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠
  }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // no sender available: block on this channel. gp := getg() mysg := acquireSudog() //把自己包装成 sudog mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep // 接收数据的地址,拷贝到了elem mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 加入了等待接收队列 gp.parkingOnChan.Store(true) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) // 休眠 }

这里和上面的发送时候,已经有等待接收的g,对上了。

步骤:

判断没有 G 在发送队列等待
判断此 Channel 无缓存
将自己包装成 sudog
sudog 放入接收等待队列,休眠
唤醒时,发送的 G 已经把数据拷贝到位
判断没有 G 在发送队列等待

判断此 Channel 无缓存

将自己包装成 sudog

sudog 放入接收等待队列,休眠

唤醒时,发送的 G 已经把数据拷贝到位
判断没有 G 在发送队列等待 判断此 Channel 无缓存 将自己包装成 sudog sudog 放入接收等待队列,休眠 唤醒时,发送的 G 已经把数据拷贝到位

接收总结:

编译阶段,<-C 会转化为 chanrecv()
有等待的G,旦无缓存时,从G 接收
有等待的 G,且有缓存时,从缓存接收
无等待的 G,且缓存有数据,从缓存接收
无等待的 G,且缓存无数据,等待喂数据
编译阶段,<-C 会转化为 chanrecv()

有等待的G,旦无缓存时,从G 接收

有等待的 G,且有缓存时,从缓存接收

无等待的 G,且缓存有数据,从缓存接收

无等待的 G,且缓存无数据,等待喂数据
编译阶段,<-C 会转化为 chanrecv() 有等待的G,旦无缓存时,从G 接收 有等待的 G,且有缓存时,从缓存接收 无等待的 G,且缓存有数据,从缓存接收 无等待的 G,且缓存无数据,等待喂数据

看上面代码时候,讲过一般使用时候,那个blocktrue的,什么情况下blockfalse,下篇聊。

© 版权声明
THE END
支持一下吧
点赞13 分享
评论 共1条
头像
请文明发言!
提交
头像

昵称

取消
昵称表情代码快捷回复

    暂无评论内容