channel 的用法
channel 是 Go 语言提供的一种重要的并发原语。它在 CSP 模型中扮演着重要的角色:既可以实现 goroutine 间的通信,又可以实现 goroutine 间的同步。
channel 类型在 Go 中为一等公民,可以像使用变量那样使用 channel。如:
c := make(chan int) // 创建一个无缓冲(unbuffered)的 int 类型的 channel
c := make(chan int, 5) // 创建一个带缓冲的 int 类型的 channel
c < - 1 // 向 channel c 发送一个整数 1
x := <- c // 从 channel c 接收一个整数,并赋值给 x
x, ok := <- c // 从 channel c 接收一个整数,并赋值给 x,同时 ok 为 true 表示接收成功
for i := range c { // 从 channel c 中接收值,直到 channel 被关闭
fmt.Println(i)
}
close(c) // 关闭 channel c
c := make(chan chan int) // 创建一个无缓存 channel,用于传递 int 类型的 channel
func stream(ctx context.Context, out chan<- Value) error // 将只发发送(send-only)channel 作为函数参数
func spawn(...) <-chan T // 将只收接收(receive-only)channel 作为函数返回值
当需要同时对多个 channel 进行操作时,我们会结合使用 Go 为 CSP 模型提供的另一种原语 select。 通过 select,我们可以监听多个 channel 上的数据流动,直到其中一个 channel 准备好接收数据,就从中接收数据。
select {
case x := <-ch1:
// 如果 ch1 可读,则执行该 case
case y, ok := <-ch2:
// 如果 ch2 可读,则执行该 case
case c3 <- z:
// 如果 c3 可写,则执行该 case
default:
// 如果没有 case 可执行,则执行 default
}
无缓存 channel
无缓存 channel 是指 channel 的大小为 0,即不存储任何数据。无缓存 channel 的发送和接收操作是阻塞的,直到发送方和接收方都准备好。
c := make(chan T) // 创建一个无缓存的 T 类型的 channel
由于无缓存 channel 的运行时层实现不带有缓冲区,因此对无缓存 channel 的发送和接收操作是同步的。 也就是说,只有在对其进行接收操作和对其进行发送操作的 goroutine 都存在的情况下,通信才能进行。 否则单方面的操作会让对应的 goroutine 阻塞。
用作信号传递
一对一通知信号
无缓冲 channel 常被用于两个 goroutine 之间一对一地传递通知信号,如:
type signal struct {}
func worker() {
println("worker is working")
time.Sleep(time.Second)
}
func spawn(f func()) <-chan signal {
c := make(chan signal)
go func() {
println("woker start to work...")
f()
c <- signal(struct{}{})
}()
return c
}
func main() {
c := spawn(worker)
<-c
println("worker is done")
}
上述例子,spawn 函数返回的 channel 用于承载新 goroutine 的结束信号,main 函数通过该 channel 等待 worker 函数的结束。
一对多通知信号
我们来看一个例子:
type signal struct{}
func worker(i int) {
println("worker", i, "is working")
time.Sleep(time.Second)
println("worker", i, "is done")
}
func spawnGroup(f func(i int), num int, groupSingal <-chan signal) <-chan signal {
c := make(chan signal)
var wg sync.WaitGroup
for i := 0; i < num; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
<-groupSingal
println("worker", i, "start to work...")
f(i)
}(i + 1)
}
go func() {
wg.Wait()
c <- signal(struct{}{})
}()
return c
}
func main() {
groupSingal := make(chan signal)
c := spawnGroup(worker, 3, groupSingal)
time.Sleep(3 * time.Second)
println("all workers start to work")
close(groupSingal)
<-c
println("all workers are done")
}
上述例子,main goroutine 通过 close(groupSingal)
向 spawnGroup 函数中的多个 goroutine 发送信号,以触发它们的工作。就像起跑线上的运动员听到裁判员发出的发令枪一样。
用于替换锁机制
无缓存 channel 具有同步特性,这让它在某些场合可以代替锁,从而使得程序更加清晰。
我们来看一个例子:
type counter struct {
c chan int
i int
}
var cter counter
func InitCounter() {
cter = counter{c: make(chan int)}
go func() {
for {
cter.i++
cter.c <- cter.i
}
}()
println("counter is initialized")
}
func Increase() int {
return <-cter.c
}
func init() {
InitCounter()
}
func main() {
for i := 0; i < 10; i++ {
go func(i int) {
v := Increase()
println("goroutine", i, "increase to", v)
}(i)
}
time.Sleep(time.Second)
}
在这个实现中,我们使用无缓存 channel 来实现了一个计数器。计数器的值是通过 channel 来传递的,这样就避免了使用锁。
这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而应该通过通信来共享内存”的原则。
带缓存 channel
带缓存的 channel 可以通过带有 capacity 参数的 make 函数创建,如:
c := make(chan T, 5) // 创建一个带缓存的 T 类型的 channel,缓存大小为 5,5 就是 capacity
一个带缓冲 channel,在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的 goroutine 不会被阻塞; 在缓冲区已满的情况下,对其进行发送操作的 goroutine 会被阻塞; 在缓冲区无数据的情况下,对其进行接收操作的 goroutine 也会被阻塞;
两种 channel 的性能情况:
- 带缓存的 channel 的收发性能都要优于无缓存的 channel;
- 对于带缓存 channel 而言,选择适当容量会在一定程度上提升收发性能。
用作计数信号量
Go 并发设计的一个惯用法是将带缓存的 channel 作为信号量来使用。我们来看一个例子:
var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)
func main() {
go func() {
for i := 0; i < 8; i++ {
jobs <- i+1
}
close(jobs)
}()
var wg sync.WaitGroup
for j := range jobs {
wg.Add(1)
go func(j int) {
defer wg.Done()
active <- struct{}{}
println("handle job:", j)
time.Sleep(time.Second)
<-active
}(j)
}
wg.Wait()
}
上面的例子中创建了一组 goroutine 来处理 jobs,同一时间最多允许 3 个 goroutine 处理 jobs。 active channel 的容量为 3,这意味着只有 3 个 goroutine 可以同时向 active channel 发送数据,超过 3 个的 goroutine 会被阻塞。
len(channel)
len
是 Go 语言原生内置的函数,它可以接受数组、切片、map、channel 等类型的参数,并返回其长度或者元素个数。
- 如果是无缓冲的 channel,那么
len(c)
总是返回 0; - 如果是带缓冲的 channel,那么
len(c)
返回的是当前 channel 中未被读取的元素个数。
单接收多发送的场景:
for {
if len(c) > 0 {
i := <-c
// 处理 i
}
}
单发送多接收的场景:
for {
if len(c) < cap(c) {
c <- i
}
}
nil channel
nil channel 是指未初始化的 channel,它的值是 nil。对于 nil channel 的发送和接收操作都会永久阻塞。
func main() {
var c chan int
c <- 1 // fatal error: all goroutines are asleep - deadlock!
<-c // fatal error: all goroutines are asleep - deadlock!
}
我们来看一个例子:
func main() {
c1, c2 := make(chan int), make(chan int)
go func() {
time.Sleep(time.Second)
c1 <- 5
close(c1)
}()
go func () {
time.Sleep(2 * time.Second)
c2 <- 10
close(c2)
}()
for {
select {
case x, ok := <-c1:
if !ok {
c1 = nil
} else {
println("c1:", x)
}
case x, ok := <-c2:
if !ok {
c2 = nil
} else {
println("c2:", x)
}
}
if c1 == nil && c2 == nil {
break
}
}
println("all done")
}
上述例子中,我们通过将 channel 置为 nil 来标记其关闭,从而避免了对已关闭的 channel 进行接收操作。
与 select 结合使用
利用 default 分支避免阻塞
// src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
// 无阻塞地向 c 发送当前时间
select {
case c.(chan<- Time) <- Now():
default:
}
}
实现超时控制
func main() {
c := make(chan int)
select {
case <-c:
// ...
case <-time.After(time.Second):
println("timeout")
}
}
实现心跳机制
func worker() {
heartbeat := time.NewTicker(30 * time.Second)
defer heartbeat.Stop()
for {
select {
case <-heartbeat.C:
println("heartbeat")
}
}
}