Socket超时
说明
我们前面仔细梳理了Golang中的网络实现,还有一个很重要的点没有说到的是超时,因为超时在网络编程中非常重要。我们今天来研究下golang超时的实现机制。
API
一个Connection可以通过下面的接口来实现该连接的读写超时:
SetDeadline(t time.Time) error
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
实现
我们以SetReadDeadline()为例研究下超时实现
func (c *conn) SetReadDeadline(t time.Time) error {
...
if err := c.fd.setReadDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
}
func (fd *netFD) setReadDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, 'r')
}
func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
d := runtimeNano() + int64(t.Sub(time.Now()))
if t.IsZero() {
d = 0
}
if err := fd.incref(); err != nil {
return err
}
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
fd.decref()
return nil
}
/go:linkname net_runtime_pollSetDeadline net.runtime_pollSetDeadline
func net_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
lock(&pd.lock)
if pd.closing {
unlock(&pd.lock)
return
}
// 可以让老定时器无效
pd.seq++
// 定时器已经存在,删除已经存在的定时器.
// 重设新的定时器
if pd.rt.f != nil {
deltimer(&pd.rt)
pd.rt.f = nil
}
if pd.wt.f != nil {
deltimer(&pd.wt)
pd.wt.f = nil
}
// Setup new timers.
if d != 0 && d <= nanotime() {
d = -1
}
if mode == 'r' || mode == 'r'+'w' {
pd.rd = d
}
if mode == 'w' || mode == 'r'+'w' {
pd.wd = d
}
if pd.rd > 0 && pd.rd == pd.wd {
pd.rt.f = netpollDeadline
pd.rt.when = pd.rd
pd.rt.arg = pd
pd.rt.seq = pd.seq
addtimer(&pd.rt)
} else {
if pd.rd > 0 {
pd.rt.f = netpollReadDeadline
pd.rt.when = pd.rd
pd.rt.arg = pd
pd.rt.seq = pd.seq
addtimer(&pd.rt)
}
if pd.wd > 0 {
pd.wt.f = netpollWriteDeadline
pd.wt.when = pd.wd
pd.wt.arg = pd
pd.wt.seq = pd.seq
addtimer(&pd.wt)
}
}
...
}
golang对socket的超时实现从上面来看,无非就是将socket绑定读/写定时器,超时时间由用户设置。关键是超时处理函数,我们看到设置的读超时函数为netpollReadDeadline。
func netpollReadDeadline(arg interface{}, seq uintptr) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}
func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
lock(&pd.lock)
// 判断是否是过期定时器
if seq != pd.seq {
unlock(&pd.lock)
return
}
var rg *g
if read {
if pd.rd <= 0 || pd.rt.f == nil {
throw("netpolldeadlineimpl: inconsistent read deadline")
}
// pd.rd=-1表示读的deadline已经过了
// 任何的reader在读之前应该判断deadline
// 如果deadline过期了,应该返回timeout错误
// 而且这里会唤醒已经阻塞在读的协程
// 协程被唤醒后第一件事就是检查是否出现超时错误
// (即pd.rd == -1)
pd.rd = -1
atomicstorep(unsafe.Pointer(&pd.rt.f), nil)
rg = netpollunblock(pd, 'r', false)
}
var wg *g
if write {
if pd.wd <= 0 || pd.wt.f == nil && !read {
throw("netpolldeadlineimpl: inconsistent write deadline")
}
// pd.wd=-1表示写的deadline已经过了
// 任何的writer在写之前应该判断deadline
// 如果deadline过期了,应该返回timeout错误
// 而且这里会唤醒已经阻塞在写的协程
// 协程被唤醒后第一件事就是检查是否出现超时错误
// (即pd.wd == -1)
pd.wd = -1
atomicstorep(unsafe.Pointer(&pd.wt.f), nil)
wg = netpollunblock(pd, 'w', false)
}
unlock(&pd.lock)
// 唤醒等待读的协程
if rg != nil {
goready(rg, 0)
}
// 唤醒等待写的协程
if wg != nil {
goready(wg, 0)
}
}
上面的超时处理逻辑看起来也非常简单:
将与该timer相关的socket标记为deadline已经到来,对socket的任何一次读写之前都要判断是否超时,我们在后面看到;
调用netpollunblock,这个设置socket当前状态为error
调用goready()唤醒任何等待读/写的协程,这里不再赘述。
我们前面说过了超时是通过定时器实现的,在定时器超时函数被触发时将会设置一个读写deadline已经超过的标记位(pd.rd = -1/pd.wd = -1)。 在定时器超时到达时候,上层应用可能有三种情况:
- 应用程序调用的Read/Write在超时之后;
- 超时发生在应用程序调用的Read/Write阻塞后;
- 定时器发生在Read/Write成功后。
我们接下来按照这三种情况一一分析。
先超时,再Read
每次Read之前都检查是否已经读超时:
func (fd *netFD) Read(p []byte) (n int, err error) {
if err := fd.readLock(); err != nil {
return 0, err
}
defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil
{
return 0, err
}
.....
}
func (pd *pollDesc) PrepareRead() error {
return pd.Prepare('r')
}
func (pd *pollDesc) Prepare(mode int) error {
res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res)
}
func net_runtime_pollReset(pd *pollDesc, mode int) int {
// reset 之前先检查是否已经超时错误了
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
if mode == 'r' {
pd.rg = 0
} else if mode == 'w' {
pd.wg = 0
}
return 0
}
// 这里检查了是否有错,对于已经超时的socket
// 返回错误码2
func netpollcheckerr(pd *pollDesc, mode int32) int {
if pd.closing {
return 1 // errClosing
}
if (mode == 'r' && pd.rd < 0) || (mode == 'w' && pd.wd < 0) {
return 2 // errTimeout
}
return 0
}
通过上面的分析,我们知道,在每次调用Read之前都会重设定时器(其实不算是重设,而是将定时器归零)。但是在归零之前先检查是否已经超时了,如果是,返回错误码2(timeout)。 因此,发生这种情况时我们是能够顺利检测出socket超时的。
Read阻塞后再超时
若满足此情况,我们前面分析了,超时定时器触发的时候会唤醒Read阻塞的协程,我们看看该协程被唤醒的时候做了什么:
func net_runtime_pollWait(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// As for now only Solaris uses level-triggered IO.
if GOOS == "solaris" {
netpollarm(pd, mode)
}
// netpollblock阻塞协程
for !netpollblock(pd, int32(mode), false) {
// 阻塞协程被唤醒时先检查是否出错
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
println("go routine ", getg(), " is woken up and got error:", err)
return err
}
// Can happen if timeout has fired and unblocked us,
// but before we had a chance to run, timeout has been reset.
// Pretend it has not happened and retry.
}
return 0
}
因为该阻塞协程可能被以下两种事件唤醒:
- 有数据可读
- 超时函数中将其唤醒
所以被唤醒后的第一件事就是检测是否由于超时唤醒,如果是,则返回错误码2。
可见,这种情况在golang中也能正确被处理。
Read成功后再超时
这种情况其实是最难分析的。我们前面说过,每次读之前都会调用PrepareRead()。
func (pd *pollDesc) PrepareRead() error {
return pd.Prepare('r')
}
func (pd *pollDesc) Prepare(mode int) error {
res := runtime_pollReset(pd.runtimeCtx, mode)
return convertErr(res)
}
func net_runtime_pollReset(pd *pollDesc, mode int) int {
err := netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
// 如果未超时,那么将pd.rg设置为0
// 这样下次定时器超时的时候就无法唤醒任何人
if mode == 'r' {
pd.rg = 0
} else if mode == 'w' {
pd.wg = 0
}
return 0
}
我们看到golang对这种情况的处理是:读之前将定时器的rg(记录了定时器该唤醒谁)清空。
如果socket超时发生在读成功后,因为在PrepareRead()中已经将pd.rg设置为0,因此无法唤醒任何人了。 但是下次再来读的时候先判断该连接上是否有错误发生,而发生超时的时候会将pd.rd = -1,因此下次再读的时候就会出错?这个好像不合理,
如下图:
如何解决上面这个问题呢?
目前分析代码发现,golang并没有解决这个问题,具体的测试请见我的另一篇关于golang超时的测试博客。