附录二
说明
之前在阅读整理golang网络库的时候,遇到了最核心的数据结构pollDesc,一直对该结构中的某些成员的含义不是十分清楚
type pollDesc struct {
link *pollDesc
lock mutex
fd uintptr
closing bool
seq uintptr
rg uintptr
rt timer
rd int64
wg uintptr
wt timer
wd int64
user uint32
}
不清楚的主要部分集中在rg和wg这两个成员。这两天花了点时间好好地将它梳理了下,终于明白了实现原理,特记录下来,也让自己可以好好思考下。
引子
我们前面文章在讲述golang网络连接的读写的时候说到:当操作系统的read/write返回EAGAIN时就会阻塞当前协程,直到该socket有可读写事件发生时,操作系统通知golang的runtime,runtime进而唤醒阻塞在该socket的读写协程。而协程阻塞和唤醒逻辑中主要使用了上面描述的pollDesc结构。
阻塞
我们以读过程为例,描述阻塞过程。 在每次Read真正读之前都会调用PrepareRead()来初始化一些变量:
func (pd *pollDesc) Prepare(mode int) error {
res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res)
}
func (pd *pollDesc) PrepareRead() error {
return pd.Prepare('r')
}
func net_runtime_pollReset(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
if mode == 'r' {
pd.rg = 0
} else if mode == 'w' {
pd.wg = 0
}
return 0
}
对于read来说,主要是将pd.rg设置为0。相当于初始状态。 接下来read如果返回了EAGAIN错误码,会进入协程阻塞函数:
func net_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
这里主要调用netpollblock来阻塞当前协程g,但是协程g可能有多种原因被唤醒:
- 有读写事件到来;
- 出现超时错误
- 其他
判断是否是正常返回()的根据是netpollblock的返回值,如果返回值为true,表示是有读写事件发生;返回值为false代表可能出现超时或者其他错误
因此实现尚我们利用一个for循环来阻塞,一旦g从netpollblock()中返回且返回值为false,我们检查是否是出现了超时错误,如果是这样,就返回给应用程序该错误,如果是其他错误,我们继续进入netpollblock阻塞当前协程。 如果netpollblock返回值是true,代表了g被io事件唤醒,接下来调用者可以继续读了。
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// 首先将pd.rg设为pdWait
// 之所以需要使用for循环是因为
// casuintptr可能会失败
for {
old := *gpp
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("netpollblock: double wait")
}
// 如果成功,跳出循环
if casuintptr(gpp, 0, pdWait) {
break
}
}
// gopark会阻塞当前协程g
// gopark阻塞g之前,先调用了netpollblockcommit
// 该函数将pd.rg从pdWait变成g的地址
// casuintptr((*uintptr)(gpp), pdWait,
// uintptr(unsafe.Pointer(gp)))
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
}
// 之所以判断old > pdWait
// 是因为可能gopark压根就没有阻塞g成功
// 此时会立即返回至现在逻辑
// 被唤醒后查看pd.rg,此时应该是pdReady了
// 如果是被IO事件唤醒的话
old := xchguintptr(gpp, 0)
if old > pdWait {
throw("netpollblock: corrupted state")
}
// 如果阻塞的协程g返回且原因是pdReady,返回true
return old == pdReady
}
这个函数比较关键:当前协程g在这里陷入阻塞,阻塞之前先将pd.rg状态设置为pdWait,表示等待IO输入,这很好理解。
接下来在调用gopark时传入了函数指针netpollblockcommit,这个函数会在真正block前被调用,而该函数又将pd.rg设置为g的地址(具体请参考该函数实现),这个应该也不难理解:如果pd不记录g的地址,以后该pd收到IO事件的时候就不知道到底该唤醒谁了。
上面的实现一个困惑的地方在于:既然g在阻塞前会设置pd.rg为g的地址,那为什么在之前还要多此一举地设置pd.rg为pdWait呢? 猜测:调用gopark是有条件的,参考if判断,如果条件不满足,此时pd.rg状态就是pdWait。那到底什么条件下无需阻塞当前协程呢?
好,至此,我们弄清楚了在协程g读的时候是如何被block的,接下来我们看看g是如何被唤醒的。
唤醒
被block的协程可能因为以下几种情况被唤醒:
- 超时
- IO事件到来
超时唤醒
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
lock(&pd.lock)
// 判断是否是过期定时器
if seq != pd.seq {
unlock(&pd.lock)
return
}
var rg *g
if read {
if pd.rd <= 0 || pd.rt.f == nil {
......
}
pd.rd = -1
atomicstorep(unsafe.Pointer(&pd.rt.f), nil)
rg = netpollunblock(pd, 'r', false)
}
var wg *g
if write {
......
}
unlock(&pd.lock)
if rg != nil {
goready(rg, 0)
}
......
}
可以看到,在超时处理函数中确实调用了netpollunblock()来唤醒等待协程g,且其第三个参数设置为false,表示并非由于IO事件唤醒。
另外,注意,这里唤醒的过程需要在pd.lock保护下执行,因为可能会有其他协程并发唤醒情况发生。
IO事件唤醒
func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
var rg, wg guintptr
if mode == 'r' || mode == 'r'+'w' {
rg.set(netpollunblock(pd, 'r', true))
}
if mode == 'w' || mode == 'r'+'w' {
wg.set(netpollunblock(pd, 'w', true))
}
if rg != 0 {
rg.ptr().schedlink = *gpp
*gpp = rg
}
if wg != 0 {
wg.ptr().schedlink = *gpp
*gpp = wg
}
}
这里同样调用netpollunblock唤醒等待协程g,只是这次调用netpollunblock的参数三变成了true。
唤醒过程
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
// 如果已经调用过netpollunblock
// 这次返回nil
if old == pdReady {
return nil
}
if old == 0 && !ioready {
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// 如果io事件到来了,将pd.rg状态设置为pdReady
// 如果是由于超时被唤醒,pd.rg状态将是0
// 如果casuintptr失败,继续循环
if casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 将阻塞在pd.rg上的协程g地址返回
return (*g)(unsafe.Pointer(old))
}
}
}
netpollunblock返回了阻塞在pd上的协程g地址(如果有的话),并且将pd.rg状态设置为pdReady,表示netpollunblock阻塞的协程g已经被唤醒了。再来调用的话是唤醒不到任何协程的。
另外,由于netpollunblock内部使用了循环,且netpollunblock可能会被多协程并发访问,这就导致了pd.rg状态可能有很多(0/pdWait/pdReady等),因此内部做了复杂的状态判断。
总结
上面我们仔细梳理了pd.rg状态机模型,下图整理状态变化的一些过程: