管道读写

Golang中channle是协程间信息交互的主要手段。Golang的channel分为有缓冲和无缓冲两种,关于他们之间的用法区别可以自行google,这里不再赘述。Golang中的channel读写均是同步语义,写满的、读空的channel都会触发协程调度。

向channel写数据

无论是无缓冲还是有缓冲channel,当向channel写数据发现channel已满时,都需要将当前写的协程挂起,并进行一次调度,当前M转而执行P内的其他协程。直到有人再读该channel时发现有阻塞等待写的协程时将其唤醒。

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......
    ......
    // 对应缓冲区大小为0的channel
    if c.dataqsiz == 0 {
        // 如果有接收者在等着
        // 这时候可以写入成功,将数据拷贝至
        // 接收者的缓冲区,唤醒接收者即可
        sg := c.recvq.dequeue()
        if sg != nil { 
            ......
        }
        // 如果尚未有接收者,需要将其block
        // no receiver available: block on this channel.
        gp := getg()
        mysg := acquireSudog()
        mysg.releasetime = 0
        if t0 != 0 {
            mysg.releasetime = -1
        }
        mysg.elem = ep
        mysg.waitlink = nil
        gp.waiting = mysg
        mysg.g = gp
        mysg.selectdone = nil
        gp.param = nil
        c.sendq.enqueue(mysg)
        // 在这里将发送者协程block
        goparkunlock(&c.lock, "chan send")
    }
    // 对于有缓冲区的channel,流程相似
    ......
}

如果尚未有接收者,那就将发送者阻塞,调用了函数goparkunlock

func goparkunlock(lock *mutex, reason string) {
    gopark(unsafe.Pointer(&parkunlock_c), unsafe.Pointer(lock), reason)
}

func gopark(unlockf unsafe.Pointer, lock unsafe.Pointer, reason string) {
    mp := acquirem()
    gp := mp.curg
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        gothrow("gopark: bad g status")
    }
    mp.waitlock = lock
    mp.waitunlockf = unlockf
    gp.waitreason = reason
    releasem(mp)
    // 最终调用函数park_m
    mcall(park_m)
}

void runtime·park_m(G *gp)
{
    bool ok;

    // 首先将g的status变为Gwaiting
    runtime·casgstatus(gp, Grunning, Gwaiting);
    dropg();

    if(g->m->waitunlockf) {
        ok = g->m->waitunlockf(gp, g->m->waitlock);
        g->m->waitunlockf = nil;
        g->m->waitlock = nil;
        if(!ok) {
            runtime·casgstatus(gp, Gwaiting, Grunnable);
            execute(gp);  // Schedule it back, never returns.
        }
    }
    // 发起一次调度,该m不再执行当前的g,而是选择一个新的g重新执行
    // schedule的具体实现不在这里描述
    schedule();
}

从channel读数据

从channel读数据和向channel写数据的实现几乎一样,只是方向反了而已,在这里不再赘述,各位自己钻研代码吧。

results matching ""

    No results matching ""