Skip to main content
  1. Docs/

Go 协程池的一些实现

·2 mins· ·
Golang
Owl Dawn
Author
Owl Dawn
Table of Contents

协程池主要用于解决并发数量过高的问题:通过限制并发数量,防止并发任务过多导致资源耗尽。部分协程池的实现还会复用协程,减少栈扩容以及协程创建和销毁的开销。

Channel 简单实现
#

// Pool is a minimal goroutine pool that caps the number of tasks running
// concurrently by using a buffered channel as a counting semaphore.
type Pool struct {
	// MaxGoroutines is the maximum number of tasks allowed to run at once.
	MaxGoroutines int
	// wg tracks in-flight tasks so that Wait can block until they finish.
	wg sync.WaitGroup
	// ch is the semaphore: each running task holds one slot in its buffer.
	ch chan struct{}
}

// NewPool returns a Pool that runs at most maxGoroutines tasks concurrently.
//
// Values below 1 are raised to 1. A capacity of zero would make the semaphore
// channel unbuffered, so the first AddTask call would block forever, and a
// negative capacity would make the channel allocation panic.
func NewPool(maxGoroutines int) *Pool {
	if maxGoroutines < 1 {
		maxGoroutines = 1
	}
	return &Pool{
		MaxGoroutines: maxGoroutines,
		ch:            make(chan struct{}, maxGoroutines),
	}
}

// AddTask runs task in a new goroutine. The call blocks until one of the
// pool's slots is free, so at most cap(p.ch) tasks run at the same time.
//
// ctx is currently unused by this implementation.
func (p *Pool) AddTask(ctx context.Context, task func()) {
	p.ch <- struct{}{} // acquire a slot; blocks while the pool is full
	// The WaitGroup counter must be incremented here, outside the goroutine.
	// Doing it inside would introduce a race: the caller could reach Wait
	// before the new goroutine has executed Add.
	p.wg.Add(1)
	go func() {
		// Registered first, so it runs last: catches and handles a panic from task.
		defer log.Recover()
		defer func() {
			<-p.ch // release the slot
			// Signal completion immediately; no artificial delay, so Wait
			// returns as soon as the last task is done.
			p.wg.Done()
		}()
		task()
	}()
}

// Wait blocks until the pool's WaitGroup counter drops to zero, i.e. until
// every task started through AddTask has run its deferred cleanup.
func (p *Pool) Wait() {
	p.wg.Wait()
}

gopkg/gopool 实现
#

源码地址:https://github.com/bytedance/gopkg/tree/main/util/gopool

设计可以抽象成两个池化概念:

  • 任务池:链表实现,每个任务作为一个节点追加到链表末尾,遵循 FIFO。每个任务节点都从 sync.Pool 中获取后再赋值复用,以减少 GC 压力和内存的频繁分配
  • 协程池(worker):存储协程,每个协程都循环从任务池中拉取任务执行,达到协程的复用。如果任务池空了,则回收协程

实现源码: 任务池

// https://github.com/bytedance/gopkg/blob/main/util/gopool/pool.go
//
// pool holds a FIFO queue of pending tasks plus the bookkeeping used to
// decide when another worker goroutine should be started.
type pool struct {
	// The name of the pool
	name string

	// capacity of the pool, the maximum number of goroutines that are actually working
	cap int32
	// Configuration information
	config *Config
	// linked list of tasks
	// Implemented as a linked list: new tasks are appended at taskTail and
	// workers remove tasks from taskHead. taskLock guards both pointers;
	// taskCount is updated with atomic operations.
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// Record the number of running workers
	workerCount int32

	// This method will be called when the worker panic
	// It receives the task's context and the recovered value. When it is nil,
	// the worker logs the panic instead.
	panicHandler func(context.Context, interface{})
}
...
// CtxGo appends f, together with ctx, to the tail of the pool's task list and
// starts one more worker when the scaling condition below holds.
func (p *pool) CtxGo(ctx context.Context, f func()) {
	// Obtain a task node from taskPool (declared outside this excerpt;
	// presumably a sync.Pool) and fill it in.
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
	// Append the node to the linked list under taskLock.
	p.taskLock.Lock()
	if p.taskHead == nil {
		// Empty list: the new node is both head and tail.
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
	// The following two conditions are met:
	// 1. the number of tasks is greater than the threshold.
	// 2. The current number of workers is less than the upper limit p.cap.
	// or there are currently no workers.
	// The task side decides when workers are added: a new worker is started
	// if there are no workers at all, or if the threshold check passes.
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
		p.incWorkerCount()
		// Take a worker object from workerPool, bind it to this pool and start it.
		w := workerPool.Get().(*worker)
		w.pool = p
		w.run()
	}
}

worker 池

// https://github.com/bytedance/gopkg/blob/main/util/gopool/worker.go
//
// run starts a goroutine that repeatedly removes the task at the head of the
// pool's task list and executes it. The goroutine exits as soon as it finds
// the list empty.
func (w *worker) run() {
	go func() {
		for {
			var t *task
			w.pool.taskLock.Lock()
			// Pop the head of the task list, if there is one.
			if w.pool.taskHead != nil {
				t = w.pool.taskHead
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
			// The task list is empty: retire this worker.
			if t == nil {
				// if there's no task to do, exit
				// close is called while taskLock is still held; Recycle runs
				// after the lock is released. Both are defined outside this
				// excerpt.
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
			// Execute the task inside its own function so that the deferred
			// recover applies to this task only and the loop keeps running.
			func() {
				defer func() {
					// Recover from a panic raised by the task.
					if r := recover(); r != nil {
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						} else {
							// NOTE(review): msg is passed as the format string. If
							// logger.CtxErrorf is printf-style (presumably, given the
							// name; confirm), a '%' in the panic value or stack trace
							// would be misinterpreted as a formatting verb.
							msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
							logger.CtxErrorf(t.ctx, msg)
						}
					}
				}()
				t.f()
			}()
			// Hand the finished task node back for reuse (Recycle is defined
			// outside this excerpt).
			t.Recycle()
		}
	}()
}

panjf2000/ants
#

源码地址:https://github.com/panjf2000/ants

项目 README 以及作者的文章中详细介绍了实现原理。

Related

Go Reflect 机制
·2 mins
Golang
Golang Channel
·5 mins
Golang
Golang sync.Map
·4 mins
Golang
Go Panic
·3 mins
Golang
Golang 锁机制
·3 mins
Golang
分布式浅谈
·12 mins
分布式