Go 语言
并发编程
Go 并发模型

Go 并发模型

Go 语言的并发模型是基于 CSP(Communicating Sequential Processes)模型的,CSP 模型是由 Tony Hoare 在 1978 年提出的,它的核心思想是:通过通信来共享内存,而不是通过共享内存来通信。

为了实现 CSP 模型中的输入输出原语,Go 引入了 goroutine 之间通信的原语 channel。 goroutine 可以从 channel 中输入数据,再将处理结果通过 channel 输出。通过 channel 将 goroutine 组合与连接起来,使得设计和编写大型并发系统变得更为简单。

Go 也支持基于共享内存的并发模型,并提供基本的低级同步原语(主要是 sync 包中的互斥锁,条件变量,读写锁,原子操作等)。

Go 始终推荐以 CSP 模型风格构建并发程序。

在语言层面,Go 针对 CSP 提供了三种并发原语:

  • goroutine:对应 CSP 模型中的 P,封装了数据的处理逻辑,是 Go 运行时调度的基本执行单位。
  • channel:对应 CSP 模型中的通信机制,用于在 goroutine 之间传递数据。
  • select:用于应对多路输入、输出,可以让 goroutine 同时协调处理多个 channel 操作。

创建模式

Go 使用 go 关键字 + 函数/方法来创建 goroutine:

go func() {
    // goroutine 的处理逻辑
}()
 
go fmt.Println("Hello, World!")
 
c := srv.newConn(rw)
go c.serve(connCtx)

在稍复杂一些的并发程序中,需要考虑通过 CSP 模型输入输出原语的承载体 channel 在 goroutine 之间建立联系。

type T struct {}
 
func spawn(f func()) chan T {
    c := make(chan T)
    go func() {
        ...
        f()
        ...
    }()
    return c
}
 
func main() {
    c := spawn(func() {
        ...
    })
    ...
}

以上方式再内部创建一个 goroutine,然后返回一个 channel,外部通过 channel 与内部 goroutine 通信。 这是 Go 中最常见的 goroutine 创建模式。

退出模式

goroutine 使用代价很低,官方推荐多使用 goroutine。但一些常驻的后台服务程序可能会对 goroutine 有着优雅退出的要求。

分离模式

我们来看一个例子:

// src/net/dial.go
func (c *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    ...
    if oldCancel := d.Cancel; oldCancel != nil {
        subCtx, cancel = context.WithCancel(ctx)
        defer cancel()
        go func() {
            select {
            case <-oldCancel:
                cancel()
            case <-subCtx.Done():
            }
        }()
        ctx = subCtx
    }
    ...
}

对于分离的 goroutine,创建它的 goroutine 不需要关心它的退出,其生命周期与其执行的主函数相关,函数返回即 goroutine 退出。

join 模式

如果 goroutine 的创建者需要等待新 goroutine 结束,这样的退出模式称为 join 模式。

等待一个 goroutine 退出

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
 
    interval, ok := args[0].(int)
    if !ok {
        return
    }
 
    time.Sleep(time.Duration(interval) * time.Second)
}
 
func spawn(f func(...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    go func() {
        f(args...)
        c <- struct{}{}
    }()
    return c
}
 
func main() {
    done := spawn(worker, 3)
    println("spawned a worker goroutine ")
    <-done
    println("worker done")
}

获取 goroutine 的退出状态

var OK = errors.New("ok")
 
func worker(args ...interface{}) error {
    if len(args) == 0 {
        return errors.New("no args")
    }
 
    interval, ok := args[0].(int)
    if !ok {
        return errors.New("invalid args")
    }
 
    time.Sleep(time.Duration(interval) * time.Second)
    return OK
}
 
func spawn(f func(...interface{}) error, args ...interface{}) chan error {
    c := make(chan error)
    go func() {
        c <- f(args...)
    }()
    return c
}
 
func main() {
    done := spawn(worker, 3)
    println("spawned a worker goroutine ")
    if err := <-done; err != OK {
        println("worker failed: ", err)
    } else {
        println("worker done")
    }
}

我们将 worker 函数的返回值改为 error 类型,然后在 spawn 函数中将 worker 函数的返回值通过 channel 传递出去。这样就获得 goroutine 的退出状态。

等待多个 goroutine 退出

有些场景,goroutine 的创建者需要等待多个 goroutine 退出,这时可以使用 sync.WaitGroup。

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
 
    interval, ok := args[0].(int)
    if !ok {
        return
    }
 
    time.Sleep(time.Duration(interval) * time.Second)
}
 
func spawnGroup(n int, f func(...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            println("worker done:", i)
            f(args...)
        }(i)
    }
    go func() {
        wg.Wait()
        c <- struct{}{}
    }()
    return c
}
 
func main() {
    done := spawnGroup(3, worker, 3)
    println("spawned 3 worker goroutines ")
    <-done
    println("all workers done")
}

我们看到,通过 sync.WaitGroup, spawnGroup 每创建一个 goroutine 就调用 wg.Add(1),会在退出前调用 wg.Done()。 spawnGroup 中还创建了一个用于监视的 goroutine,该 goroutine 调用 sync.Wait 来等待所以 goroutine 退出。

支持超时机制的等待

有时候,我们不想无限阻塞等待所有新创建 goroutine 的退出,而是仅等待一段合理的时间。如果在这段时间内 goroutine 没有退出,则创建者回继续向下执行或主动退出。

我们来看一个例子:

func main() {
    done := spawn(worker, 3)
    println("spawned a worker goroutine ")
 
    time := time.NewTimer(5 * time.Second)
    defer time.Stop()
    select {
    case <-time.C:
        println("worker timeout")
    case <-done:
        println("worker done")
    }
}

上述代码我们通过一个定时器 time.Timer 设置了超时等待时间,并通过 select 原语监听 time.Cdone 两个 channel,如果 time.C 先到期,则输出 worker timeout,否则输出 worker done

notift-and-wait 模式

前面的几个场景,goroutine 的创建者都是被动等待 goroutine 退出。但有时候,goroutine 的创建者需要主动通知那些 goroutine 退出。

我们来看一个例子:

func worker(n int) {
    time.Sleep(time.Second * time.Duration(n))
}
 
func spawn(n int, f func(int)) chan struct{} {
    quit := make(chan struct{})
    job := make(chan int)
    var wg sync.WaitGroup
 
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i interface) {
            defer wg.Done()
            println("worker id:", i)
            for {
                j, ok := <-job
                if !ok {
                    return
                }
                worker(j)
            }
        }(i)
    }
 
    go func() {
        <-quit
        close(job)
        wg.Wait()
        quit <- struct{}{}
    }()
 
    return quit
}
 
func main() {
    quit := spawn(3, worker)
    println("spawned 3 worker goroutines ")
 
    time.Sleep(5 * time.Second)
    println("notify all workers to quit")
    quit <- struct{}{}
 
    time := time.NewTimer(5 * time.Second)
    defer time.Stop()
    select {
    case <-time.C:
        println("worker timeout")
    case <-quit:
        println("all workers done")
    }
}

上述代码关键是创建者直接利用了 worker goroutine 接收任务(job)的 channel 来广播退出通知,而实现这一广播的代码是 close(job)

退出模式的应用

很多时间,我们程序中药启动多个 goroutine 协作完成应用的业务逻辑,比如:

func main() {
    go producer.Start()
    go consumer.Start()
    go watcher.Start()
}

这些 goroutine 的运行形态都不一样,有的是生产者,有的是消费者等等。我们现在需要实现一个超时等到退出的框架,以统一解决各种运行形态的 goroutine 的优雅退出问题

type GracefullyShutdowner interface {
    Shutdown(waitTimeout time.Duration) error
}
 
type ShutdownerFunc func(time. Duration) error
 
func (f ShutdownerFunc) Shutdown(waitTimeout time.Duration) error {
    return f(waitTimeout)
}

一组 goroutine 的退出总体有两种情况,一种是并发退出,另外一种是串行退出。

如果各个 goroutine 的退出先后次序对数据处理无影响,可以执行并发退出。如果各个 goroutine 之间的退出是按照一定次序逐个退出,如果次序错误则导致数据的状态混乱或错误。

我们来看一下并发退出:

func ConcurrentShutdown(waitTimeout time.Duration, shutdowners ...GracefullyShutdowner) error {
    c := make(chan struct{})
 
    go func() {
        var wg sync.WaitGroup
        for _, g := range shutdowners {
            wg.Add(1)
            go func(shutdowner GracefullyShutdowner) {
                defer wg.Done()
                shutdowner.Shutdown(waitTimeout)
            }(g)
        }
        wg.Wait()
        c <- struct{}{}
    }()
 
    timer := time.NewTimer(waitTimeout)
    defer time.Stop()
 
    select {
    case <-c:
        return nil
    case <-time.C:
        return error.New("wait timeout")
    }
}

有了并发退出的基础,串行退出的实现也就简单了。

func SequentialShutdown(waitTimeout  time.Duration, shutdowners ...GracefullyShutdowner) error {
    start := time.Now()
    var left time.Duration
    time := time.NewTimer(waitTimeout)
 
    for _, g := range shutdowners {
        elapsed := time.Since(start)
        left = waitTimeout - elapsed
 
        c := make(chat struct{})
        go func(shutdowner GracefullyShutdowner) {
            shutdowner.Shutdown(left)
            c <- struct{}{}
        }(g)
 
        time.Reset(left)
 
        select {
        case <-c:
        case <-time.C:
            return error.New("wait timeout")
        }
    }
 
    return nil
}