附录二

说明

之前在阅读整理golang网络库的时候,遇到了最核心的数据结构pollDesc,一直对该结构中的某些成员的含义不是十分清楚

type pollDesc struct {
    link *pollDesc 
    lock    mutex 
    fd      uintptr
    closing bool
    seq     uintptr 
    rg      uintptr 
    rt      timer   
    rd      int64   
    wg      uintptr 
    wt      timer   
    wd      int64   
    user    uint32 
}

不清楚的主要部分集中在rg和wg这两个成员。这两天花了点时间好好地将它梳理了下,终于明白了实现原理,特记录下来,也让自己可以好好思考下。

引子

我们前面文章在讲述golang网络连接的读写的时候说到:当操作系统的read/write返回EAGAIN时就会阻塞当前协程,直到该socket有可读写事件发生时,操作系统通知golang的runtime,runtime进而唤醒阻塞在该socket的读写协程。而协程阻塞和唤醒逻辑中主要使用了上面描述的pollDesc结构。

阻塞

我们以读过程为例,描述阻塞过程。 在每次Read真正读之前都会调用PrepareRead()来初始化一些变量:

func (pd *pollDesc) Prepare(mode int) error {
    res := runtime_pollReset(pd.runtimeCtx, mode)
    return convertErr(res)
}

func (pd *pollDesc) PrepareRead() error {
    return pd.Prepare('r')
}

func net_runtime_pollReset(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    if mode == 'r' {
        pd.rg = 0
    } else if mode == 'w' {
        pd.wg = 0
    }
    return 0
}

对于read来说,主要是将pd.rg设置为0。相当于初始状态。 接下来read如果返回了EAGAIN错误码,会进入协程阻塞函数:

func net_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

这里主要调用netpollblock来阻塞当前协程g,但是协程g可能有多种原因被唤醒:

  • 有读写事件到来;
  • 出现超时错误
  • 其他

判断是否是正常返回()的根据是netpollblock的返回值,如果返回值为true,表示是有读写事件发生;返回值为false代表可能出现超时或者其他错误

因此实现尚我们利用一个for循环来阻塞,一旦g从netpollblock()中返回且返回值为false,我们检查是否是出现了超时错误,如果是这样,就返回给应用程序该错误,如果是其他错误,我们继续进入netpollblock阻塞当前协程。 如果netpollblock返回值是true,代表了g被io事件唤醒,接下来调用者可以继续读了。

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // 首先将pd.rg设为pdWait
    // 之所以需要使用for循环是因为
    // casuintptr可能会失败
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("netpollblock: double wait")
        }
        // 如果成功,跳出循环
        if casuintptr(gpp, 0, pdWait) {
            break
        }
    }
    // gopark会阻塞当前协程g
    // gopark阻塞g之前,先调用了netpollblockcommit
    // 该函数将pd.rg从pdWait变成g的地址
    // casuintptr((*uintptr)(gpp), pdWait, 
    // uintptr(unsafe.Pointer(gp)))
    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }
    // 之所以判断old > pdWait
    // 是因为可能gopark压根就没有阻塞g成功
    // 此时会立即返回至现在逻辑
    // 被唤醒后查看pd.rg,此时应该是pdReady了
    // 如果是被IO事件唤醒的话
    old := xchguintptr(gpp, 0)
    if old > pdWait {
        throw("netpollblock: corrupted state")
    }
    // 如果阻塞的协程g返回且原因是pdReady,返回true
    return old == pdReady
}

这个函数比较关键:当前协程g在这里陷入阻塞,阻塞之前先将pd.rg状态设置为pdWait,表示等待IO输入,这很好理解。

接下来在调用gopark时传入了函数指针netpollblockcommit,这个函数会在真正block前被调用,而该函数又将pd.rg设置为g的地址(具体请参考该函数实现),这个应该也不难理解:如果pd不记录g的地址,以后该pd收到IO事件的时候就不知道到底该唤醒谁了。

上面的实现一个困惑的地方在于:既然g在阻塞前会设置pd.rg为g的地址,那为什么在之前还要多此一举地设置pd.rg为pdWait呢? 猜测:调用gopark是有条件的,参考if判断,如果条件不满足,此时pd.rg状态就是pdWait。那到底什么条件下无需阻塞当前协程呢?

好,至此,我们弄清楚了在协程g读的时候是如何被block的,接下来我们看看g是如何被唤醒的。

唤醒

被block的协程可能因为以下几种情况被唤醒:

  • 超时
  • IO事件到来

超时唤醒

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    lock(&pd.lock)
    // 判断是否是过期定时器
    if seq != pd.seq {
        unlock(&pd.lock)
        return
    }
    var rg *g
    if read {
        if pd.rd <= 0 || pd.rt.f == nil {
            ......
        }
        pd.rd = -1
        atomicstorep(unsafe.Pointer(&pd.rt.f), nil)
        rg = netpollunblock(pd, 'r', false)
    }
    var wg *g
    if write {
       ......
    }
    unlock(&pd.lock)

    if rg != nil {
        goready(rg, 0)
    }
    ......
}

可以看到,在超时处理函数中确实调用了netpollunblock()来唤醒等待协程g,且其第三个参数设置为false,表示并非由于IO事件唤醒。

另外,注意,这里唤醒的过程需要在pd.lock保护下执行,因为可能会有其他协程并发唤醒情况发生。

IO事件唤醒

func netpollready(gpp *guintptr, pd *pollDesc, mode int32) {
    var rg, wg guintptr
    if mode == 'r' || mode == 'r'+'w' {
        rg.set(netpollunblock(pd, 'r', true))
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg.set(netpollunblock(pd, 'w', true))
    }
    if rg != 0 {
        rg.ptr().schedlink = *gpp
        *gpp = rg
    }
    if wg != 0 {
        wg.ptr().schedlink = *gpp
        *gpp = wg
    }
}

这里同样调用netpollunblock唤醒等待协程g,只是这次调用netpollunblock的参数三变成了true。

唤醒过程

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    for {
        old := *gpp
        // 如果已经调用过netpollunblock
        // 这次返回nil
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            return nil
        }
        var new uintptr

        if ioready {
            new = pdReady
        }
        // 如果io事件到来了,将pd.rg状态设置为pdReady
        // 如果是由于超时被唤醒,pd.rg状态将是0
        // 如果casuintptr失败,继续循环
        if casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // 将阻塞在pd.rg上的协程g地址返回
            return (*g)(unsafe.Pointer(old))
        }
    }
}

netpollunblock返回了阻塞在pd上的协程g地址(如果有的话),并且将pd.rg状态设置为pdReady,表示netpollunblock阻塞的协程g已经被唤醒了。再来调用的话是唤醒不到任何协程的。

另外,由于netpollunblock内部使用了循环,且netpollunblock可能会被多协程并发访问,这就导致了pd.rg状态可能有很多(0/pdWait/pdReady等),因此内部做了复杂的状态判断。

总结

上面我们仔细梳理了pd.rg状态机模型,下图整理状态变化的一些过程:

results matching ""

    No results matching ""