Go 并发模型
Go 语言的并发模型是基于 CSP(Communicating Sequential Processes)模型的,CSP 模型是由 Tony Hoare 在 1978 年提出的,它的核心思想是:通过通信来共享内存,而不是通过共享内存来通信。
为了实现 CSP 模型中的输入输出原语,Go 引入了 goroutine 之间通信的原语 channel。 goroutine 可以从 channel 中输入数据,再将处理结果通过 channel 输出。通过 channel 将 goroutine 组合与连接起来,使得设计和编写大型并发系统变得更为简单。
Go 也支持基于共享内存的并发模型,并提供基本的低级同步原语(主要是 sync 包中的互斥锁,条件变量,读写锁,原子操作等)。
Go 始终推荐以 CSP 模型风格构建并发程序。
在语言层面,Go 针对 CSP 提供了三种并发原语:
- goroutine:对应 CSP 模型中的 P,封装了数据的处理逻辑,是 Go 运行时调度的基本执行单位。
- channel:对应 CSP 模型中的通信机制,用于在 goroutine 之间传递数据。
- select:用于应对多路输入、输出,可以让 goroutine 同时协调处理多个 channel 操作。
创建模式
Go 使用 go 关键字 + 函数/方法来创建 goroutine:
go func() {
// goroutine 的处理逻辑
}()
go fmt.Println("Hello, World!")
c := srv.newConn(rw)
go c.serve(connCtx)
在稍复杂一些的并发程序中,需要考虑通过 CSP 模型输入输出原语的承载体 channel 在 goroutine 之间建立联系。
type T struct {}
func spawn(f func()) chan T {
c := make(chan T)
go func() {
...
f()
...
}()
return c
}
func main() {
c := spawn(func() {
...
})
...
}
以上方式再内部创建一个 goroutine,然后返回一个 channel,外部通过 channel 与内部 goroutine 通信。 这是 Go 中最常见的 goroutine 创建模式。
退出模式
goroutine 使用代价很低,官方推荐多使用 goroutine。但一些常驻的后台服务程序可能会对 goroutine 有着优雅退出的要求。
分离模式
我们来看一个例子:
// src/net/dial.go
func (c *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
...
if oldCancel := d.Cancel; oldCancel != nil {
subCtx, cancel = context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-oldCancel:
cancel()
case <-subCtx.Done():
}
}()
ctx = subCtx
}
...
}
对于分离的 goroutine,创建它的 goroutine 不需要关心它的退出,其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。
join 模式
如果 goroutine 的创建者需要等待新 goroutine 结束,这样的退出模式称为 join 模式。
等待一个 goroutine 退出
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Duration(interval) * time.Second)
}
func spawn(f func(...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
go func() {
f(args...)
c <- struct{}{}
}()
return c
}
func main() {
done := spawn(worker, 3)
println("spawned a worker goroutine ")
<-done
println("worker done")
}
获取 goroutine 的退出状态
var OK = errors.New("ok")
func worker(args ...interface{}) error {
if len(args) == 0 {
return errors.New("no args")
}
interval, ok := args[0].(int)
if !ok {
return errors.New("invalid args")
}
time.Sleep(time.Duration(interval) * time.Second)
return OK
}
func spawn(f func(...interface{}) error, args ...interface{}) chan error {
c := make(chan error)
go func() {
c <- f(args...)
}()
return c
}
func main() {
done := spawn(worker, 3)
println("spawned a worker goroutine ")
if err := <-done; err != OK {
println("worker failed: ", err)
} else {
println("worker done")
}
}
我们将 worker 函数的返回值改为 error 类型,然后在 spawn 函数中将 worker 函数的返回值通过 channel 传递出去。这样就获得 goroutine 的退出状态。
等待多个 goroutine 退出
有些场景,goroutine 的创建者需要等待多个 goroutine 退出,这时可以使用 sync.WaitGroup。
func worker(args ...interface{}) {
if len(args) == 0 {
return
}
interval, ok := args[0].(int)
if !ok {
return
}
time.Sleep(time.Duration(interval) * time.Second)
}
func spawnGroup(n int, f func(...interface{}), args ...interface{}) chan struct{} {
c := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
println("worker done:", i)
f(args...)
}(i)
}
go func() {
wg.Wait()
c <- struct{}{}
}()
return c
}
func main() {
done := spawnGroup(3, worker, 3)
println("spawned 3 worker goroutines ")
<-done
println("all workers done")
}
我们看到,通过 sync.WaitGroup
, spawnGroup 每创建一个 goroutine 就调用 wg.Add(1)
,会在退出前调用 wg.Done()
。
spawnGroup 中还创建了一个用于监视的 goroutine,该 goroutine 调用 sync.Wait
来等待所以 goroutine 退出。
支持超时机制的等待
有时候,我们不想无限阻塞等待所有新创建 goroutine 的退出,而是仅等待一段合理的时间。如果在这段时间内 goroutine 没有退出,则创建者回继续向下执行或主动退出。
我们来看一个例子:
func main() {
done := spawn(worker, 3)
println("spawned a worker goroutine ")
time := time.NewTimer(5 * time.Second)
defer time.Stop()
select {
case <-time.C:
println("worker timeout")
case <-done:
println("worker done")
}
}
上述代码我们通过一个定时器 time.Timer
设置了超时等待时间,并通过 select
原语监听 time.C
和 done
两个 channel,如果 time.C
先到期,则输出 worker timeout
,否则输出 worker done
。
notift-and-wait 模式
前面的几个场景,goroutine 的创建者都是被动等待 goroutine 退出。但有时候,goroutine 的创建者需要主动通知那些 goroutine 退出。
我们来看一个例子:
func worker(n int) {
time.Sleep(time.Second * time.Duration(n))
}
func spawn(n int, f func(int)) chan struct{} {
quit := make(chan struct{})
job := make(chan int)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func(i interface) {
defer wg.Done()
println("worker id:", i)
for {
j, ok := <-job
if !ok {
return
}
worker(j)
}
}(i)
}
go func() {
<-quit
close(job)
wg.Wait()
quit <- struct{}{}
}()
return quit
}
func main() {
quit := spawn(3, worker)
println("spawned 3 worker goroutines ")
time.Sleep(5 * time.Second)
println("notify all workers to quit")
quit <- struct{}{}
time := time.NewTimer(5 * time.Second)
defer time.Stop()
select {
case <-time.C:
println("worker timeout")
case <-quit:
println("all workers done")
}
}
上述代码关键是创建者直接利用了 worker goroutine 接收任务(job)的 channel 来广播退出通知,而实现这一广播的代码是 close(job)
。
退出模式的应用
很多时间,我们程序中药启动多个 goroutine 协作完成应用的业务逻辑,比如:
func main() {
go producer.Start()
go consumer.Start()
go watcher.Start()
}
这些 goroutine 的运行形态都不一样,有的是生产者,有的是消费者等等。我们现在需要实现一个超时等到退出的框架,以统一解决各种运行形态的 goroutine 的优雅退出问题
type GracefullyShutdowner interface {
Shutdown(waitTimeout time.Duration) error
}
type ShutdownerFunc func(time. Duration) error
func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
return f(waitTimeout)
}
一组 goroutine 的退出总体有两种情况,一种是并发退出,另外一种是串行退出。
如果各个 goroutine 的退出先后次序对数据处理无影响,可以执行并发退出。如果各个 goroutine 之间的退出是按照一定次序逐个退出,如果次序错误则导致数据的状态混乱或错误。
我们来看一下并发退出:
func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
c := make(chan struct{})
go func() {
var wg sync.WaitGroup
for _, g := range shutdowners {
wg.Add(1)
go func(shutdowner GracefullyShutdowner) {
defer wg.Done()
shutdowner.Shutdown(waitTimeout)
}(g)
}
wg.Wait()
c <- struct{}{}
}()
timer := time.NewTimer(waitTimeout)
defer time.Stop()
select {
case <-c:
return nil
case <-time.C:
return error.New("wait timeout")
}
}
有了并发退出的基础,串行退出的实现也就简单了。
func SequentialShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
start := time.Now()
var left time.Duration
time := time.NewTimer(waitTimeout)
for _, g := range shutdowners {
elapsed := time.Since(start)
left = waitTimeout - elapsed
c := make(chat struct{})
go func(shutdowner GracefullyShutdowner) {
shutdowner.Shutdown(left)
c <- struct{}{}
}(g)
time.Reset(left)
select {
case <-c:
case <-time.C:
return error.New("wait timeout")
}
}
return nil
}