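A goroutine pool mainly addresses excessive concurrency: it caps the number of tasks running at once, so that too many concurrent tasks cannot exhaust resources. Some pool implementations also reuse goroutines, which cuts down on stack growth for fresh goroutines and on the cost of creating and destroying them.
Simple channel-based implementation #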
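import (
	"context"
	"log"
	"sync"
)

// Pool limits the number of concurrently running tasks with a buffered channel.
type Pool struct {
	MaxGoroutines int
	wg            sync.WaitGroup
	ch            chan struct{} // counting semaphore: one slot per running task
}

func NewPool(maxGoroutines int) *Pool {
	return &Pool{
		MaxGoroutines: maxGoroutines,
		ch:            make(chan struct{}, maxGoroutines),
	}
}

// AddTask blocks until a slot is free, then runs task in a new goroutine.
// ctx is accepted but not used by this simple version.
func (p *Pool) AddTask(ctx context.Context, task func()) {
	p.ch <- struct{}{}
	// Note: call wg.Add outside the goroutine. Calling it inside introduces a race:
	// the caller may already be in Wait before the goroutine has run Add.
	p.wg.Add(1)
	go func() {
		// Recover from a panic in task so it cannot crash the process.
		defer func() {
			if r := recover(); r != nil {
				log.Printf("pool: task panicked: %v", r)
			}
		}()
		defer func() {
			<-p.ch // release the slot
			p.wg.Done()
		}()
		task()
	}()
}

// Wait blocks until every submitted task has finished.
func (p *Pool) Wait() {
	p.wg.Wait()
}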
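The following is a minimal usage sketch for the channel-based pool above. To stay self-contained it redefines a trimmed-down copy of the pool without panic handling. The names `boundedPool`, `newBoundedPool` and `runTasks` are hypothetical and exist only for this illustration; `runTasks` records the highest number of tasks observed running at the same time.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// boundedPool is a trimmed-down copy of the channel-based pool:
// the buffered channel acts as a counting semaphore.
type boundedPool struct {
	wg sync.WaitGroup
	ch chan struct{}
}

func newBoundedPool(max int) *boundedPool {
	return &boundedPool{ch: make(chan struct{}, max)}
}

// AddTask blocks while max tasks are already running.
func (p *boundedPool) AddTask(task func()) {
	p.ch <- struct{}{} // acquire a slot
	p.wg.Add(1)        // count the task before the goroutine starts
	go func() {
		defer func() {
			<-p.ch // release the slot
			p.wg.Done()
		}()
		task()
	}()
}

func (p *boundedPool) Wait() { p.wg.Wait() }

// runTasks submits n short tasks and returns the highest number of
// tasks observed running at the same time.
func runTasks(max, n int) int32 {
	p := newBoundedPool(max)
	var running, peak int32
	for i := 0; i < n; i++ {
		p.AddTask(func() {
			cur := atomic.AddInt32(&running, 1)
			// Record a new peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			atomic.AddInt32(&running, -1)
		})
	}
	p.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	peak := runTasks(3, 20)
	fmt.Println("peak concurrency within limit:", peak <= 3)
}
```

Because a task only starts after it has taken a slot, and gives the slot back only after it returns, the number of running tasks can never exceed the channel capacity.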
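gopkg/gopool implementation #
Source: https://github.com/bytedance/gopkg/tree/main/util/gopool
The design can be abstracted into two pooling concepts:
- Task pool: implemented as a linked list. Each task is appended to the tail of the list as a node, and tasks are consumed in FIFO order. Task nodes are obtained from a sync.Pool and then filled in, which reduces GC pressure and frequent memory allocation.
- Goroutine pool (worker): holds the goroutines. Each worker goroutine loops, pulling tasks from the task pool and running them, which is how goroutines get reused. When the task pool is empty, the worker is recycled.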
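The task-pool idea described above can be sketched with the standard library alone. This is a simplified illustration under assumptions, not gopool's actual code: `node`, `nodePool`, `taskQueue` and `drainOrder` are hypothetical names. It shows a mutex-protected FIFO linked list whose nodes are taken from and returned to a `sync.Pool`.

```go
package main

import (
	"fmt"
	"sync"
)

// node is one task in the singly linked list.
type node struct {
	f    func()
	next *node
}

// nodePool recycles node objects so that a submission can reuse a
// previously allocated node instead of always allocating a new one.
var nodePool = sync.Pool{New: func() interface{} { return new(node) }}

// taskQueue is a mutex-protected FIFO linked list.
type taskQueue struct {
	mu         sync.Mutex
	head, tail *node
}

// push appends a task at the tail of the list.
func (q *taskQueue) push(f func()) {
	n := nodePool.Get().(*node)
	n.f = f
	q.mu.Lock()
	if q.head == nil {
		q.head, q.tail = n, n
	} else {
		q.tail.next = n
		q.tail = n
	}
	q.mu.Unlock()
}

// pop removes the task at the head, or reports false if the list is empty.
func (q *taskQueue) pop() (func(), bool) {
	q.mu.Lock()
	n := q.head
	if n == nil {
		q.mu.Unlock()
		return nil, false
	}
	q.head = n.next
	if q.head == nil {
		q.tail = nil
	}
	q.mu.Unlock()
	f := n.f
	n.f, n.next = nil, nil // clear the node before returning it to the pool
	nodePool.Put(n)
	return f, true
}

// drainOrder pushes k tasks and returns the order in which they run.
func drainOrder(k int) []int {
	var q taskQueue
	var order []int
	for i := 0; i < k; i++ {
		i := i
		q.push(func() { order = append(order, i) })
	}
	for {
		f, ok := q.pop()
		if !ok {
			break
		}
		f()
	}
	return order
}

func main() {
	fmt.Println(drainOrder(3)) // prints [0 1 2]
}
```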
Implementation source. Task pool:
// https://github.com/bytedance/gopkg/blob/main/util/gopool/pool.go
type pool struct {
	// The name of the pool
	name string
	// capacity of the pool, the maximum number of goroutines that are actually working
	cap int32
	// Configuration information
	config *Config
	// linked list of tasks
	// (implemented as a linked list with head and tail pointers)
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32
	// Record the number of running workers
	workerCount int32
	// This method will be called when the worker panic
	panicHandler func(context.Context, interface{})
}
...
func (p *pool) CtxGo(ctx context.Context, f func()) {
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
	// The following two conditions are met:
	// 1. the number of tasks is greater than the threshold.
	// 2. The current number of workers is less than the upper limit p.cap.
	// or there are currently no workers.
	// The task pool controls when workers are added to the goroutine pool:
	// a worker is added if there are no workers yet or if the threshold condition holds.
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
		p.incWorkerCount()
		w := workerPool.Get().(*worker)
		w.pool = p
		w.run()
	}
}
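The scale-up condition at the end of CtxGo can be isolated as a pure function to make its behaviour easier to see. This is only an illustrative sketch: `shouldSpawn` is a hypothetical helper, not part of gopool, and the capacity of 4 and threshold of 10 are made-up values. Note that the code compares with `>=`, so a worker is also started when the backlog is exactly equal to the threshold.

```go
package main

import "fmt"

// shouldSpawn mirrors the condition in CtxGo: start a worker when the backlog
// is at or above the threshold and the pool is below capacity, or when no
// worker is running at all.
func shouldSpawn(taskCount, workerCount, capacity, threshold int32) bool {
	return (taskCount >= threshold && workerCount < capacity) || workerCount == 0
}

func main() {
	fmt.Println(shouldSpawn(1, 0, 4, 10))  // true: no worker yet
	fmt.Println(shouldSpawn(3, 1, 4, 10))  // false: backlog below threshold
	fmt.Println(shouldSpawn(10, 1, 4, 10)) // true: backlog at threshold, room to grow
	fmt.Println(shouldSpawn(50, 4, 4, 10)) // false: already at capacity
}
```

The `workerCount == 0` branch guarantees that a queued task is never left without a worker, even when the backlog is below the threshold.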
Worker pool:
// https://github.com/bytedance/gopkg/blob/main/util/gopool/worker.go
func (w *worker) run() {
	go func() {
		for {
			var t *task
			w.pool.taskLock.Lock()
			// take a task from the task pool
			if w.pool.taskHead != nil {
				t = w.pool.taskHead
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			// if the task pool is empty, recycle the worker
			if t == nil {
				// if there's no task to do, exit
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
			// run the task
			func() {
				defer func() {
					// recover from a panic
					if r := recover(); r != nil {
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						} else {
							msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
							logger.CtxErrorf(t.ctx, msg)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}
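The worker loop above has two properties worth seeing in isolation: one goroutine runs many tasks until the queue is drained, and a panic inside a task is recovered so it does not kill the worker. The following is a simplified standard-library model under assumptions, not gopool's code: `miniPool`, `runTask`, `startWorker` and `runAll` are hypothetical names, and the queue is a plain slice rather than a linked list.

```go
package main

import (
	"fmt"
	"sync"
)

// miniPool is a hypothetical, simplified model of the worker loop.
type miniPool struct {
	mu     sync.Mutex
	tasks  []func()
	wg     sync.WaitGroup
	panics int
}

// next pops the oldest task, or returns nil when the queue is empty.
func (p *miniPool) next() func() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.tasks) == 0 {
		return nil
	}
	t := p.tasks[0]
	p.tasks = p.tasks[1:]
	return t
}

// runTask executes one task and turns a panic into a counter increment,
// so a failing task does not kill the worker goroutine.
func (p *miniPool) runTask(t func()) {
	defer func() {
		if r := recover(); r != nil {
			p.mu.Lock()
			p.panics++
			p.mu.Unlock()
		}
	}()
	t()
}

// startWorker launches one goroutine that keeps pulling tasks until none are left.
func (p *miniPool) startWorker() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			t := p.next()
			if t == nil {
				return // queue drained: the worker exits
			}
			p.runTask(t)
		}
	}()
}

// runAll queues n tasks (every third one panics), drains them with the given
// number of workers, and returns how many completed and how many panicked.
func runAll(n, workers int) (done, panicked int) {
	p := &miniPool{}
	var mu sync.Mutex
	for i := 0; i < n; i++ {
		i := i
		p.tasks = append(p.tasks, func() {
			if i%3 == 0 {
				panic("boom")
			}
			mu.Lock()
			done++
			mu.Unlock()
		})
	}
	for w := 0; w < workers; w++ {
		p.startWorker()
	}
	p.wg.Wait()
	return done, p.panics
}

func main() {
	done, panicked := runAll(9, 2)
	fmt.Println(done, panicked) // prints 6 3
}
```

Wrapping each task in its own function with a deferred recover, as the original worker does, is what lets the loop continue with the next task after a panic.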
panjf2000/ants #
Source: https://github.com/panjf2000/ants
The project README explains the implementation in detail, as does the author's article.