Go Channel 的实现
关于 channel 锁和 cas 的实现
Channel 在运行时的内部表示为 runtime.hchan
,该结构体中包含了一个用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列。
锁会导致休眠和唤醒,带来额外的上下文切换,如果临界区过小,加锁解锁会导致额外的开销。Go 语言社区在 2014 年也提出了无锁 Channel 的实现方案,该方案将 Channel 分为了以下三种类型
- 同步 Channel:不需要缓冲区,发送方直接将数据交给(handoff)接收方
- 异步 Channel:基于环形缓存的传统生产者消费者模型
chan struct{}
类型的异步 Channel:struct{}
类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)语义
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但在实际基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。
目前通过 CAS 实现的无锁 Channel 没有提供 FIFO 的特性,所以该提案暂时被搁浅
创建 chan
Go 中所有 Channel 的创建都会使用 make
关键字
这一阶段对 make
关键字的缓冲区大小做检查,如果不向 make
传递表示缓冲区大小的参数,则会设置 0 参数,表示当前 chan 不存在缓冲区
发送数据
当 channel 发送数据时,如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报 send on closed channel
错误并终止程序。除此之外还有三种场景
- 直接发送:当存在等待的接受者时,通过
runtime.send
直接将数据发送给阻塞的接受者 - 写缓冲区:当缓冲区存在空余空间时,建发送的数据写入 Channel 的缓冲区
- 阻塞发送:当不存在缓冲区或缓冲区已经满时,等待其他 Goroutine 从 Channel 接收数据
直接发送
如果目标 Channel 没有被关闭而且处于已经有处于读等待的 Goroutine,那么 runtime.chansend
函数会从接收队列 recvq
中取出最先陷入等待的 Goroutine 并直接向他发送数据
runtime.send
执行分为两个部分:
- 调用
runtime.sendDirect
函数将发送的数据直接拷贝到x = <-c
表达式中变量x
所在的内存地址上 - 调用
runtime.goready
将等待接收数据的 Goroutine 标记为可运行状态Grunnable
,并把改 Goroutine 放到发送方所在的处理器的runnext
上等待执行。该处理器在下一次调度时会立刻唤醒数据的接收方(发送数据的过程只是将接收方的 Goroutine 放到了处理器的runnext
中,并没有立即执行该 Goroutine)
写缓冲区
如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有满
首先使用 chanbuf
计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove
将发送的数据拷贝到缓冲区并增加 sendx
索引和 qcount
计数器
向 Channel 发送的数据会存储在 Channel 中 sendx
索引所在的位置,并将 senx
索引加一,由于这里的 buf
是一个循环数据,所以当 sendx
等于 dataqsiz
时就会重新回到数组开始的位置
阻塞发送
当 Channel 没有接受者能够处理数据时,向 Channel 发送数据就会被下游阻塞,当然使用 select
关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞的发送数据:
- 调用
runtime.getg
获取发送数据使用的 Goroutine - 执行
runtime.acquireSudog
函数获取runtime.sudog
结构体并设置这一次阻塞发送的相关信息,如发送的 Channel、是否在 Select 控制结构中和发送数据的内存地址等 - 将刚刚创建并初始化的
runtime.sudog
加入发送等待队列,并设置到当前 Goroutine 的waiting
上,表示 Goroutine 正在等待该sudog
准备就绪 - 调用
runtime.goparkunlock
函数将当前的 Goroutine 陷入沉睡等待唤醒 - 被调度器唤醒后会执行一些收尾工作,将一些属性置零并释放
runtime.sudog
结构体 - 最后,函数放回 true 表示向 Channel 发送数据的结束
接收数据
当在空的 channel 接受数据的时候会直接调用 runtime.gopark
直接让出处理器的使用权
如果 Channel 已经被关闭,且缓冲区不存在任何数据,就会清除 ep
指针中的数据并立刻返回
除此之外,接受数据存在以下三种情况
- 直接接收:当存在等待的接收者时,通过
runtime.recv
直接从阻塞的接收者或者缓冲区中获取数据 - 读缓冲区:当缓冲区存在数据时,从 Channel 的缓冲区中接收数据
- 阻塞接收:当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据
直接接收
当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send
函数,而接收数据时使用 runtime.recv
函数
runtime.recv
根据缓冲区的大小分别处理:
- 如果 channel 不存在缓冲区:
- 调用
runtime.recvDirect
函数将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中
- 调用
- channel 存在缓冲区:
- 将队列中的数据拷贝到接收方的地址
- 将发送队列头的数据拷贝到缓冲区,释放一个阻塞的发送方
无论发生哪种情况,运行时都会调用 runtime.goready 函数将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
缓冲区
当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
return true, true
}
...
}
如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。
收尾工作包括递增 recvx,一旦发现索引超过了 Channel 的容量时,会将它归零重置循环队列的索引;除此之外,该函数还会减少 qcount 计数器并释放持有 Channel 的锁。
阻塞接受
当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block {
unlock(&c.lock)
return false, false
}
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}
在正常的接收场景中,我们会使用 runtime.sudog
将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。
完成入队之后,上述代码还会调用 runtime.goparkunlock
立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。
关闭 channel
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
gp := sg.g
gp.param = nil
glist.push(gp)
}
// 处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素:
for {
sg := c.sendq.dequeue()
...
}
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready
触发调度。