Golang通道

Posted by 淦 Blog on January 24, 2023

基本使用方式

通道是Go语言中的一等公民,将箭头(←)作为操作符进行通道的读取和写入。

1
2
c <- number
<- c

通道声明与初始化

chan作为Go语言中的类型,其最基本的声明方式如下:

1
var name chan T
  • name代表chan的名字,为用户自定义的;
  • chan T代表通道的类型,T代表通道中的元素类型。在声明时,channel必须与一个实际的类型T绑定在一起,代表通道中能够读取和传递的元素类型。
  • 通道的表示形式有如下有三种:chan T、chan←T、←chan T。不带“←”的通道可读可写,而带“←”的类型限制了通道的读写。例如,chan←float代表该通道只能写入浮点数,←chan string代表该通道只能读取字符串。

一个还未初始化的通道会被预置为nil,一个未初始化的通道在编译时和运行时并不会报错,不过,显然无法向通道中写入或读取任何数据。要对通道进行操作,需要使用make操作符,make会初始化通道,在内存中分配通道的空间。

1
var c = make(chan int)

通道写入数据

1
c <- 5

对于无缓冲通道,能够向通道写入数据的前提是必须有另一个协程在读取通道。否则,当前的协程会陷入休眠状态,直到能够向通道中成功写入数据。

无缓冲通道的读与写应该位于不同的协程中,否则,程序将陷入死锁的状态。 1.3通道读取数据 通道中读取数据可以直接使用←c,←c可以直接嵌套在程序中使用。

如果不能直接读取通道的数据,那么当前的读取协程将陷入堵塞,直到有协程写入通道为止。读取通道也可以有返回值,如下代码接收通道中的数据并赋值给data。

1
data := <- c

读取通道还有两种返回值的形式,借助编译时将该形式转换为不同的处理函数。第1个返回值仍然为通道读取到的数据,第2个返回值为布尔类型,返回值为false代表当前通道已经关闭。

1
data, ok := <- c

通道关闭

用到内置的close函数

1
close(c)

在正常读取的情况下,通道返回的ok为true。通道在关闭时仍然会返回,但是data为其类型的零值,ok也变为了false。和通道读取不同的是,不能向已经关闭的通道中写入数据。

1
2
3
4
5
var c = make(chan int, 1)
close(c)
data, ok := <-c
fmt.Println(data, ok) // 0, false
c <- 1            // panic: send on closed channel

通道关闭会通知所有正在读取通道的协程,相当于向所有读取协程中都写入了数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
	var c = make(chan int)
	go func() {
		data, ok := <-c
		fmt.Println(data, ok) // 0, false
	}()
	go func() {
		data, ok := <-c
		fmt.Println(data, ok) // 0, false
	}()
	close(c)
	time.Sleep(1 * time.Second)
}

如果读取通道是一个循环操作,关闭通道并不能终止循环,依然会收到一个永无休止的 零值序列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func main() {
	var c = make(chan int)
	go func() {
		for {
			data, ok := <-c
			fmt.Println(data, ok)
			// 0, false
			// 0, false
			// 0, false
			// ...
		}
	}()
	close(c)
	time.Sleep(10 * time.Second)
}

在实践中会通过第二个返回的布尔值来判断通道是否已经关闭,如果已经关闭,那么退出循环是一种比较常见的操作。

1
2
3
4
5
6
7
8
go func() {
	for {
		data, ok := <-c
		if !ok {
			break
		}
	}
}()

注:

  • 重复关闭一个channel将导致panic异常,
  • 关闭一个nil值的channel也将导致panic异常。

在实践中,并不需要关心是否所有的通道都已关闭,当通道没有被引用时将被Go语言的垃圾自动回收器回收。

通道作为参数和返回值

通道是Go语言中的引用类型而不是值类型,因此传递到其他协程中的通道,实际引用了同一个通道。

单方向通道

一般来说,一个协程在大多数情况下只会读取或者写入通道,为了表达这种语义并防止通道被误用,Go语言的类型系统提供了单方向的通道类型。 普通的通道具有读和写的功能,普通的通道类型能够隐式地转换为单通道的类型。反之,单通道的类型不能转换为普通的通道类型。

select多路复用

在实践中使用通道时,更多的时候会与select结合,因为时常会出现多个通道与多个协程进行通信的情况,我们当然不希望由于一个通道的读写陷入堵塞,影响其他通道的正常读写。select正是为了解决这一问题诞生的,select赋予了Go语言更加强大的功能。

1
2
3
4
5
6
select {
case <-ch1:
case x := <-ch2:
case ch3 <- 3:
default:
}

每个case语句都必须对应通道的读写操作。select语句会陷入堵塞,直到一个或多个通道能够正常读写才恢复。

随机选择机制

当多个通道同时准备好执行读写操作时,select的选择具有一定的随机性。

1
2
3
4
5
6
7
8
9
10
func main() {
	c := make(chan int, 1)
	c <- 1
	select {
	case <-c:
		fmt.Println("random 01")
	case <-c:
		fmt.Println("random 02")
	}
}

当多次执行程序时会发现,程序有时会输出random 01,有时会输出random 02。

堵塞与控制

如果select中没有任何的通道准备好,那么当前select所在的协程会永远陷入等待,直到有一个case中的通道准备好为止。

在实践中,为了避免这种情况发生,有时会加上default分支。default分支的作用是当所有的通道都陷入堵塞时,正常执行default分支。

与定时器或者超时器配套使用,←time.After(800*time.Millisecond)调用了time包的After函数,其返回一个通道800ms后会向当前通道发送消息,可以通过这种方式完成超时控制。

1
2
3
4
5
6
7
8
9
func main() {
	c := make(chan int)
	select {
	case <-c:
		fmt.Println("random 01")
	case <-time.After(800 * time.Millisecond):
		fmt.Println("timeout")
	}
}

循环

将for与select进行组合,循环往复执行select中的内容,另外可以向select中加入一些定时任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func main() {
	c := make(chan int)
	tick := time.Tick(time.Second)
	for {
		select {
		case <-c:
			fmt.Println("random 01")
		case <-tick:
			fmt.Println("tick")
		case <-time.After(800 * time.Millisecond):
			fmt.Println("timeout")
		}
	}
}

定时器time.Tick与time.After是有本质不同的。time.After并不会定时发送数据到通道中,而只是在时间到了后发送一次数据。当其放入for+select后,新一轮的select语句会重置 time.After,这意味着第2次select语句依然需要等待800ms才执行超时。

如果在800ms之前,其他的通道就已经执行好了,那么time.After的case将永远得不到执行。而定时器tick不同,由于tick在for循环的外部,因此其不重置,只会累积时间,实现定时执行任务的功能。

select与nil

一个为nil的通道,不管是读取还是写入都将陷入堵塞状态。当select语句的case对nil通道进行操作时,case分支将永远得不到执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {
	a := make(chan int)
	b := make(chan int)
	go func() {
		for i := 0; i < 2; i++ {
			select {
			case a <- 1:
				a = nil
			case b <- 2:
				b = nil
			}
		}
	}()
	fmt.Println(<-a) // 1
	fmt.Println(<-b) // 2
}

通道底层原理

通道结构与环形队列

通道在运行时是一个特殊的hchan结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
type hchan struct {
	qcount   uint           // 通道队列中的数据当前个数
	dataqsiz uint           // 通道队列中的数据容量
	buf      unsafe.Pointer // 存放实际数据的指针
	elemsize uint16         // 通道类型大小
	closed   uint32         // 通道是否关闭
	elemtype *_type         // 通道类型
	sendx    uint           // 记录发送者在buf中的序号
	recvx    uint           // 记录接受者在buf中的序号
	recvq    waitq          // 读取的阻塞协程队列
	sendq    waitq          // 写入的阻塞协程队列
	lock     mutex          // 锁,并发保护
}

对于有缓存的通道,存储在buf中的数据虽然是线性的数组,但是用数组和序号recvx、recvq模拟了一个环形队列。recvx可以找到从buf哪个位置获取通道中的元素,而sendx能够找到写入时放入buf的位置,这样做主要是为了重用已经使用过的空间。recvx到sendx的距离代表通道队列中的元素数量。

当到达循环队列的末尾时,sendx会置为0,以确保其下一次写入0号位置,开始循环利用空间。这同样意味着,当前的通道中只能放入指定大小的数据。当通道中的数据满了后,再次写入数据将陷入等待,直到第0号位置被取出后,才能继续写入。

通道初始化

通道的初始化在运行时调用了makechan函数,第1个参数代表通道的类型,第2个参数代表通道中数据的容量。

  • 当分配的通道中数据的容量为0时,只用在内存中分配hchan结构体的大小即可。
  • 当通道的元素中不包含指针时,连续分配hchan结构体大小+size元素大小。
  • 当通道的元素中包含指针时,需要单独分配内存空间,因为当元素中包含指针时,需要单独分配空间才能正常进行垃圾回收。

通道写入原理

有正在等待的读取协程

通道hchan结构中的recvq字段存储了正在等待的协程链表,每个协程对应一个sudog结构,它是对协程的封装,包含了准备获取的协程中的元素指针等。 当有读取的协程正在等待时,直接从等待的读取协程链表中获取第1个协程,并将元素直接复制到对应的协程中,再唤醒被堵塞的协程。

缓冲区有空余

如果队列中没有正在等待的协程,但是该通道是带缓冲区的,并且当前缓冲区没有满,则向当前缓冲区中写入当前元素。

缓冲区无空余

如果当前通道无缓冲区或者当前缓冲区已经满了,则代表当前协程的sudog结构需要放入sendq链表末尾中,并且当前协程陷入休眠状态,等待被唤醒重新执行。 img.png

通道读取原理

有正在等待的写入协程

当有正在等待的写入协程时,直接从等待的写入协程链表中获取第1个协程,并将写入的元素直接复制到当前协程中,再唤醒被堵塞的写入协程。

缓冲区有元素

如果队列中没有正在等待的写入协程,但是该通道是带缓冲区的,并且当前缓冲区中有数据,则读取该缓冲区中的数据,并写入当前的读取协程中。当前协程将不需要陷入休眠

缓冲区无元素

如果当前通道无缓冲区或者当前缓冲区已经空了,则代表当前协程的sudog结构需要放入recvq链表末尾,并且当前协程陷入休眠状态,等待被唤醒重新执行。

select底层原理

当select中只有一个控制通道的case语句时,和普通的通道操作是等价的。

1
2
3
4
5
select {
case v := <ch:
}

v := <-ch

select语句拥有多个控制通道的case语句时,每个case在运行时都是一个scase结构体。

1
2
3
4
type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // data element
}

通过pollorder进行乱序scase序列,其通过引入随机数的方式给序列带来了随机性。

一轮循环

当对所有scase中的通道加锁完毕后,开始一轮对于所有scase的循环。循环的目的是找到当前准备好的通道。如果有,则根据具体的情况执行,主要是将元素写入或读取到当前的协程中,解锁所有的通道,并立即返回。

二轮循环

当select完成一轮循环不能直接退出时,意味着当前的协程需要进入休眠状态并等待select中至少有一个通道被唤醒。不管是读取通道还是写入通道都需要创建一个新的sudog并将其放入指定通道的等待队列,之后当前协程将进入休眠状态。

当select case中的任意一个通道不再阻塞时,当前协程将被唤醒。要注意的是,最后需要将sudog结构体在其他通道的等待队列中出栈,因为当前协程已经能够正常运行,不再需要被其他通道唤醒。