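1. Introduction
Do not communicate by sharing memory; instead, share memory by communicating
Go's concurrency philosophy is "do not communicate by sharing memory; instead, share memory by communicating." It encourages passing data between goroutines, and synchronizing them, through channels rather than through shared variables (memory).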
```go
package main

import (
	"fmt"
	"testing"
)

func write(chanInt chan int) {
	for i := 0; i < 10; i++ {
		chanInt <- i
	}
	close(chanInt)
}

func read(chanInt chan int, chanExit chan bool) {
	for {
		v, ok := <-chanInt
		if !ok {
			break
		}
		fmt.Println(v)
	}
	chanExit <- true
	close(chanExit)
}

func TestCSP(t *testing.T) {
	chanInt := make(chan int, 10)
	chanExit := make(chan bool)
	go write(chanInt)
	go read(chanInt, chanExit)
	for {
		select {
		case _, ok := <-chanExit:
			if !ok {
				fmt.Println("done")
				return
			}
		}
	}
}
```
 
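In the example above, write produces the values and read consumes them. chanInt carries the data between the two goroutines, and chanExit signals that all data has been processed so the test can exit. The program uses no locks at all, which is quite elegant.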
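The pipeline below is a minimal sketch in the same spirit. The names gen, square and sum are illustrative and are not part of the original example. Each stage owns its own data and passes values on only through channels. The stage that sends is also the one that closes, so no lock is needed anywhere.

```go
package main

import "fmt"

// gen sends 0..n-1 on a new channel and closes it when done.
// It is the only writer, so it is also the one that closes.
func gen(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}

// square reads every value from in, squares it, and forwards it.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in { // ends when gen closes its channel
			out <- v * v
		}
	}()
	return out
}

// sum drains in; the running total lives only in this goroutine,
// so no mutex is needed.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(square(gen(4)))) // 0 + 1 + 4 + 9 = 14
}
```

Each for range loop terminates because the upstream stage closes its output channel when it has nothing more to send.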
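Next, let's look at the characteristics of channels, and then dig into the runtime source code to reinforce them.
2. Features
2.1 Basic usage
A channel is a reference type, so it must be initialized with make:
```go
chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)
```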
 
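Both of these are bidirectional (readable and writable) channels. The difference is whether they have a capacity:
- A buffered channel can hold up to cap values.
- An unbuffered channel is generally used for synchronization.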
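The following rough sketch shows one way to observe the difference. With no receiver running, it counts how many values each kind of channel accepts before a send would block. trySend and fillWithoutReceiver are hypothetical helpers, and the non-blocking send relies on select with a default branch.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default: // the send would block
		return false
	}
}

// fillWithoutReceiver counts how many values can be sent
// before a send would block, with no receiver running.
func fillWithoutReceiver(ch chan int) int {
	n := 0
	for trySend(ch, n) {
		n++
	}
	return n
}

func main() {
	fmt.Println(fillWithoutReceiver(make(chan int, 3))) // 3
	fmt.Println(fillWithoutReceiver(make(chan int)))    // 0
}
```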
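```go
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)
```

These create a send-only and a receive-only channel. Creating them directly like this is of little use. Directional channel types are mostly used for function parameters. Below, the write and read functions from the introduction are rewritten with them:
```go
// write may only send on chanInt (and close it).
func write(chanInt chan<- int) {
	for i := 0; i < 10; i++ {
		chanInt <- i
	}
	close(chanInt)
}

// read may only receive from chanInt; chanExit is only sent on and closed.
func read(chanInt <-chan int, chanExit chan<- bool) {
	for {
		v, ok := <-chanInt
		if !ok {
			break
		}
		fmt.Println(v)
	}
	chanExit <- true
	close(chanExit)
}
```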
 
Checking a channel's length and capacity:
```go
func TestChanLenCAP(t *testing.T) {
	chanInt := make(chan int, 2)
	chanInt <- 1
	fmt.Println(len(chanInt)) // 1
	fmt.Println(cap(chanInt)) // 2
}
```

Closing a channel:
```go
close(ch)
```

Checking whether a channel has been closed:
```go
func TestChanIsClosed(t *testing.T) {
	chanInt := make(chan int, 10)
	close(chanInt)
	if _, ok := <-chanInt; !ok {
		fmt.Println("closed")
	}
}
```
 
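Receiving from a closed channel never blocks. Any values still in the buffer are delivered first. After that, every receive immediately returns the element type's zero value with ok == false. A zero value can also be legitimate data, so it is the ok flag, not the value, that tells you the channel is closed. Note that this check consumes a value if the channel is still open and holds data.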
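The following minimal sketch shows that ordering. recvN and result are illustrative names. The two values buffered before close are still received with ok == true. Only after that do receives return the zero value with ok == false.

```go
package main

import "fmt"

// result records one receive: the value and the ok flag.
type result struct {
	v  int
	ok bool
}

// recvN performs n receives on ch and records each value with its ok flag.
func recvN(ch chan int, n int) []result {
	out := make([]result, 0, n)
	for i := 0; i < n; i++ {
		v, ok := <-ch
		out = append(out, result{v, ok})
	}
	return out
}

func main() {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)
	// Buffered values still arrive (ok == true) before the zero values.
	fmt.Println(recvN(ch, 4)) // [{7 true} {8 true} {0 false} {0 false}]
}
```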
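2.2 Pitfalls
Next, let's look at several error cases to watch out for.
Note: the Go version used here is 1.19.12. Scheduler and runtime behavior can differ between versions, especially around deadlock detection. As a result, a program may report a deadlock sooner in some versions and simply block without an error in others.
2.2.1 Sending to a nil channel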
 
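```go
func TestWriteNil(t *testing.T) {
	var chanInt chan int
	chanInt <- 1
}
```

chanInt has not been initialized, so its value is nil. The code blocks forever on the line chanInt <- 1 and ends in a deadlock.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: initialize the channel with make before using it.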
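2.2.2 Receiving from a nil channel

```go
func TestReadNil(t *testing.T) {
	var chanInt chan int
	<-chanInt
}
```

chanInt has not been initialized, so its value is nil. The code blocks forever on the line <-chanInt and ends in a deadlock.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: initialize the channel with make before using it.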
2.2.3 Closing a nil channel

```go
func TestCloseNil(t *testing.T) {
	var chanInt chan int
	close(chanInt)
}
```

Closing a nil channel causes a run-time panic: close of nil channel.
```
panic: close of nil channel [recovered]
	panic: close of nil channel
```

Fix: initialize the channel with make before using it.
The first three cases all show that a channel must be initialized with make before it is used.
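A related detail is shown in the small sketch below. A nil channel only hangs a blocking operation. In a select with a default branch, an operation on a nil channel is simply never ready. This corresponds to the if !block early returns in chansend and chanrecv, covered in the runtime section. trySendNil and tryRecvNil are hypothetical helper names.

```go
package main

import "fmt"

// trySendNil attempts a non-blocking send on a nil channel.
// The nil case is never ready, so select falls through to default.
func trySendNil() bool {
	var ch chan int // nil: never initialized with make
	select {
	case ch <- 1:
		return true
	default:
		return false
	}
}

// tryRecvNil does the same for a receive on a nil channel.
func tryRecvNil() bool {
	var ch chan int
	select {
	case <-ch:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(trySendNil(), tryRecvNil()) // false false
}
```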
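2.2.4 Sending to a closed channel

```go
func TestWriteClosed(t *testing.T) {
	chanNoBuffer := make(chan int)
	close(chanNoBuffer)
	chanNoBuffer <- 1
}
```

Sending to a closed channel causes a panic.
```
panic: send on closed channel [recovered]
	panic: send on closed channel
```

Once a channel is closed, no more data can be sent on it, although data can still be received from it.
Fix: make sure no send happens after the channel has been closed. Checking whether the channel is closed right before sending is not reliable, because another goroutine can close it in between. The usual rule is to let the (sole) sender close the channel once it has finished sending.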
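2.2.5 Closing an already-closed channel

```go
func TestClosedOnceMore(t *testing.T) {
	chanNoBuffer := make(chan int)
	close(chanNoBuffer)
	close(chanNoBuffer)
}
```

Closing a channel that has already been closed causes a run-time panic: close of closed channel. This usually happens when several goroutines try to close the same channel, or when faulty logic closes the same channel more than once.
```
panic: close of closed channel [recovered]
	panic: close of closed channel
```

Fix: ensure the channel is closed exactly once, for example by giving it a single owner that closes it. As with sending, checking whether the channel is already closed and then closing it is racy when several goroutines are involved.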
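When several goroutines may legitimately try to close the same channel, one common way to make the close happen exactly once is sync.Once. The sketch below uses a hypothetical onceCloser wrapper. It only prevents the double close. It does not stop a send that happens after the close.

```go
package main

import (
	"fmt"
	"sync"
)

// onceCloser is a hypothetical helper: Close may be called many times,
// from many goroutines, and close(ch) runs exactly once.
type onceCloser struct {
	ch   chan int
	once sync.Once
}

func (c *onceCloser) Close() {
	c.once.Do(func() { close(c.ch) })
}

func main() {
	c := &onceCloser{ch: make(chan int)}
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // no "close of closed channel" panic
		}()
	}
	wg.Wait()
	_, ok := <-c.ch
	fmt.Println(ok) // false
}
```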
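2.2.6 Sending on an unbuffered channel with no receiver

```go
func TestSendNoBuffer(t *testing.T) {
	ch := make(chan int)
	ch <- 4
}
```

An unbuffered channel is a synchronous communication mechanism. When there is a sender but no receiver, the send blocks and the program deadlocks.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: because an unbuffered channel is synchronous, every send must be matched by a receive running in another goroutine at the same time.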
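2.2.7 Receiving from an unbuffered channel with no sender

```go
func TestReadNoBuffer(t *testing.T) {
	ch := make(chan int)
	<-ch
}
```

When a goroutine receives from an unbuffered channel and no other goroutine sends to it, the receive blocks. The program deadlocks and finally fails with a run-time error.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: because an unbuffered channel is synchronous, every receive must be matched by a send running in another goroutine at the same time.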
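2.2.8 Send and receive on an unbuffered channel not running concurrently

```go
func ReadNoBufferChan(chanBool chan bool) {
	<-chanBool
}

func TestSendNoBufferChan(t *testing.T) {
	ch := make(chan bool)
	ch <- true // blocks here: the receiving goroutine has not been started yet
	go ReadNoBufferChan(ch)
	time.Sleep(1 * time.Second)
}
```

The two previous cases stressed that an unbuffered channel is synchronous, so the send and the receive must happen at the same time. When execution reaches ch <- true, no goroutine is receiving, because go ReadNoBufferChan(ch) has not run yet. The send blocks and the program deadlocks.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: because an unbuffered channel is synchronous, the send and the receive must run concurrently:
```go
func TestSendNoBufferChan(t *testing.T) {
	ch := make(chan bool)
	go ReadNoBufferChan(ch)
	ch <- true
	time.Sleep(1 * time.Second)
}
```

Moving go ReadNoBufferChan(ch) before the send ensures that a receiving goroutine exists, so the send can complete once that goroutine reaches <-chanBool.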
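For unbuffered channels:
- Sends and receives must come in pairs, and the two sides cannot be in the same goroutine.
- When reading with a for range loop, the writer must close the channel. Otherwise, the loop never ends.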
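The second rule is shown in practice in the minimal sketch below. produce and consume are illustrative names. The sender runs in its own goroutine and closes the channel when it is done. That close is what lets the receiver's for range loop terminate.

```go
package main

import "fmt"

// produce runs in its own goroutine and closes ch when it has
// nothing more to send; this is what ends the consumer's range loop.
func produce(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i // each send waits for the matching receive
	}
	close(ch)
}

// consume reads until ch is closed. Without the close in produce,
// the range loop would block forever after the last value.
func consume(n int) []int {
	ch := make(chan int) // unbuffered
	go produce(ch, n)    // sender and receiver in different goroutines
	var got []int
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(consume(3)) // [0 1 2]
}
```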
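2.2.9 Receiving from an empty buffered channel

```go
func TestReadEmptyBufferChan(t *testing.T) {
	ch := make(chan int, 1)
	if _, ok := <-ch; !ok { // blocks: the buffer is empty and nobody sends
		fmt.Println("closed")
	}
}
```

Receiving from an empty buffered channel blocks until some data is written to it. Even with a buffer, a receive needs data to be available.
The main difference between buffered and unbuffered channels is that a buffered channel can store a certain amount of data, while an unbuffered one requires the send and receive to happen synchronously. This does not change the following fact: a goroutine that receives from an empty channel blocks until another goroutine writes data.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: make sure there is data to read, that is, some goroutine sends on the channel, before or while receiving.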
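2.2.10 Sending to a full buffered channel with nobody receiving

```go
func TestWriteFullBufferChan(t *testing.T) {
	ch := make(chan int, 1)
	ch <- 1
	ch <- 2 // the buffer is already full: blocks forever
}
```

When a buffered channel's buffer is full, a send blocks until a receive frees up space. With no receiver, the send blocks forever and the program deadlocks.
```
fatal error: all goroutines are asleep - deadlock!
```

Fix: when the buffer can fill up, make sure there is a receiver, or increase the buffer size.
Note: as long as the buffer has not reached its capacity, sending to a buffered channel without a receiver does not deadlock. This is different from an unbuffered channel.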
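In short, the last two buffered-channel cases come down to:
- Within a single goroutine, the send must come before the receive.
- If the channel is empty, the receiver blocks.
- If the channel is full, the sender blocks.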
3. Underlying implementation
3.1 Data structure
At run time, a Go channel is represented by the runtime.hchan struct.
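```go
// runtime/chan.go
type hchan struct {
	qcount   uint           // number of elements in the queue
	dataqsiz uint           // size of the circular buffer
	buf      unsafe.Pointer // pointer to the circular buffer
	elemsize uint16         // size of one element
	closed   uint32         // whether the channel is closed
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // queue of goroutines waiting to receive
	sendq    waitq          // queue of goroutines waiting to send
	lock     mutex          // lock protecting the channel
}
```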
 

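First, the fields related to the circular buffer:
- qcount: the number of elements currently in the buffer.
- dataqsiz: the capacity of the circular buffer.
- buf: the buffer that actually stores the data, of type unsafe.Pointer (similar to void * in C).
- elemsize: the size of each element.
- sendx: the next position in the circular buffer to write to.
- recvx: the next position in the circular buffer to read from.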
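The toy, single-goroutine model below shows how these fields work together. It is a simplification and not runtime code. ringBuf stores int values in a slice instead of raw memory, and it has no lock or wait queues. However, put and get update qcount, sendx and recvx the same way the buffered branches of chansend and chanrecv do.

```go
package main

import "fmt"

// ringBuf is a toy model of the hchan ring-buffer fields.
type ringBuf struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity
	sendx    int // next slot to write
	recvx    int // next slot to read
}

func newRingBuf(size int) *ringBuf {
	return &ringBuf{buf: make([]int, size), dataqsiz: size}
}

// put mirrors the "space available" branch of chansend.
func (r *ringBuf) put(v int) bool {
	if r.qcount == r.dataqsiz {
		return false // full: a real sender would block here
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// get mirrors the "c.qcount > 0" branch of chanrecv.
func (r *ringBuf) get() (int, bool) {
	if r.qcount == 0 {
		return 0, false // empty: a real receiver would block here
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // like typedmemclr
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRingBuf(2)
	r.put(1)
	r.put(2)
	fmt.Println(r.put(3)) // false: full
	v, _ := r.get()
	r.put(3)                         // reuses slot 0 after wrapping
	fmt.Println(v, r.sendx, r.recvx) // 1 1 1
}
```

When the buffer is full, sendx has wrapped around to equal recvx. This is the state that section 3.3.2.1 relies on.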
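Next, the send and receive queues:
- recvq: the queue of goroutines waiting to receive.
- sendq: the queue of goroutines waiting to send.
Both queues are implemented with the waitq struct. A waitq is essentially a doubly linked list whose nodes are sudog structs, and each sudog represents a waiting goroutine.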
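A simplified sketch of such a FIFO wait queue follows. The sudog here is a hypothetical stand-in that holds only an id and the list links. The real runtime.sudog carries much more, such as the goroutine, the element pointer and select state. The real dequeue also skips sudogs whose select has already been won by another case.

```go
package main

import "fmt"

// sudog is a hypothetical stand-in: one waiting goroutine, identified by id.
type sudog struct {
	id   int
	next *sudog
	prev *sudog
}

// waitq is a doubly linked FIFO list of sudogs.
type waitq struct {
	first *sudog
	last  *sudog
}

// enqueue appends sg at the tail.
func (q *waitq) enqueue(sg *sudog) {
	sg.next = nil
	if q.last == nil {
		sg.prev = nil
		q.first, q.last = sg, sg
		return
	}
	sg.prev = q.last
	q.last.next = sg
	q.last = sg
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitq) dequeue() *sudog {
	sg := q.first
	if sg == nil {
		return nil
	}
	q.first = sg.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	sg.next = nil
	return sg
}

func main() {
	var q waitq
	for i := 1; i <= 3; i++ {
		q.enqueue(&sudog{id: i})
	}
	for sg := q.dequeue(); sg != nil; sg = q.dequeue() {
		fmt.Print(sg.id, " ") // waiters are served FIFO: 1 2 3
	}
	fmt.Println()
}
```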
```go
type waitq struct {
	first *sudog
	last  *sudog
}
```
 
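Finally, the lock field.
lock is the mutex that protects the channel data structure. It is the runtime's internal mutex, not sync.Mutex. It combines brief spinning with blocking to keep channel operations thread-safe.
3.2 Initialization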
```go
func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}
```
 
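Here we mainly look at the branches of the switch:
- First branch: the channel's buffer size is 0 (an unbuffered channel), or the element size is 0. An example of the latter is struct{}{}, because an empty struct occupies no memory in Go and its size is 0. In this case mallocgc() allocates only hchanSize bytes on the heap for the channel.
  - About c.buf = c.raceaddr(): there is no buffer, so buf is pointed at an address that is never used to store data. The race detector (go test -race) uses that address as the synchronization point for operations on this channel. It plays no role in ordinary execution. The synchronizing behavior of an unbuffered channel comes from the direct hand-off between sender and receiver, not from this address.
- Second branch: if the element type contains no pointers, mallocgc is called once to allocate hchan and buf together.
- Third branch: by default, the element type contains pointers, and two allocations are made: new for hchan and mallocgc for buf.
Note that all three branches allocate on the heap, which means the channel itself is reclaimed automatically by the GC.
At the end, the function initializes the channel struct's fields: element size, element type, buffer size and lock.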
3.2 Sending data
```go
// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// Handle a nil channel.
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	// Race detection, used to find data races: go test -race ./...
	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// Acquire the lock.
	lock(&c.lock)

	// Check whether the channel is closed.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// Check for a waiting receiver.
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// Check whether the buffer has a free slot.
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	// Non-blocking mode: give up.
	if !block {
		unlock(&c.lock)
		return false
	}

	// Blocking mode: add the current goroutine to sendq and park it; a receiver will finish the work.
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// Package the goroutine into a sudog.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// Enqueue this sending goroutine's sudog on the channel's sendq.
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	// Move this sending G from _Grunning to _Gwaiting and put it to sleep.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// The code below runs after the goroutine is woken up.
	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		// Woken up because the channel was closed.
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
```
 
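The code is fairly long and can be split into two parts: error checks and sending the data.
3.2.1 Error checks
The function starts by handling the nil-channel case from the pitfalls section, that is, a channel variable that was never initialized with make. A non-blocking send returns false right away. A blocking send parks the goroutine forever with gopark, which is why a send on a nil channel hangs.
Next comes the lock-free fast path for non-blocking sends (block == false), such as a send inside a select that has a default branch. If the channel is not closed but full(c) reports that it cannot accept a value, the function returns false immediately without taking the lock. full(c) reports this for an unbuffered channel with no waiting receiver, and for a buffered channel whose buffer is full.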
```go
func TestASyncSendFull(t *testing.T) {
	ch := make(chan int, 1) // a channel with a buffer of size 1
	ch <- 1                 // send one element; the buffer is now full

	select {
	case ch <- 2: // try to send a second element
		fmt.Println("Successfully sent 2")
	default: // the buffer is full, so the default branch runs
		fmt.Println("channel is full, unable to send 2")
	}
}
```
 
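3.2.2 Sending data
Sending data comes down to three cases:
- Direct send: when recvq has a waiting receiver, runtime.send copies the data straight to that blocked receiver.
  - Note: the receiver is not woken up immediately. Its goroutine is marked runnable (_Grunnable) and placed in the runnext slot of the sender's processor (P), and that processor wakes the receiver at its next scheduling point.
- Asynchronous send: when the buf buffer has free space, the data is written into the channel's buffer.
- Blocking send: when there is no buffer, or the buffer is full, the sender waits for another goroutine to receive from the channel:
  - The current goroutine is added to the sendq queue and parked, waiting for another goroutine to receive from the channel.
  - After being woken up, it checks whether it was woken because the channel was closed. If so, it panics.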
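The three-way decision can be summarized with a toy, single-threaded model. toyChan and its send method are hypothetical. They do not create goroutines or take a lock. They only mirror the order of checks chansend performs once c.lock is held: a waiting receiver first, then free buffer space, and otherwise block.

```go
package main

import "fmt"

// toyChan is a hypothetical model of the state chansend inspects.
type toyChan struct {
	recvq []*int // receivers parked on the channel (each waits for a value)
	buf   []int  // buffered elements
	size  int    // buffer capacity (dataqsiz)
}

// send reports which of the three paths a send would take.
func (c *toyChan) send(v int) string {
	if len(c.recvq) > 0 { // 1. direct send: copy straight to the waiting receiver
		r := c.recvq[0]
		c.recvq = c.recvq[1:]
		*r = v
		return "direct"
	}
	if len(c.buf) < c.size { // 2. async send: room in the buffer
		c.buf = append(c.buf, v)
		return "buffered"
	}
	return "block" // 3. blocking send: would be parked on sendq
}

func main() {
	var slot int
	c := &toyChan{recvq: []*int{&slot}, size: 1}
	path := c.send(1)
	fmt.Println(path, slot) // direct 1
	fmt.Println(c.send(2))  // buffered
	fmt.Println(c.send(3))  // block
}
```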
 
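Sending data can trigger goroutine scheduling at the following points:
- When the sender finds a goroutine waiting to receive on the channel, it sets the processor's runnext to that receiver but does not trigger scheduling immediately.
- When the sender finds no receiver and the buffer is full, it adds itself to the channel's sendq. It then calls runtime.gopark, with chanparkcommit releasing the channel lock, to trigger scheduling and give up the processor.
3.3 Receiving data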
```go
// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	// A receive on a nil channel blocks forever.
	if c == nil {
		// In the non-blocking case (e.g. some select statements), return immediately.
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock.  This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	// The channel is closed: check whether any data is left to read.
	if c.closed != 0 {
		if c.qcount == 0 {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but the channel's buffer have data.
	} else {
		// A sudog is waiting in the sender queue:
		// copy the data from it to the current G.
		// Just found waiting sender with not closed.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Copy the data straight out of the buffer.
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		// Advance the receive index.
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// One fewer element in the buffer.
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// Non-blocking, and no data available.
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	// Package the goroutine into a sudog.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// Enqueue on recvq.
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	// success is true if a value was really received, false if woken by close.
	return true, success
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
```
 
A Go channel supports two forms of receive:
```go
num := <-ch
num, ok := <-ch
```

They correspond to chanrecv1 and chanrecv2 in the source above, and both end up calling chanrecv.
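3.3.1 Error checks
When receiving from a nil channel, that is, a channel variable that was never initialized, a non-blocking receive (block == false, as in a select with a default branch) returns immediately. Otherwise, runtime.gopark is called to give up the processor, and the goroutine is never woken again.
If the channel has already been closed and its buffer holds no data, the function clears the data at ep, so the receiver gets the zero value, and returns immediately. This is why a closed channel can be received from any number of times without an error.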
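3.3.2 Receiving data
Receiving from a channel falls into three cases:
3.3.2.1 Direct receive
When sendq has a waiting sender and the channel is not closed, runtime.recv takes the data either directly from the blocked sender or from the buffer. There are two scenarios, which can be seen in the recv function:
- Scenario 1

When the buffer capacity dataqsiz is 0, the channel is unbuffered (synchronous). recvDirect copies the value that the sudog taken from sendq points to (sg.elem) directly into the receiver's memory (ep).
- Scenario 2

When the buffer is full, two memory copies take place:
- First, the element at the head of buf is copied to the receiver (first copy).
- Then, the value held by the sudog at the head of sendq is copied into the buf slot that was just freed, and that sudog's blocked goroutine is released (second copy).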
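At this point, some readers may ask: why not hand the value from sendq straight to the receiver, instead of taking the value out of buf?
The reason is that in buffered mode, channel data is stored in the buffer in FIFO (first in, first out) order. The element at the head of the buffer was stored first, so it must be taken out first.
Let's also look at how recvx and sendx are updated in scenario 2.
- When the buffer is full

When buf is full, recvx points to the head of buf, which holds the next element to be received. Note that sendx points to this same slot. Because the buffer is full, the write index has wrapped around to meet the read index, and this is exactly why further senders wait in sendq instead of writing into buf.
- Reading from the buffer

The receiver takes the element at the head position recvx of the full buffer and hands it over. recvx is then advanced to point at the next element to be received.
- Copying from sendq into the buffer

The head element has now been delivered. The value of the sender at the head of sendq overwrites that freed slot, and sendx is set equal to recvx, pointing at the next position to write. The buffer is full again, and the sender's value sits at the tail of the FIFO order.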
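The toy model of scenario 2 below makes some simplifying assumptions. It uses a plain int slice instead of raw memory and passes the single pending sender value as an argument. ring and recvFromFull are hypothetical names. The model shows the two copies, and why setting sendx = recvx keeps the FIFO order intact.

```go
package main

import "fmt"

// ring is a toy full ring buffer; recvx == sendx because it is full.
type ring struct {
	buf          []int
	recvx, sendx int
}

// recvFromFull models the dataqsiz > 0 branch of runtime.recv:
// the buffer is full and a sender is parked holding pending.
func (r *ring) recvFromFull(pending int) int {
	v := r.buf[r.recvx]      // copy 1: the oldest element goes to the receiver
	r.buf[r.recvx] = pending // copy 2: the sender's value fills the freed slot
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.sendx = r.recvx // still full, so the two indexes stay equal
	return v
}

func main() {
	r := &ring{buf: []int{1, 2, 3}} // full, recvx = sendx = 0
	fmt.Println(r.recvFromFull(4))  // 1
	fmt.Println(r.buf, r.recvx)     // [4 2 3] 1
	// The remaining receive order is 2, 3, 4: FIFO is preserved.
}
```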
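In both scenarios, the runtime calls runtime.goready, which puts the sending goroutine in the current processor's runnext slot. The blocked sender is then woken at the scheduler's next scheduling round.
3.3.2.2 Asynchronous receive
When qcount is greater than 0, that is, a buffered channel holds data, the element at index recvx of buf is taken out and processed:
- If the receiver's memory address (ep) is not nil, runtime.typedmemmove copies the element from the buffer into it. In either case, runtime.typedmemclr then clears that slot in the queue.
- Finally, the channel's bookkeeping is updated. recvx moves to the next position, wrapping back to the start when it reaches the end of the ring buffer. qcount is decremented by one, and the channel lock is released.
3.3.2.3 Blocking receive
If neither case applies, the receive blocks. This happens when sendq holds no waiting goroutine and buf holds no data. The current goroutine is packaged into a sudog, added to the channel's receive queue recvq, and parked, so that it can be woken when data becomes available.
When the goroutine is woken, the blocking receive is complete. Finally, it performs some sanity checks, unbinds the sudog from the channel, and releases the sudog.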
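Taking the error-check section into account, receiving from a channel triggers goroutine scheduling in two situations:
- when the channel is nil (and the receive is blocking);
- when buf holds no data and there is no sender.
3.4 Closing a channel
Finally, let's look at how closing a channel is implemented.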
```go
func closechan(c *hchan) {
	// Closing a nil channel panics immediately.
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	// Take the lock. It is held for a long time: until every sudog has been released.
	lock(&c.lock)
	// If the channel has already been closed, panic.
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	var glist gList

	// release all readers
	for {
		sg := c.recvq.dequeue()
		// A nil sudog means the receive queue is empty.
		if sg == nil {
			break
		}
		// sg.elem is an unsafe.Pointer to the receiver's data slot,
		// which may live on the heap or on the stack; clear it so the receiver sees a zero value.
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// Collect the goroutine in glist so that all of them can be readied at the end.
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	// Pop every writer parked on the channel from sendq; each of them
	// panics when it resumes (sending to a closed channel panics).
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// All waiting readers and writers were released inside the critical section.
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// Move g to _Grunnable.
		goready(gp, 3)
	}
}
```
 
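3.4.1 Error checks
- Closing a nil channel panics immediately.
- Closing a channel that has already been closed also panics immediately.
3.4.2 Releasing all receivers and senders
The main work of closing a channel is to release all readers and writers.
closechan dequeues every sudog from recvq and sendq and pushes the corresponding goroutines onto glist, the list of goroutines to be made runnable. Along the way, it clears any unprocessed element held by each runtime.sudog. Note that the writers released from sendq panic once they resume. This matches the earlier pitfall that sending to a closed channel panics, although closechan itself does not panic at that point.
Finally, runtime.goready is called for every blocked goroutine to trigger scheduling. Every goroutine in glist is moved from _Gwaiting to _Grunnable and waits to be scheduled.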
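closechan readies every goroutine in recvq, so a single close acts as a broadcast. In the minimal sketch below, wakeAll is an illustrative name. n receivers block on the same channel, one close wakes all of them, and each sees ok == false. A receiver that had not yet parked when close ran also gets ok == false immediately, so the count is n either way.

```go
package main

import "fmt"

// wakeAll starts n receivers on an unbuffered channel, closes it once,
// and counts how many receivers observed ok == false.
func wakeAll(n int) int {
	ch := make(chan int)
	results := make(chan bool, n)
	for i := 0; i < n; i++ {
		go func() {
			_, ok := <-ch // blocks until close(ch)
			results <- ok
		}()
	}
	close(ch) // wakes every parked receiver at once
	closedSeen := 0
	for i := 0; i < n; i++ {
		if !<-results {
			closedSeen++
		}
	}
	return closedSeen
}

func main() {
	fmt.Println(wakeAll(5)) // 5
}
```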
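3.4.3 Closing a channel gracefully
Finally, let's talk about how to close a channel gracefully.
The pitfalls section showed that:
- sending to a closed channel causes a panic;
- closing a channel twice also causes a panic.
We also learned that:
- receiving from a closed channel yields the zero value (once the buffer is drained) and does not crash the program;
- closing a channel notifies every select case that receives from it.
Let's wrap up with the graceful-close approach described in the article How to Gracefully Close Channels.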
```go
package _0240623

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"testing"
	"time"
)

func TestGracefullyCloseChannel(t *testing.T) {
	rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
	log.SetFlags(0)

	// ...
	const Max = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// ...
	dataCh := make(chan int)
	stopCh := make(chan struct{})
	// stopCh is an additional signal channel.
	// Its sender is the moderator goroutine shown
	// below, and its receivers are all senders
	// and receivers of dataCh.
	toStop := make(chan string, 1)
	// The channel toStop is used to notify the
	// moderator to close the additional signal
	// channel (stopCh). Its senders are any senders
	// and receivers of dataCh, and its receiver is
	// the moderator goroutine shown below.
	// It must be a buffered channel.

	var stoppedBy string

	// moderator
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// senders
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(Max)
				if value == 0 {
					// Here, the try-send operation is
					// to notify the moderator to close
					// the additional signal channel.
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				// The try-receive operation here is to
				// try to exit the sender goroutine as
				// early as possible. Try-receive and
				// try-send select blocks are specially
				// optimized by the standard Go
				// compiler, so they are very efficient.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and for ever in theory) if the send
				// to dataCh is also non-blocking. If
				// this is unacceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				// Same as the sender goroutine, the
				// try-receive operation here is to
				// try to exit the receiver goroutine
				// as early as possible.
				select {
				case <-stopCh:
					return
				default:
				}

				// Even if stopCh is closed, the first
				// branch in this select block might be
				// still not selected for some loops
				// (and forever in theory) if the receive
				// from dataCh is also non-blocking. If
				// this is not acceptable, then the above
				// try-receive operation is essential.
				select {
				case <-stopCh:
					return
				case value := <-dataCh:
					if value == Max-1 {
						// Here, the same trick is
						// used to notify the moderator
						// to close the additional
						// signal channel.
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	// ...
	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}
```
 
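The core of this code is:
```go
// moderator
go func() {
	stoppedBy = <-toStop
	close(stopCh)
}()
```

With M producers and N consumers (the M*N case), the channel clearly cannot be closed by a producer, and closing it from a consumer is not appropriate either. An intermediary is introduced instead: the moderator goroutine. It blocks on stoppedBy = <-toStop. As soon as any producer or consumer meets its stop condition and writes to toStop, the moderator closes stopCh, which signals every producer and consumer to exit. dataCh itself is never closed. This is what the moderator comment refers to: a coordinator that lets producers and consumers shut down gracefully in the M*N case.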
