Go 语言
并发编程
sync 的用法

sync 的用法

Go 语言在提供 CSP 并发模型原语的同时,还通过标准库的 sync 包提供了针对传统的基于共享内存并发模型的基本同步原语, 包括互斥锁(sync.Mutex),读写锁(sync.RWMutex),条件变量(sync.Cond),等待组(sync.WaitGroup),单次执行(sync.Once)等。

下面一些场景下,我们会使用到 sync 包提供的同步原语:

  1. 需要高性能的临界区同步机制场景,例如使用 sync.Mutex 实现的同步机制要比 channel 实现的高出两倍多。
  2. 不想转移结构体对象所有权,但又要保证结构体内部状态数据的同步访问的场景。

注意事项

src/sync/mutex.go 文件中,我们能找到一些使用的注意事项,如:

// Values containing the types defined in this package should not be copied.
// 该包中定义的类型的值不应该被复制。
 
// A Mutex must not be copied after first use.
// 互斥锁在第一次使用后不应该被复制。
 
// A RWMutex must not be copied after first use.
// 读写锁在第一次使用后不应该被复制。
 
// A Cond must not be copied after first use.
// 条件变量在第一次使用后不应该被复制。

那些 sync 包中的类型的实例在首次使用后被复制得到的副本一旦再次被使用将导致不可预期的结果。 为此,在使用 sync 包中类型时,推荐通过闭包方式或传递类型实例(或包裹该类型的类型实例)的地址或指针的方式进行。

互斥锁还是读写锁

互斥锁是临界区同步原语的首选,它常被用来对结构体对象的内部状态,缓存等进行保护,是使用最为广泛的临界区同步机制。

我们对比一下互斥锁和读写锁的性能差异:

sync_test.go
var cs1 = 0 // 模拟临界区要保护的数据
var mu1 sync.Mutex
 
var cs2 = 0 // 模拟临界区要保护的数据
var mu2 sync.RWMutex
 
func BenchmarkMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu1.Lock()
            _ = cs1
            mu1.Unlock()
        }
    })
}
 
func BenchmarkReadSyncByRWMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu2.RLock()
            _ = cs2
            mu2.RUnlock()
        }
    })
}
 
func BenchmarkWriteSyncByRWMutex(b *testing.B) {
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu2.Lock()
            cs2++
            mu2.Unlock()
        }
    })
}

上面的代码中,我们使用了 go test -bench=. -cpu 2 命令来进行性能测试,测试结果如下:

go test -bench=. sync_test.go -cpu 2
go test -bench=. sync_test.go -cpu 8
go test -bench=. sync_test.go -cpu 16
go test -bench=. sync_test.go -cpu 24
go test -bench=. sync_test.go -cpu 32

结论是:

  • 在并发量较小的情况下,互斥锁的性能要比读写锁的性能要好;随着并发量增大,互斥锁的竞争激烈,导致加锁和解锁性能下降。
  • 读写锁的读锁性能并未随着并发量的增大而下降,而写锁的性能随着并发量的增大而下降。
  • 在并发量较大的情况下,读写锁的写锁性能比互斥锁、读写锁的读性能都差,随着并发量增大,写锁性能有继续下降的趋势。

我们可以看到,读写锁适合应用在具有一定并发量且读多写少的场合。

条件变量

sync.Cond 是传统的条件变量原语概念在 Go 语言中的实现。一个条件变量可以理解为一个容器。 这个容器存放着一个或多个等待的 goroutine,当条件变量的条件满足时,条件变量会通知容器中的 goroutine。

type signal struct{}
var ready bool
 
func worker(i int) {
    println(i, "ready")
    time.Sleep(time.Second * time.Duration(i))
    println(i, "done")
}
 
func spawnGroup(f func(i int), num int, groupSignal *sync.Cond) <-chan signal {
    c := make(chan signal)
    var wg sync.WaitGroup
 
    for i := 0; i < num; i++ {
        wg.Add(1)
        go func(i int) {
            groupSignal.L.Lock()
            for !ready {
                groupSignal.Wait()
            }
            groupSignal.L.Unlock()
            println(i, "start")
            f(i)
            wg.Done()
        }(i+1)
    }
 
    go func() {
        wg.Wait()
        c <- signal(struct{}{})
    }()
 
    return c
}
 
func main() {
    groupSignal := sync.NewCond(&sync.Mutex{})
    c := spawnGroup(worker, 3, groupSignal)
 
    time.Sleep(time.Second)
 
    groupSignal.L.Lock()
    ready = true
    groupSignal.Broadcast()
    groupSignal.L.Unlock()
 
    <-c
}

在 main goroutine 将 ready 设置为 true 后,调用 groupSignal.Broadcast() 方法通知容器中的 goroutine。

sync.Once

我们知道程序运行期间只被执行一次且 goroutine 的安全函数只有每个包的 init 函数。 sync 包提供了另一种更为灵活的机制,可以保证任意一个函数在程序运行期间只被执行一次,这就是 sync.Once

sync.Once 十分适合实现单例模型,并且实现起来十分简单,我们来看一个例子:

type Foo struct {}
 
var once sync.Once
var instance *Foo
 
func GetInstance(id int) *Foo {
    defer func() {
        if e := recover(); e != nil {
            println("recover", e)
        }
    }()
    println("GetInstance", id)
    once.Do(func() {
        instance = &Foo{}
        time.Sleep(2 * time.Second)
        println("init", id, instance)
        panic("panic in once.Do function")
    })
    return instance
}
 
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            inst := GetInstance(i)
            println("instance addr", i, inst)
        }(i+1)
    }
 
    time.Sleep(3 * time.Second)
    inst := GetInstance(0)
    println("instance addr", 0, inst)
 
    wg.Wait()
    println("done")
}

运行结果:

GetInstance 1
GetInstance 2
GetInstance 3
init 1 0x540140
recover (0x463f80,0x48b590)
instance addr 1 0x0
instance addr 3 0x540140
instance addr 2 0x540140
GetInstance 0
instance addr 0 0x540140
done

我们可以看到,GetInstance 函数只被执行了一次,且在第一次执行时发生了 panic,但是 panic 并没有影响到后续的执行。

sync.Pool

sync.Pool 是一个对象缓存池,它具有下面的特点:

  • 它是 goroutine 并发安全的,可以被多个 goroutine 同时使用。
  • 放入该缓存池中的数据对象的生命是暂时的,随时都可以被垃圾回收掉。
  • 缓存池中的数据对象是可以重复利用的,这样可以在一定程度上降低数据对象重新分配的频率,减轻 GC 压力。
  • sync.Pool 会为每个 P(调度器中的 P)单独建立一个 local 缓存池,进一步降低高并发下对锁的争抢。

sync.Pool 的典型用法就是建立像 bytes.Buffer 这样类型的临时的缓存对象池:

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

sync.Pool 的使用方法很简单,只需要调用 Put 方法放入对象,调用 Get 方法取出对象即可。

func main() {
    buf := bufPool.Get().(*bytes.Buffer)
    defer bufPool.Put(buf)
    buf.WriteString("hello, ")
    buf.WriteString("world!")
    println(buf.String())
}

由于 sync.PoolGet 方法从缓存池中挑选数据对象时,并未考虑数据对象是否满足调用者的需求,因此一旦返回的对象是刚刚被"大数据"撑大后的,并且调用者只是需要一个小数据对象时,这样就会导致内存的浪费。 一旦这类情况集中出现,将会给 Go 应用带来沉重的内存消耗负担。

目前的 Go 标准库采用两种方式来缓解这个问题:

  • 限制要放回缓冲池中的数据对象大小;
  • 建立多级缓存池。