Golang Channel

Posted by OwlDawn on Monday, July 8, 2024

Go Channel 的实现

关于 channel 锁和 cas 的实现

Channel 在运行时的内部表示为 runtime.hchan,该结构体中包含了一个用于保护成员变量的互斥锁,从某种程度上说,Channel 是一个用于同步和通信的有锁队列。

锁会导致休眠和唤醒,带来额外的上下文切换,如果临界区过小,加锁解锁会导致额外的开销。Go 语言社区在 2014 年也提出了无锁 Channel 的实现方案,该方案将 Channel 分为了以下三种类型

  • 同步 Channel:不需要缓冲区,发送方直接将数据交给(handoff)接收方
  • 异步 Channel:基于环形缓存的传统生产者消费者模型
  • chan struct{} 类型的异步 Channel:struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)语义

这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。社区中已经有无锁 Channel 的实现,但在实际基准测试中,无锁队列在多核测试中的表现还需要进一步的改进。

目前通过 CAS 实现的无锁 Channel 没有提供 FIFO 的特性,所以该提案暂时被搁浅

创建 chan

Go 中所有 Channel 的创建都会使用 make 关键字

这一阶段对 make 关键字的缓冲区大小做检查,如果不向 make 传递表示缓冲区大小的参数,则会设置 0 参数,表示当前 chan 不存在缓冲区

发送数据

当 channel 发送数据时,如果 Channel 已经关闭,那么向该 Channel 发送数据时就会报 send on closed channel 错误并终止程序。除此之外还有三种场景

  • 直接发送:当存在等待的接受者时,通过 runtime.send 直接将数据发送给阻塞的接受者
  • 写缓冲区:当缓冲区存在空余空间时,建发送的数据写入 Channel 的缓冲区
  • 阻塞发送:当不存在缓冲区或缓冲区已经满时,等待其他 Goroutine 从 Channel 接收数据

直接发送

如果目标 Channel 没有被关闭而且处于已经有处于读等待的 Goroutine,那么 runtime.chansend 函数会从接收队列 recvq 中取出最先陷入等待的 Goroutine 并直接向他发送数据

runtime.send 执行分为两个部分:

  1. 调用 runtime.sendDirect 函数将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上
  2. 调用 runtime.goready 将等待接收数据的 Goroutine 标记为可运行状态 Grunnable ,并把改 Goroutine 放到发送方所在的处理器的 runnext 上等待执行。该处理器在下一次调度时会立刻唤醒数据的接收方(发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,并没有立即执行该 Goroutine)

写缓冲区

如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有满

首先使用 chanbuf 计算出下一个可以存储数据的位置,然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区并增加 sendx 索引和 qcount 计数器

向 Channel 发送的数据会存储在 Channel 中 sendx 索引所在的位置,并将 senx 索引加一,由于这里的 buf 是一个循环数据,所以当 sendx 等于 dataqsiz 时就会重新回到数组开始的位置

阻塞发送

当 Channel 没有接受者能够处理数据时,向 Channel 发送数据就会被下游阻塞,当然使用 select 关键字可以向 Channel 非阻塞地发送消息。向 Channel 阻塞的发送数据:

  1. 调用 runtime.getg 获取发送数据使用的 Goroutine
  2. 执行 runtime.acquireSudog 函数获取 runtime.sudog 结构体并设置这一次阻塞发送的相关信息,如发送的 Channel、是否在 Select 控制结构中和发送数据的内存地址等
  3. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪
  4. 调用 runtime.goparkunlock 函数将当前的 Goroutine 陷入沉睡等待唤醒
  5. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并释放 runtime.sudog 结构体
  6. 最后,函数放回 true 表示向 Channel 发送数据的结束

接收数据

当在空的 channel 接受数据的时候会直接调用 runtime.gopark 直接让出处理器的使用权

如果 Channel 已经被关闭,且缓冲区不存在任何数据,就会清除 ep 指针中的数据并立刻返回

除此之外,接受数据存在以下三种情况

  • 直接接收:当存在等待的接收者时,通过 runtime.recv 直接从阻塞的接收者或者缓冲区中获取数据
  • 读缓冲区:当缓冲区存在数据时,从 Channel 的缓冲区中接收数据
  • 阻塞接收:当缓冲区中不存在数据时,等待其他 Goroutine 向 Channel 发送数据

直接接收

当 Channel 的 sendq 队列中包含处于等待状态的 Goroutine 时,该函数会取出队列头等待的 Goroutine,处理的逻辑和发送时相差无几,只是发送数据时调用的是 runtime.send 函数,而接收数据时使用 runtime.recv 函数

runtime.recv 根据缓冲区的大小分别处理:

  • 如果 channel 不存在缓冲区:
    • 调用 runtime.recvDirect 函数将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中
  • channel 存在缓冲区:
    • 将队列中的数据拷贝到接收方的地址
    • 将发送队列头的数据拷贝到缓冲区,释放一个阻塞的发送方

无论发生哪种情况,运行时都会调用 runtime.goready 函数将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

缓冲区

当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		return true, true
	}
	...
}

如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。

收尾工作包括递增 recvx,一旦发现索引超过了 Channel 的容量时,会将它归零重置循环队列的索引;除此之外,该函数还会减少 qcount 计数器并释放持有 Channel 的锁。

阻塞接受

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	if !block {
		unlock(&c.lock)
		return false, false
	}

	gp := getg()
	mysg := acquireSudog()
	mysg.elem = ep
	gp.waiting = mysg
	mysg.g = gp
	mysg.c = c
	c.recvq.enqueue(mysg)
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	releaseSudog(mysg)
	return true, !closed
}

在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。

完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。

关闭 channel

当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:

func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

    c.closed = 1

	var glist gList
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		gp := sg.g
		gp.param = nil
		glist.push(gp)
	}

    // 处理完了这些异常的情况之后就可以开始执行关闭 Channel 的逻辑了,下面这段代码的主要工作就是将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素:
	for {
		sg := c.sendq.dequeue()
		...
	}
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。

参考