# 1. 核心数据结构
# 1.1 hchan
type hchan struct { | |
// 当前队列中的元素数量 | |
qcount uint // total data in the queue | |
// 环形队列的大小 | |
dataqsiz uint // size of the circular queue | |
// 指向存储元素的数组 | |
buf unsafe.Pointer // points to an array of dataqsiz elements | |
// 元素大小 | |
elemsize uint16 | |
//channel 是否已关闭,只能被关闭一次,重复关闭会 panic | |
closed uint32 | |
// 关联的定时器 | |
timer *timer // timer feeding this chan | |
// 元素类型 | |
elemtype *_type // element type | |
// 发送索引 | |
sendx uint // send index | |
// 接收索引 | |
recvx uint // receive index | |
// 等待接收的协程队列 | |
recvq waitq // list of recv waiters | |
// 等待发送的协程队列 | |
sendq waitq // list of send waiters | |
// lock protects all fields in hchan, as well as several | |
// fields in sudogs blocked on this channel. | |
// | |
// Do not change another G's status while holding this lock | |
// (in particular, do not ready a G), as this can deadlock | |
// with stack shrinking. | |
// 互斥锁,保护结构体并发访问安全 | |
lock mutex | |
} |
# 1.2 waitq
waitq 为阻塞的协程队列
type waitq struct { | |
first *sudog // 队列头部 | |
last *sudog // 队列尾部 | |
} |
# 1.3 sudog
sudog 用于包装协程的节点
//sudo 代表一个在等待队列中的 goroutine | |
// 它是 goroutine 和 同步对象(channel)之间的中间层 | |
type sudog struct { | |
//goroutine 协程,指向此 sudog 对应的 goroutine | |
// 当 goroutine 等待 channel 操作时,会被封装成 sudog | |
g *g | |
//next 和 prev 形成双向链表,用于将 sudog 串联成等待队列 | |
next *sudog | |
prev *sudog | |
//elem 指向通信数据的指针 | |
// 对于 channel 发送,指向要发送的数据 | |
// 对于 channel 接收,指向接收到的数据的位置 | |
elem unsafe.Pointer | |
//acquiretime 记录 goroutine 开始等待的时间 | |
// 用于监控和调试目的 | |
acquiretime int64 | |
//releasetime 记录 goroutine 结束等待的时间 | |
// 用于监控和调试目的 | |
releasetime int64 | |
// 用于休眠和唤醒机制 | |
ticket uint32 | |
//isSelect 表示当前 这个 sudog 是否是 select 语句的一部分 | |
// 在 select 场景下需要特殊出处理 | |
isSelect bool | |
//success 表示当前 goroutine 是否已经成功完成 channel 操作 | |
// 在 select 场景下需要特殊处理 | |
success bool | |
// 记录了有多少个 goroutine 正在等待 | |
// 用于管理等待计数 | |
waiters uint16 | |
// 以下字段用于管理信号量(semaphore)的等待树结构 | |
//parent、waitlink 和 waittail 构成二叉树,用于高效管理等待队列 | |
parent *sudog // semaRoot binary tree | |
waitlink *sudog // g.waiting list or semaRoot | |
waittail *sudog // semaRoot | |
//c 指向与此 sudog 关联的 channel | |
//sudog 可能会被放入 channel 的等待队列中 | |
c *hchan // channel | |
} |
# 2. 构造器函数
const ( | |
// 用来设置内存最大对齐值,对应就是 64 位系统下 Cache line 的大小。当结构体是 8 字节对齐时候,能避免 false share,提高读写速度。 | |
maxAlign = 8 | |
//hchanSize 用来计算 chan 大小,unsafe.Sizeof (hchan {}) + uintptr (-int (unsafe.Sizeof (hchan {}))&(maxAlign-1)) 用来计算离 unsafe.Sizeof (hchan {}) 最近的 8 的倍数。假设 hchan {} 大小为 13,则 hchanSize = 16 | |
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) | |
..... | |
) |
//t: channel 的类型信息 | |
//size: channel 的缓冲区大小 | |
func makechan(t *chantype, size int) *hchan { | |
// 获取 channel 中的元素的类型信息 | |
elem := t.Elem | |
// 检查元素大小是否超过限制(64KB) | |
if elem.Size_ >= 1<<16 { | |
throw("makechan: invalid channel element type") | |
} | |
// 检查内存对齐是否正确 | |
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { | |
throw("makechan: bad alignment") | |
} | |
// 计算需要分配的内存大小 | |
//mem = 元素大小 * 缓冲区大小 | |
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) | |
// 检查是否内存溢出或超过最大分配限制 | |
if overflow || mem > maxAlloc-hchanSize || size < 0 { | |
panic(plainError("makechan: size out of range")) | |
} | |
var c *hchan | |
switch { | |
case mem == 0: | |
// 处理无缓冲 channel 或元素大小为 0 的情况 | |
// 只需分配 hchan 结构体和缓冲区的空间 | |
// Queue or element size is zero. | |
c = (*hchan)(mallocgc(hchanSize, nil, true)) | |
// Race detector uses this location for synchronization. | |
c.buf = c.raceaddr() | |
case !elem.Pointers(): | |
// 元素类型不包含指针 | |
// 可以一次性分配 hchan 机构提和缓冲区的空间 | |
c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) | |
// 缓冲区紧跟在 hchan 结构体后面 | |
c.buf = add(unsafe.Pointer(c), hchanSize) | |
default: | |
// 元素类型包含指针 | |
// 需要分别分配 hchan 和缓冲区 | |
// 这样可以使 GC 更好的处理指针类型 | |
c = new(hchan) | |
c.buf = mallocgc(mem, elem, true) | |
} | |
// 初始化 channel 的基本属性 | |
c.elemsize = uint16(elem.Size_) // 元素大小 | |
c.elemtype = elem // 元素类型 | |
c.dataqsiz = uint(size) // 缓冲区大小 | |
// 如果当前 goroutine 属于某个 syncGroup,则设置标志位 | |
if getg().syncGroup != nil { | |
c.synctest = true | |
} | |
// 初始化互斥锁 | |
lockInit(&c.lock, lockRankHchan) | |
if debugChan { | |
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") | |
} | |
return c | |
} |
- 判断申请内存空间大小是否越界,mem 大小为 element 类型大小与 element 个数相乘后得到,仅当无缓冲型 channel 时,因个数为 0 导致大小为 0
- 根据类型初始化 channel,分为无缓冲型、有缓冲元素为 struct 型、有缓冲元素为 pointer 型 channel
- 如果无缓冲型,则仅申请一个大小默认值 96 的空间
- 如果有缓冲的 struct 型,则一次性分配好 96 + mem 大小的空间,并调整 chan 的 buf 执行 mem 的起始位置
- 如果有缓冲的 pointer 型,则分别申请 chan 和 buf 的空间,两者无需连续
- 对 channel 的其余字段进行初始化,包括元素类型大小、元素类型、容量以及锁的初始化
# 3. 写流程

# 3.1 两类异常情况
- 对于未初始化的 chan,写入操作会引发死锁
- 对于已关闭的 chan,写入操作会引发 panic
func chansend1(c *hchan, elem unsafe.Pointer) { | |
chansend(c, elem, true, sys.GetCallerPC()) | |
} | |
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { | |
//channel 为空 | |
if c == nil { | |
if !block { | |
return false | |
} | |
// 阻塞操作,将当前 goroutine 永久挂起 | |
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2) | |
throw("unreachable") | |
} | |
// !block:当前为非阻塞发送(如 select 中的 default 分支) | |
//c.closed == 0:通道未关闭 | |
//full (c):通道已满(根据缓冲区大小和当前元素判断) | |
// 如果三个条件同时成立,说明无法进行非阻塞发送,直接返回 false | |
if !block && c.closed == 0 && full(c) { | |
return false | |
} | |
// 加锁保护 channel 操作 | |
lock(&c.lock) | |
// 检查 channel 是否已关闭 | |
if c.closed != 0 { | |
unlock(&c.lock) | |
panic(plainError("send on closed channel")) | |
} | |
// 从阻塞的协程队列中取出一个 goroutine 的封装对象 sudog | |
if sg := c.recvq.dequeue(); sg != nil { | |
// 在 send 方法中,会基于 memmove 方法,直接将元素拷贝交给 sudog 对应的 goroutine | |
// 在 send 方法中完成解锁动作 | |
send(c, sg, ep, func() { unlock(&c.lock) }, 3) | |
return true | |
} | |
// 如果当前队列中的元素 小于 环形队列的大小,则说明缓冲区还有空间,直接将元素放入缓冲区 | |
// 无阻塞读协程 | |
if c.qcount < c.dataqsiz { | |
// 获取发送位置的缓冲区指针 | |
qp := chanbuf(c, c.sendx) | |
if raceenabled { | |
racenotify(c, c.sendx, nil) | |
} | |
// 建数据复制到缓冲区 | |
typedmemmove(c.elemtype, qp, ep) | |
// 更新发送索引 | |
c.sendx++ | |
if c.sendx == c.dataqsiz { | |
c.sendx = 0 // 环形缓冲区,索引归零 | |
} | |
c.qcount++ | |
unlock(&c.lock) | |
return true | |
} | |
// 非阻塞操作且无法立即发送 | |
if !block { | |
unlock(&c.lock) | |
return false | |
} | |
// Block on the channel. Some receiver will complete our operation for us. | |
// 无阻塞读协程,环形缓冲区满了 | |
// 获取当前 goroutine,封装到 sudog 对象 | |
gp := getg() | |
mysg := acquireSudog() // 获取 sudog 用于等待队列 | |
// 设置等待时间相关字段 | |
mysg.releasetime = 0 | |
if t0 != 0 { | |
mysg.releasetime = -1 | |
} | |
// 设置 sudog 的相关字段 | |
mysg.elem = ep | |
mysg.waitlink = nil | |
mysg.g = gp | |
mysg.isSelect = false | |
mysg.c = c | |
gp.waiting = mysg | |
gp.param = nil | |
// 将当前 goroutine 加入发送等待队列 (将 sudog 添加到当前 channel 的阻塞写协程队列中) | |
c.sendq.enqueue(mysg) | |
// 标记 goroutine 加入发送等待队列 | |
gp.parkingOnChan.Store(true) | |
// 将当前 goroutine 挂起,等待被唤醒 | |
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2) | |
// 确保发送的值在接受者复制完成前不会被垃圾回收 | |
KeepAlive(ep) | |
// 被唤醒后的处理 | |
if mysg != gp.waiting { | |
throw("G waiting list is corrupted") | |
} | |
// 清理状态 | |
gp.waiting = nil | |
gp.activeStackChans = false | |
closed := !mysg.success | |
gp.param = nil | |
if mysg.releasetime > 0 { | |
blockevent(mysg.releasetime-t0, 2) | |
} | |
mysg.c = nil | |
releaseSudog(mysg) | |
// 检查是否因为 channel 关闭而被唤醒 | |
if closed { | |
if c.closed == 0 { | |
throw("chansend: spurious wakeup") | |
} | |
panic(plainError("send on closed channel")) | |
} | |
return true | |
} |
# 4. 读流程

// 从 channel 接收数据,并将数据写入 ep 指向的内存 | |
//ep 可以为 nil,此时接收的数据会被丢弃 | |
// 返回值 (selected, receiver): | |
//- 如果 block = false,且没有可用数据,返回 (false, false) | |
//- 如果 channel 已关闭,将 ep 清零并返回 (true, false) | |
//- 如果 成功接收数据,将数据写入 ep,并返回 (true, true) | |
//ep 必须指向堆内存或调用者的栈空间 | |
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { | |
// 空 channel | |
if c == nil { | |
// 非阻塞 channel,直接返回 | |
if !block { | |
return | |
} | |
// 阻塞 channel,挂起,死锁,并抛出异常 | |
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2) | |
throw("unreachable") | |
} | |
// 处理定时器 | |
if c.timer != nil { | |
c.timer.maybeRunChan() | |
} | |
// 快速路径:检查非阻塞接收操作是否可以立即完成,而无需获取锁,避免了加锁,从而提升性能 | |
// 如果是非阻塞操作,并且 channel 为空 | |
if !block && empty(c) { | |
// 再次检查,确保 channel 没有被关闭 | |
if atomic.Load(&c.closed) == 0 { | |
return //channel 未关闭且为空,直接返回 | |
} | |
// 如果 channel 被关闭了,再调用一次 empty (c) 确认是否仍为空 | |
// 因为在上述检查期间可能有新数据到达 | |
if empty(c) { | |
// The channel is irreversibly closed and empty. | |
if raceenabled { | |
raceacquire(c.raceaddr()) | |
} | |
if ep != nil { | |
typedmemclr(c.elemtype, ep) // 清空接收变量 | |
} | |
// 如果确实为空,说明这是一个已关闭且无数据的 channel | |
// 这时即使没有数据可接收,也会返回 (true,false),表示成功接收了一个 “零值” | |
// 如果不为空,据需执行后续逻辑从 buffer 中读取数据 | |
return true, false | |
} | |
} | |
// 性能分析相关 | |
var t0 int64 | |
if blockprofilerate > 0 { | |
t0 = cputicks() | |
} | |
// 加锁进行接收操作 | |
lock(&c.lock) | |
// 处理已关闭的 channel | |
//channel 已关闭并且内部无元素,直接解锁并返回即可 | |
if c.closed != 0 { | |
if c.qcount == 0 { // 已经关闭且没有数据 | |
if raceenabled { | |
raceacquire(c.raceaddr()) | |
} | |
unlock(&c.lock) | |
if ep != nil { | |
typedmemclr(c.elemtype, ep) // 清空接收变量的内存 | |
} | |
return true, false // 返回 true 表示选中,false 表示未接收到数据(因为 channel 已关闭) | |
} | |
//channel 已经关闭,但缓冲区仍有数据 | |
} else { | |
//channel 未关闭,并且有正在等待发送的写 Goroutine, | |
if sg := c.sendq.dequeue(); sg != nil { | |
// 找到等待的发送者,进行接收操作 | |
// 倘若 channel 无缓冲区,则直接读取写协程元素,并唤醒写协程 | |
// 倘若 channel 有缓冲区,则读取缓冲区头部元素,并将写写成元素写入缓冲区的尾部后唤醒写协程 | |
// 接受数据到 ep,并唤醒发送方 Goroutine | |
// 解锁并返回 | |
recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接从发送者接收数据 | |
return true, true // 成功接收数据,返回 true,true | |
} | |
} | |
// 读时,无阻塞写协程,并且缓冲区有元素 | |
if c.qcount > 0 { // 从缓冲区接收数据 | |
// 直接从队列中接收 | |
// 获取到 recvx 对应位置的元素 | |
qp := chanbuf(c, c.recvx) | |
if raceenabled { | |
racenotify(c, c.recvx, nil) | |
} | |
if ep != nil { | |
typedmemmove(c.elemtype, ep, qp) // 复制数据 | |
} | |
typedmemclr(c.elemtype, qp) // 清空缓冲区 | |
// 更新接收索引 | |
c.recvx++ | |
if c.recvx == c.dataqsiz { | |
c.recvx = 0 // 环形缓冲区,索引为零 | |
} | |
c.qcount-- | |
unlock(&c.lock) | |
return true, true | |
} | |
// 非阻塞模式且无数据可接收 | |
if !block { | |
unlock(&c.lock) | |
return false, false | |
} | |
// 无可用的发送者,阻塞在该 channel 上,缓冲区无元素 | |
gp := getg() // 获取当前 goroutine | |
mysg := acquireSudog() // 获取一个 sudog 结构,用于等待队列 | |
// 设置等待时间 | |
mysg.releasetime = 0 | |
if t0 != 0 { | |
mysg.releasetime = -1 | |
} | |
// 设置 sudog 相关字段 | |
// 存储接收的数据指针 | |
mysg.elem = ep | |
mysg.waitlink = nil | |
gp.waiting = mysg | |
mysg.g = gp | |
mysg.isSelect = false | |
mysg.c = c | |
gp.param = nil | |
// 将 sudog 添加到当前 channel 的阻塞读协程队列中 | |
c.recvq.enqueue(mysg) | |
if c.timer != nil { | |
blockTimerChan(c) | |
} | |
// 标记 goroutine 即将在 channel 上阻塞 | |
gp.parkingOnChan.Store(true) | |
// 挂起当前 goroutine | |
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2) | |
// 唤醒后,获取当前协程 | |
if mysg != gp.waiting { | |
throw("G waiting list is corrupted") | |
} | |
if c.timer != nil { | |
unblockTimerChan(c) | |
} | |
// 清理状态 | |
gp.waiting = nil | |
gp.activeStackChans = false // 停止标记栈正在被缩容 | |
if mysg.releasetime > 0 { | |
blockevent(mysg.releasetime-t0, 2) | |
} | |
success := mysg.success // 是否成功接收到数据 | |
gp.param = nil | |
mysg.c = nil | |
releaseSudog(mysg) // 释放 sudog | |
return true, success // 返回接收操作结果 | |
} |
# 5. 阻塞模式 与 非阻塞模式
以上分析流程,均是以阻塞模式为主线进行分析,忽略非阻塞模式有关处理逻辑
Go 运行时通过以 block 标识 “这个操作是否允许阻塞”,当 block==false 时,就是非阻塞路径 -- 运行时会做快速检测,不嫩恶搞立即完成就立即返回(不挂起、不入队);能立即完成就返回成功;只有允许阻塞的情况下才会把 goroutine 排队并挂起。
- 在运行时,发送 / 接收的内部实现函数是
chansend/chanrecv,他们都有一个block bool参数:true 表示可阻塞(会入队并 gopark),false 标识非阻塞,只做快路检测,不能立刻完成就返回。 - 编译器在遇到带 default 的 select 或做非阻塞尝试时,会以 block=false 的方式调用者和谐函数(或调用短小包装),因此变为非阻塞语义。select 的核心实现 selectgo 也接收 block 参数并在第一遍 “尝试不阻塞完成操作” 后在!block 情况下直接返回默认分支。
- chanrecv 的返回是两个布尔值
(selected, receiver):selected 表示该 case 是否被选中,received 表示是否真的接收到一个值(对于关闭通道则selected==true且received==false)。非阻塞 时不能完成则返回 (false,false)。
# 1. chansend 发送逻辑
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { | |
if c == nil { | |
if !block { | |
return false // [chan.go L169] 非阻塞 + nil 通道 => 直接失败 | |
} | |
gopark(...) // [L173] 阻塞 + nil 通道 => 永远阻塞 | |
} | |
if !block && c.closed == 0 && full(c) { | |
return false // [L200] 非阻塞 + 未关闭 + 满 => 快速返回 false | |
} | |
lock(&c.lock) | |
if c.closed != 0 { | |
unlock(&c.lock) | |
panic("send on closed channel") // [L220] 已关闭通道 => panic | |
} | |
if receiver := c.recvq.dequeue(); receiver != nil { | |
// [L240] 有等待接收者,直接配对发送,唤醒接收方 | |
unlock(&c.lock) | |
return true | |
} | |
if !c.full() { | |
// [L260] 缓冲未满,直接写入 | |
unlock(&c.lock) | |
return true | |
} | |
if !block { | |
unlock(&c.lock) | |
return false // [L270] 非阻塞 + 满 => 返回 false | |
} | |
// [L280+] 阻塞模式:入队,park 当前 goroutine 等待 | |
enqueue_sender_and_park(...) | |
} |
- 非阻塞
- nil 通道 -> false
- 缓冲满 -> false
- 否则尝试执行,如果通道已关闭 -> panic,否则成功
- 阻塞
- nil 通道 -> 永远阻塞
- 满时 -> 入队等待
# 2. chanrecv 接收逻辑
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { | |
if c == nil { | |
if !block { | |
return (false, false) // [L330] 非阻塞 + nil => 失败 | |
} | |
gopark(...) // [L334] 阻塞 + nil => 永远阻塞 | |
} | |
if !block && empty(c) { | |
if c.closed != 0 { | |
return (true, false) // [L350] 非阻塞 + 已关闭 + 空 => selected 成功,ok=false | |
} | |
return (false, false) // [L353] 非阻塞 + 空 => 失败 | |
} | |
lock(&c.lock) | |
if sender := c.sendq.dequeue(); sender != nil { | |
// [L370] 有等待发送者,直接配对 | |
unlock(&c.lock) | |
return (true, true) | |
} | |
if !empty(c) { | |
// [L390] 缓冲有数据,直接取出 | |
unlock(&c.lock) | |
return (true, true) | |
} | |
if c.closed != 0 { | |
unlock(&c.lock) | |
return (true, false) // [L400] 已关闭 + 空 => ok=false | |
} | |
if !block { | |
unlock(&c.lock) | |
return (false, false) // [L410] 非阻塞 + 空 => 失败 | |
} | |
// [L420+] 阻塞模式:入队,park 当前 goroutine 等待 | |
enqueue_receiver_and_park(...) | |
} |
- 非阻塞
nil->(false, false)- 缓冲空 + 未关闭 ->
(false, false) - 缓冲空 + 已关闭 ->
(true, false)(收到零值,ok=false) - 缓冲有数据 / 有等待发送者 ->
(true, true)
- 阻塞
- nil -> 永远阻塞
- 空 -> 入队等待
# 3. selectgo --select 的入口
func selectgo(cas0 *scase, order0 *uint16, ncases int, block bool) (int, bool) { | |
// 第一轮:非阻塞尝试 [L80-L120] | |
for each case { | |
if cas.kind == caseRecv { | |
if can_recv_now(...) { | |
return (index, true) // [L110] 立即完成 | |
} | |
} | |
if cas.kind == caseSend { | |
if can_send_now(...) { | |
return (index, true) // [L115] 立即完成 | |
} | |
} | |
} | |
if !block { | |
return (defaultIndex, false) // [L130] 非阻塞 + 无法完成 => default | |
} | |
// [L150+] 阻塞模式:随机化 case 顺序,入队,park 等待 | |
} |
- 先尝试一遍所有 case(非阻塞)
- 能立即完成就返回
- 如果
!block且都不能完成 -> 直接走default - 如果
block==true且都不能完成 -> 入队并阻塞等待
# 6. 关闭 channel

//closechan 关闭一个 channel,主要完成一下工作: | |
// 1. 设置 channel 的关闭标志 | |
// 2. 释放所有等待的接收者 (readers) | |
// 3. 释放所有等待的发送者 (writers) | |
// 4. 唤醒所有被阻塞的 goroutine | |
func closechan(c *hchan) { | |
// 检查 channel 是否为 nil | |
// 关闭一个 nil channel 会导致 panic | |
if c == nil { | |
panic(plainError("close of nil channel")) | |
} | |
// 加锁保护 channel 的并发访问 | |
lock(&c.lock) | |
// 检查 channel 是否已经关闭 | |
// 重复关闭同一个 channel 会导致 panic | |
if c.closed != 0 { | |
unlock(&c.lock) | |
panic(plainError("close of closed channel")) | |
} | |
// 设置关闭标志 | |
c.closed = 1 | |
// 创建一个 goroutine 列表,用于存储需要被唤醒的 goroutine | |
var glist gList | |
// 处理所有等待接收数据的 goroutine | |
// 将阻塞读协程中的协程结点统一添加到 glist | |
for { | |
// 从接收队列中取出一个 sudog (封装了等待的 goroutine) | |
sg := c.recvq.dequeue() | |
if sg == nil { | |
break // 队列已空 | |
} | |
// 如果接受者期望接收数据(elem 不为 nil) | |
// 则清空接受者的接收缓冲区(因为 channel 已关闭) | |
if sg.elem != nil { | |
typedmemclr(c.elemtype, sg.elem) | |
sg.elem = nil | |
} | |
// 设置释放时间 (用于 trace/debug) | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
// 获取 goroutine 并设置相关参数 | |
gp := sg.g | |
gp.param = unsafe.Pointer(sg) | |
// 设置接收未成功标志 | |
sg.success = false | |
if raceenabled { | |
raceacquireg(gp, c.raceaddr()) | |
} | |
// 将 goroutine 添加到等待唤醒列表 glist | |
glist.push(gp) | |
} | |
// 处理所有等待发送数据的 goroutine | |
// 这些 goroutine 被唤醒后会 panic,因为向已关闭的 channel 发送数据是不允许的 | |
for { | |
// 从发送队列中取出一个 sudog (封装了等待的 goroutine) | |
sg := c.sendq.dequeue() | |
if sg == nil { | |
break // 队列已空 | |
} | |
// 清空发送者的数据 | |
sg.elem = nil | |
// 设置释放时间 | |
if sg.releasetime != 0 { | |
sg.releasetime = cputicks() | |
} | |
// 获取 goroutine 并设置相关参数 | |
gp := sg.g | |
gp.param = unsafe.Pointer(sg) | |
// 设置发送未成功标志 | |
sg.success = false | |
if raceenabled { | |
raceacquireg(gp, c.raceaddr()) | |
} | |
// 将 goroutine 添加到等待唤醒列表 glist | |
glist.push(gp) | |
} | |
// 解锁 channel | |
unlock(&c.lock) | |
// 唤醒 glist 当中的所有协程 | |
// 这步操作在解锁后进行,避免死锁 | |
for !glist.empty() { | |
gp := glist.pop() | |
gp.schedlink = 0 | |
// 将 goroutine 标记未可运行状态 | |
goready(gp, 3) | |
} | |
} |
# 7. 总结
| 操作 | 空 Channel | 已关闭 Channel | 活跃 Channel |
|---|---|---|---|
| close(ch) | panic | panic | 成功关闭 |
| ch <- v | 永远阻塞 | panic | 成功发送或阻塞 |
| v,ok =<- ch | 永远阻塞 | 不阻塞 | 成功接收或阻塞 |