内部实现

这一章节我们将详细描述网络关键API的实现,主要包括Listen、Accept、Read、Write等。 另外,为了突出关键流程,我们选择忽略所有的错误。这样可以使得代码看起来更为简单。 而且我们只关注tcp协议实现,udp和unix socket不是我们关心的。

Listen()

func Listen(net, laddr string) (Listener, error) {
    la, err := resolveAddr("listen", net, laddr, noDeadline)
    ......
    switch la := la.toAddr().(type) {
    case *TCPAddr:
        l, err = ListenTCP(net, la)
    case *UnixAddr:
        ......
    }
   ......
}

// 对于tcp协议,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
    ......
    fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
    ......
    return &TCPListener{fd}, nil
}

func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
    ......
    return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}

func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
    // 创建底层socket,设置属性为O_NONBLOCK
    s, err := sysSocket(family, sotype, proto)
    ......
    setDefaultSockopts(s, family, sotype, ipv6only)
    // 创建新netFD结构
    fd, err = newFD(s, family, sotype, net)
    ......
    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            // 调用底层listen监听创建的套接字
            fd.listenStream(laddr, listenerBacklog)
            return fd, nil
        case syscall.SOCK_DGRAM:
            ......
        }
    }   
}

// 最终调用该函数来创建一个socket
// 并且将socket属性设置为O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
    syscall.ForkLock.RLock()
    s, err := syscall.Socket(family, sotype, proto)
    if err == nil {
        syscall.CloseOnExec(s)
    }
    syscall.ForkLock.RUnlock()
    if err != nil {
        return -1, err
    }
    if err = syscall.SetNonblock(s, true); err != nil {
        syscall.Close(s)
        return -1, err
    }
    return s, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
    if err := setDefaultListenerSockopts(fd.sysfd)
    if lsa, err := laddr.sockaddr(fd.family); err != nil {
        return err
    } else if lsa != nil {
        // Bind绑定至该socket
        if err := syscall.Bind(fd.sysfd, lsa); err != nil {
            return os.NewSyscallError("bind", err)
        }
    }
    // 监听该socket
    if err := syscall.Listen(fd.sysfd, backlog); 
    // 这里非常关键:初始化socket与异步IO相关的内容
    if err := fd.init(); err != nil {
        return err
    }
    lsa, _ := syscall.Getsockname(fd.sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

我们这里看到了如何实现Listen。流程基本都很简单,但是因为我们使用了异步编程,因此,我们在Listen完该s ocket后,还必须将其添加到监听队列中,以后该socket有事件到来时能够及时通知到。

对linux有所了解的应该都知道epoll,没错golang使用的就是epoll机制来实现socket事件通知。那我们看对一个 监听socket,是如何将其添加到epoll的监听队列中呢?

func (fd *netFD) init() error {
    if err := fd.pd.Init(fd); err != nil {
        return err
    }
    return nil
}

func (pd *pollDesc) Init(fd *netFD) error {
    // 利用了Once机制,保证一个进程只会执行一次
    // runtime_pollServerInit: 
    // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
    // JMP runtime·netpollServerInit(SB)
    serverInit.Do(runtime_pollServerInit)
    // runtime_pollOpen:
    // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
    // JMP runtime·netpollOpen(SB)
    ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
    if errno != 0 {
        return syscall.Errno(errno)
    }
    pd.runtimeCtx = ctx
    return nil
}

这里就是socket异步编程的关键:

  • netpollServerInit()初始化异步编程结构,对于epoll,该函数是netpollinit,且使用Once机制保证一个进程 只会初始化一次;

    func netpollinit() {
      epfd = epollcreate1(_EPOLL_CLOEXEC)
      if epfd >= 0 {
          return
      }
      epfd = epollcreate(1024)
      if epfd >= 0 {
          closeonexec(epfd)
          return
      }
      ......
    }
    
  • netpollOpen则在socket被创建出来后将其添加到epoll队列中,对于epoll,该函数被实例化为netpollopen。

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到这里,我们也就明白了,监听一个套接字的时候无非就是传统的socket异步编程,然后将该socket添加到 epoll的事件监听队列中。

Accept()

既然我们描述的重点的tcp协议,因此,我们看看TCPListener的Accept方法是怎么实现的:

func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.AcceptTCP()
    ......
}

func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
    ......
    fd, err := l.fd.accept()
    ......
    // 返回给调用者一个新的TCPConn
    return newTCPConn(fd), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 为什么对该函数加读锁?
    if err := fd.readLock(); err != nil {
        return nil, err
    }
    defer fd.readUnlock()
    ......
    for {
        // 这个accept是golang包装的系统调用
        // 用来处理跨平台
        s, rsa, err = accept(fd.sysfd)
        if err != nil {
            if err == syscall.EAGAIN {
                // 如果没有可用连接,WaitRead()阻塞该协程
                // 后面会详细分析WaitRead.
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            } else if err == syscall.ECONNABORTED {
                // 如果连接在Listen queue时就已经被对端关闭
                continue
            }
        }
        break
    }

    netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
    ......
    // 这个前面已经分析,将该fd添加到epoll队列中
    err = netfd.init()
    ......
    lsa, _ := syscall.Getsockname(netfd.sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

OK,从前面的编程事例中我们知道,一般在主协程中会accept新的connection,使用异步编程我们知道,如果没有 新连接到来,该协程会一直被阻塞,直到新连接到来有人唤醒了该协程。

一般在主协程中调用accept,如果返回值为EAGAIN,则调用WaitRead来阻塞当前协程,后续在该socket有事件到来 时被唤醒,WaitRead以及唤醒过程我们会在后面仔细分析。

Read

func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    // 为什么对函数调用加读锁
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    // 这个又是干嘛?
    if err := fd.pd.PrepareRead(); err != nil {
        return 0, &OpError{"read", fd.net, fd.raddr, err}
    }
    for {
        n, err = syscall.Read(int(fd.sysfd), p)
        if err != nil {
            n = 0
            // 如果返回EAGIN,阻塞当前协程直到有数据可读被唤醒
            if err == syscall.EAGAIN {
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            }
        }
        // 检查错误,封装io.EOF
        err = chkReadErr(n, err, fd)
        break
    }
    if err != nil && err != io.EOF {
        err = &OpError{"read", fd.net, fd.raddr, err}
    }
    return
}

func chkReadErr(n int, err error, fd *netFD) error {
    if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
        return io.EOF
    }
    return err
}

Read的流程与Accept流程极其一致,阅读起来也很简单。相信不用作过多解释,自己看吧。 需要注意的是每次Read不能保证可以读到想读的那么多内容,比如缓冲区大小是10,而实际可能只读到5,应用程 序需要能够处理这种情况。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {
    // 为什么这里加写锁
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    // 这个是干什么?
    if err := fd.pd.PrepareWrite(); err != nil {
        return 0, &OpError{"write", fd.net, fd.raddr, err}
    }
    // nn记录总共写入的数据量,每次Write可能只能写入部分数据
    for {
        var n int
        n, err = syscall.Write(int(fd.sysfd), p[nn:])
        if n > 0 {
            nn += n
        }
        // 如果数组数据已经全部写完,函数返回
        if nn == len(p) {
            break
        }
        // 如果写入数据时被block了,阻塞当前协程
        if err == syscall.EAGAIN {
            if err = fd.pd.WaitWrite(); err == nil {
                continue
            }
        }
        if err != nil {
            n = 0
            break
        }
        // 如果返回值为0,代表了什么?
        if n == 0 {
            err = io.ErrUnexpectedEOF
            break
        }
    }
    if err != nil {
        err = &OpError{"write", fd.net, fd.raddr, err}
    }
    return nn, err
}

注意Write语义与Read不一样的地方:

Write尽量将用户缓冲区的内容全部写入至底层socket,如果遇到socket暂时不可写入,会阻塞当前协程; Read在某次读取成功时立即返回,可能会导致读取的数据量少于用户缓冲区的大小; 为什么会在实现上有此不同,我想可能read的优先级比较高吧,应用程序可能一直在等着,我们不能等到数据一直 读完才返回,会阻塞用户。 而写不一样,优先级相对较低,而且用户一般也不着急写立即返回,所以可以将所有的数据全部写入,而且这样 也能简化应用程序的写法。

总结

上面我们基本说完了golang网络编程内的关键API流程,我们遗留了一个关键内容:当系统调用返回EAGAIN时,会 调用WaitRead/WaitWrite来阻塞当前协程,我会在接下来的章节中继续分析。

results matching ""

    No results matching ""