Go 语言
并发编程
channel 的用法

channel 的用法

channel 是 Go 语言提供的一种重要的并发原语。它在 CSP 模型中扮演着重要的角色:既可以实现 goroutine 间的通信,又可以实现 goroutine 间的同步。

channel 类型在 Go 中为一等公民,可以像使用变量那样使用 channel。如:

c := make(chan int)     // 创建一个无缓冲(unbuffered)的 int 类型的 channel
c := make(chan int, 5)  // 创建一个带缓冲的 int 类型的 channel
c < - 1                 // 向 channel c 发送一个整数 1
x := <- c               // 从 channel c 接收一个整数,并赋值给 x
x, ok := <- c           // 从 channel c 接收一个整数,并赋值给 x,同时 ok 为 true 表示接收成功
for i := range c {      // 从 channel c 中接收值,直到 channel 被关闭
    fmt.Println(i)
}
close(c)                // 关闭 channel c
 
c := make(chan chan int) // 创建一个无缓存 channel,用于传递 int 类型的 channel
func stream(ctx context.Context, out chan<- Value) error // 将只发发送(send-only)channel 作为函数参数
func spawn(...) <-chan T // 将只收接收(receive-only)channel 作为函数返回值

当需要同时对多个 channel 进行操作时,我们会结合使用 Go 为 CSP 模型提供的另一种原语 select。 通过 select,我们可以监听多个 channel 上的数据流动,直到其中一个 channel 准备好接收数据,就从中接收数据。

select {
case x := <-ch1:
// 如果 ch1 可读,则执行该 case
case y, ok := <-ch2:
// 如果 ch2 可读,则执行该 case
case c3 <- z:
// 如果 c3 可写,则执行该 case
default:
// 如果没有 case 可执行,则执行 default
}

无缓存 channel

无缓存 channel 是指 channel 的大小为 0,即不存储任何数据。无缓存 channel 的发送和接收操作是阻塞的,直到发送方和接收方都准备好。

c := make(chan T) // 创建一个无缓存的 T 类型的 channel

由于无缓存 channel 的运行时层实现不带有缓冲区,因此对无缓存 channel 的发送和接收操作是同步的。 也就是说,只有在对其进行接收操作和对其进行发送操作的 goroutine 都存在的情况下,通信才能进行。 否则单方面的操作会让对应的 goroutine 阻塞。

用作信号传递

一对一通知信号

无缓冲 channel 常被用于两个 goroutine 之间一对一地传递通知信号,如:

type signal struct {}
 
func worker() {
    println("worker is working")
    time.Sleep(time.Second)
}
 
func spawn(f func()) <-chan signal {
    c := make(chan signal)
    go func() {
        println("woker start to work...")
        f()
        c <- signal(struct{}{})
    }()
    return c
}
 
func main() {
    c := spawn(worker)
    <-c
    println("worker is done")
}

上述例子,spawn 函数返回的 channel 用于承载新 goroutine 的结束信号,main 函数通过该 channel 等待 worker 函数的结束。

一对多通知信号

我们来看一个例子:

type signal struct{}
 
func worker(i int) {
	println("worker", i, "is working")
	time.Sleep(time.Second)
	println("worker", i, "is done")
}
 
func spawnGroup(f func(i int), num int, groupSingal <-chan signal) <-chan signal {
	c := make(chan signal)
	var wg sync.WaitGroup
 
	for i := 0; i < num; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
            <-groupSingal
			println("worker", i, "start to work...")
			f(i)
		}(i + 1)
	}
	go func() {
		wg.Wait()
		c <- signal(struct{}{})
	}()
 
	return c
}
 
func main() {
    groupSingal := make(chan signal)
	c := spawnGroup(worker, 3, groupSingal)
	time.Sleep(3 * time.Second)
	println("all workers start to work")
	close(groupSingal)
	<-c
	println("all workers are done")
}

上述例子,main goroutine 通过 close(groupSingal) 向 spawnGroup 函数中的多个 goroutine 发送信号,以触发它们的工作。就像起跑线上的运动员听到裁判员发出的发令枪一样。

用于替换锁机制

无缓存 channel 具有同步特性,这让它在某些场合可以代替锁,从而使得程序更加清晰。

我们来看一个例子:

type counter struct {
    c chan int
    i int
}
 
var cter counter
 
func InitCounter() {
    cter = counter{c: make(chan int)}
 
    go func() {
        for {
            cter.i++
            cter.c <- cter.i
        }
    }()
    println("counter is initialized")
}
 
func Increase() int {
    return <-cter.c
}
 
func init() {
	InitCounter()
}
 
func main() {
    for i := 0; i < 10; i++ {
        go func(i int) {
            v := Increase()
            println("goroutine", i, "increase to", v)
        }(i)
    }
    time.Sleep(time.Second)
}

在这个实现中,我们使用无缓存 channel 来实现了一个计数器。计数器的值是通过 channel 来传递的,这样就避免了使用锁。

这种并发设计逻辑更符合 Go 语言所倡导的“不要通过共享内存来通信,而应该通过通信来共享内存”的原则。

带缓存 channel

带缓存的 channel 可以通过带有 capacity 参数的 make 函数创建,如:

c := make(chan T, 5) // 创建一个带缓存的 T 类型的 channel,缓存大小为 5,5 就是 capacity

一个带缓冲 channel,在缓冲区无数据或有数据但未满的情况下,对其进行发送操作的 goroutine 不会被阻塞; 在缓冲区已满的情况下,对其进行发送操作的 goroutine 会被阻塞; 在缓冲区无数据的情况下,对其进行接收操作的 goroutine 也会被阻塞;

两种 channel 的性能情况:

  • 带缓存的 channel 的收发性能都要优于无缓存的 channel;
  • 对于带缓存 channel 而言,选择适当容量会在一定程度上提升收发性能。

用作计数信号量

Go 并发设计的一个惯用法是将带缓存的 channel 作为信号量来使用。我们来看一个例子:

var active = make(chan struct{}, 3)
var jobs = make(chan int, 10)
 
func main() {
    go func() {
        for i := 0; i < 8; i++ {
            jobs <- i+1
        }
        close(jobs)
    }()
 
    var wg sync.WaitGroup
 
    for j := range jobs {
        wg.Add(1)
        go func(j int) {
            defer wg.Done()
            active <- struct{}{}
            println("handle job:", j)
            time.Sleep(time.Second)
            <-active
        }(j)
    }
 
    wg.Wait()
}

上面的例子中创建了一组 goroutine 来处理 jobs,同一时间最多允许 3 个 goroutine 处理 jobs。 active channel 的容量为 3,这意味着只有 3 个 goroutine 可以同时向 active channel 发送数据,超过 3 个的 goroutine 会被阻塞。

len(channel)

len 是 Go 语言原生内置的函数,它可以接受数组、切片、map、channel 等类型的参数,并返回其长度或者元素个数。

  • 如果是无缓冲的 channel,那么 len(c) 总是返回 0;
  • 如果是带缓冲的 channel,那么 len(c) 返回的是当前 channel 中未被读取的元素个数。

单接收多发送的场景

for {
    if len(c) > 0 {
        i := <-c
        // 处理 i
    }
}

单发送多接收的场景

for {
    if len(c) < cap(c) {
        c <- i
    }
}

nil channel

nil channel 是指未初始化的 channel,它的值是 nil。对于 nil channel 的发送和接收操作都会永久阻塞。

func main() {
    var c chan int
    c <- 1 // fatal error: all goroutines are asleep - deadlock!
    <-c // fatal error: all goroutines are asleep - deadlock!
}

我们来看一个例子:

func main() {
    c1, c2 := make(chan int), make(chan int)
 
    go func() {
        time.Sleep(time.Second)
        c1 <- 5
        close(c1)
    }()
 
    go func () {
        time.Sleep(2 * time.Second)
        c2 <- 10
        close(c2)
    }()
 
    for {
        select {
        case x, ok := <-c1:
            if !ok {
                c1 = nil
            } else {
                println("c1:", x)
            }
        case x, ok := <-c2:
            if !ok {
                c2 = nil
            } else {
                println("c2:", x)
            }
        }
        if c1 == nil && c2 == nil {
            break
        }
    }
    println("all done")
}

上述例子中,我们通过将 channel 置为 nil 来标记其关闭,从而避免了对已关闭的 channel 进行接收操作。

与 select 结合使用

利用 default 分支避免阻塞

// src/time/sleep.go
func sendTime(c interface{}, seq uintptr) {
    // 无阻塞地向 c 发送当前时间
    select {
        case c.(chan<- Time) <- Now():
        default:
    }
}

实现超时控制

func main() {
    c := make(chan int)
    select {
    case <-c:
        // ...
    case <-time.After(time.Second):
        println("timeout")
    }
}

实现心跳机制

func worker() {
    heartbeat := time.NewTicker(30 * time.Second)
    defer heartbeat.Stop()
    for {
        select {
        case <-heartbeat.C:
            println("heartbeat")
        }
    }
}