Socket超时

说明

我们前面仔细梳理了Golang中的网络实现,还有一个很重要的点没有说到的是超时,因为超时在网络编程中非常重要。我们今天来研究下golang超时的实现机制。

API

一个Connection可以通过下面的接口来实现该连接的读写超时:

SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error

实现

我们以SetReadDeadline()为例研究下超时实现

func (c *conn) SetReadDeadline(t time.Time) error {
    ...
    if err := c.fd.setReadDeadline(t); err != nil {
        return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
    }
    return nil
}

func (fd *netFD) setReadDeadline(t time.Time) error {
    return setDeadlineImpl(fd, t, 'r')
}

func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
    d := runtimeNano() + int64(t.Sub(time.Now()))
    if t.IsZero() {
        d = 0
    }
    if err := fd.incref(); err != nil {
        return err
    }
    runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
    fd.decref()
    return nil
}
/go:linkname net_runtime_pollSetDeadline net.runtime_pollSetDeadline
func net_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
    lock(&pd.lock)
    if pd.closing {
        unlock(&pd.lock)
        return
    }
    // 可以让老定时器无效
    pd.seq++ 
    // 定时器已经存在,删除已经存在的定时器.
    // 重设新的定时器
    if pd.rt.f != nil {
        deltimer(&pd.rt)
        pd.rt.f = nil
    }
    if pd.wt.f != nil {
        deltimer(&pd.wt)
        pd.wt.f = nil
    }
    // Setup new timers.
    if d != 0 && d <= nanotime() {
        d = -1
    }
    if mode == 'r' || mode == 'r'+'w' {
        pd.rd = d
    }
    if mode == 'w' || mode == 'r'+'w' {
        pd.wd = d
    }
    if pd.rd > 0 && pd.rd == pd.wd {
        pd.rt.f = netpollDeadline
        pd.rt.when = pd.rd
        pd.rt.arg = pd
        pd.rt.seq = pd.seq
        addtimer(&pd.rt)
    } else {
        if pd.rd > 0 {
            pd.rt.f = netpollReadDeadline
            pd.rt.when = pd.rd
            pd.rt.arg = pd
            pd.rt.seq = pd.seq
            addtimer(&pd.rt)
        }
        if pd.wd > 0 {
            pd.wt.f = netpollWriteDeadline
            pd.wt.when = pd.wd
            pd.wt.arg = pd
            pd.wt.seq = pd.seq
            addtimer(&pd.wt)
        }
    }
    ...
}

golang对socket的超时实现从上面来看,无非就是将socket绑定读/写定时器,超时时间由用户设置。关键是超时处理函数,我们看到设置的读超时函数为netpollReadDeadline。

func netpollReadDeadline(arg interface{}, seq uintptr) {
    netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
    lock(&pd.lock)
    // 判断是否是过期定时器
    if seq != pd.seq {
        unlock(&pd.lock)
        return
    }
    var rg *g
    if read {
        if pd.rd <= 0 || pd.rt.f == nil {
            throw("netpolldeadlineimpl: inconsistent read deadline")
        }
        // pd.rd=-1表示读的deadline已经过了
        // 任何的reader在读之前应该判断deadline
        // 如果deadline过期了,应该返回timeout错误
        // 而且这里会唤醒已经阻塞在读的协程
        // 协程被唤醒后第一件事就是检查是否出现超时错误
        // (即pd.rd == -1)
        pd.rd = -1
        atomicstorep(unsafe.Pointer(&pd.rt.f), nil) 
        rg = netpollunblock(pd, 'r', false)
    }
    var wg *g
    if write {
        if pd.wd <= 0 || pd.wt.f == nil && !read {
            throw("netpolldeadlineimpl: inconsistent write deadline")
        }
        // pd.wd=-1表示写的deadline已经过了
        // 任何的writer在写之前应该判断deadline
        // 如果deadline过期了,应该返回timeout错误
        // 而且这里会唤醒已经阻塞在写的协程
        // 协程被唤醒后第一件事就是检查是否出现超时错误
        // (即pd.wd == -1)
        pd.wd = -1
        atomicstorep(unsafe.Pointer(&pd.wt.f), nil) 
        wg = netpollunblock(pd, 'w', false)
    }
    unlock(&pd.lock)
    // 唤醒等待读的协程
    if rg != nil {
        goready(rg, 0)
    }
    // 唤醒等待写的协程
    if wg != nil {
        goready(wg, 0)
    }
}

上面的超时处理逻辑看起来也非常简单:

将与该timer相关的socket标记为deadline已经到来,对socket的任何一次读写之前都要判断是否超时,我们在后面看到;

调用netpollunblock,这个设置socket当前状态为error

调用goready()唤醒任何等待读/写的协程,这里不再赘述。

我们前面说过了超时是通过定时器实现的,在定时器超时函数被触发时将会设置一个读写deadline已经超过的标记位(pd.rd = -1/pd.wd = -1)。 在定时器超时到达时候,上层应用可能有三种情况:

  • 应用程序调用的Read/Write在超时之后;
  • 超时发生在应用程序调用的Read/Write阻塞后;
  • 定时器发生在Read/Write成功后。

我们接下来按照这三种情况一一分析。

先超时,再Read

每次Read之前都检查是否已经读超时:

func (fd *netFD) Read(p []byte) (n int, err error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if err := fd.pd.PrepareRead(); err != nil 
    {
        return 0, err
    }
    .....
}

func (pd *pollDesc) PrepareRead() error {
    return pd.Prepare('r')
}

func (pd *pollDesc) Prepare(mode int) error {
    res := runtime_pollReset(pd.runtimeCtx, mode)
    return convertErr(res)
}

func net_runtime_pollReset(pd *pollDesc, mode int) int {
    // reset 之前先检查是否已经超时错误了
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    if mode == 'r' {
        pd.rg = 0
    } else if mode == 'w' {
        pd.wg = 0
    }
    return 0
}

// 这里检查了是否有错,对于已经超时的socket
// 返回错误码2
func netpollcheckerr(pd *pollDesc, mode int32) int {
    if pd.closing {
        return 1 // errClosing
    }
    if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
        return 2 // errTimeout
    }
    return 0
}

通过上面的分析,我们知道,在每次调用Read之前都会重设定时器(其实不算是重设,而是将定时器归零)。但是在归零之前先检查是否已经超时了,如果是,返回错误码2(timeout)。 因此,发生这种情况时我们是能够顺利检测出socket超时的。

Read阻塞后再超时

若满足此情况,我们前面分析了,超时定时器触发的时候会唤醒Read阻塞的协程,我们看看该协程被唤醒的时候做了什么:

func net_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris uses level-triggered IO.
    if GOOS == "solaris" {
        netpollarm(pd, mode)
    }
    // netpollblock阻塞协程
    for !netpollblock(pd, int32(mode), false) {
        // 阻塞协程被唤醒时先检查是否出错
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            println("go routine ", getg(), " is woken up and got error:", err)
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}

因为该阻塞协程可能被以下两种事件唤醒:

  • 有数据可读
  • 超时函数中将其唤醒

所以被唤醒后的第一件事就是检测是否由于超时唤醒,如果是,则返回错误码2。

可见,这种情况在golang中也能正确被处理。

Read成功后再超时

这种情况其实是最难分析的。我们前面说过,每次读之前都会调用PrepareRead()。

func (pd *pollDesc) PrepareRead() error {
    return pd.Prepare('r')
}

func (pd *pollDesc) Prepare(mode int) error {
    res := runtime_pollReset(pd.runtimeCtx, mode)
    return convertErr(res)
}

func net_runtime_pollReset(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // 如果未超时,那么将pd.rg设置为0
    // 这样下次定时器超时的时候就无法唤醒任何人
    if mode == 'r' {
        pd.rg = 0
    } else if mode == 'w' {
        pd.wg = 0
    }
    return 0
}

我们看到golang对这种情况的处理是:读之前将定时器的rg(记录了定时器该唤醒谁)清空。

如果socket超时发生在读成功后,因为在PrepareRead()中已经将pd.rg设置为0,因此无法唤醒任何人了。 但是下次再来读的时候先判断该连接上是否有错误发生,而发生超时的时候会将pd.rd = -1,因此下次再读的时候就会出错?这个好像不合理,

如下图:

如何解决上面这个问题呢?

目前分析代码发现,golang并没有解决这个问题,具体的测试请见我的另一篇关于golang超时的测试博客。

results matching ""

    No results matching ""