Skip to main content
  1. Docs/

Golang Channel

·5 mins· ·
Owl Dawn
Author
Owl Dawn
Table of Contents

Go Channel 的实现
#

关于 channel 锁和 cas 的实现
#

Channel 在运行时的内部表示为 runtime.hchan,该结构体中包含了一个用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列。

锁会导致休眠和唤醒,带来额外的上下文切换,如果临界区过小,加锁解锁会导致额外的开销。Go 语言社区在 2014 年也提出了无锁 Channel 的实现方案,该方案将 Channel 分为了以下三种类型

  • 同步 Channel:不需要缓冲区,发送方直接将数据交给(handoff)接收方
  • 异步 Channel:基于环形缓存的传统生产者消费者模型
  • chan struct{} 类型的异步 Channel:struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)语义

这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但在实际基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。

目前通过 CAS 实现的无锁 Channel 没有提供 FIFO 的特性,所以该提案暂时被搁浅

创建 chan
#

Go 中所有 Channel 的创建都会使用 make 关键字

这一阶段对 make 关键字的缓冲区大小做检查,如果不向 make 传递表示缓冲区大小的参数,则会设置 0 参数,表示当前 chan 不存在缓冲区

发送数据
#

当 channel 发送数据时,如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报 send on closed channel 错误并终止程序。除此之外还有三种场景

  • 直接发送:当存在等待的接受者时,通过 runtime.send 直接将数据发送给阻塞的接受者
  • 写缓冲区:当缓冲区存在空余空间时,建发送的数据写入 Channel 的缓冲区
  • 阻塞发送:当不存在缓冲区或缓冲区已经满时,等待其他 Goroutine 从 Channel 接收数据

直接发送
#

如果目标 Channel 没有被关闭而且处于已经有处于读等待的 Goroutine,那么 runtime.chansend 函数会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向他发送数据

runtime.send 执行分为两个部分:

  1. 调用 runtime.sendDirect 函数将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
  2. 调用 runtime.goready 将等待接收数据的 Goroutine 标记为可运行状态 Grunnable ,并把改 Goroutine 放到发送方所在的处理器的 runnext 上等待执行。该处理器在下一次调度时会立刻唤醒数据的接收方(发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,并没有立即执行该 Goroutine)

写缓冲区
#

如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有满

首先使用 chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区并增加 sendx 索引和 qcount 计数器

向 Channel 发送的数据会存储在 Channel 中 sendx 索引所在的位置,并将 senx 索引加一,由于这里的 buf 是一个循环数据,所以当 sendx 等于 dataqsiz 时就会重新回到数组开始的位置

阻塞发送
#

当 Channel 没有接受者能够处理数据时,向 Channel 发送数据就会被下游阻塞,当然使用 select 关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞的发送数据:

  1. 调用 runtime.getg 获取发送数据使用的 Goroutine
  2. 执行 runtime.acquireSudog 函数获取 runtime.sudog 结构体并设置这一次阻塞发送的相关信息,如发送的 Channel、是否在 Select 控制结构中和发送数据的内存地址等
  3. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪
  4. 调用 runtime.goparkunlock 函数将当前的 Goroutine 陷入沉睡等待唤醒
  5. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并释放 runtime.sudog 结构体
  6. 最后,函数放回 true 表示向 Channel 发送数据的结束

接收数据
#

当在空的 channel 接受数据的时候会直接调用 runtime.gopark 直接让出处理器的使用权

如果 Channel 已经被关闭,且缓冲区不存在任何数据,就会清除 ep 指针中的数据并立刻返回

除此之外,接受数据存在以下三种情况

  • 直接接收:当存在等待的接收者时,通过 runtime.recv 直接从阻塞的接收者或者缓冲区中获取数据
  • 读缓冲区:当缓冲区存在数据时,从 Channel 的缓冲区中接收数据
  • 阻塞接收:当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据

直接接收
#

当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send 函数,而接收数据时使用 runtime.recv 函数

runtime.recv 根据缓冲区的大小分别处理:

  • 如果 channel 不存在缓冲区:
    • 调用 runtime.recvDirect 函数将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中
  • channel 存在缓冲区:
    • 将队列中的数据拷贝到接收方的地址
    • 将发送队列头的数据拷贝到缓冲区,释放一个阻塞的发送方

无论发生哪种情况,运行时都会调用 runtime.goready 函数将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

缓冲区
#

当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
	}
	...
}

如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。

收尾工作包括递增 recvx,一旦发现索引超过了 Channel 的容量时,会将它归零重置循环队列的索引;除此之外,该函数还会减少 qcount 计数器并释放持有 Channel 的锁。

阻塞接受
#

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。

完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。

关闭 channel
#

当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

    c.closed = 1

	var glist gList
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = nil
		glist.push(gp)
	}

    // 处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素:
	for {
		sg := c.sendq.dequeue()
		...
	}
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。

参考
#

Related

Golang sync.Map
·4 mins
containerd
·12 mins
Cgroup
·5 mins
Cgroup Cgroup