抢占式调度

虽然我们一直强调golang调度器是非抢占式的,但这并不完全准确。非抢占式的一个最大坏处是无法保证公平性:如果一个g处于死循环状态,那么其他协程可能就会被饿死。所幸的是,Golang在1.2版本中加入了抢占式调度的逻辑,这意味着g在执行的某个时刻可能被剥夺cpu,让给其他协程。

还记得我们之前说过Golang的sysmon协程么,该协程会定期唤醒做系统状态检查。我们前面说过了它如何检查处于Psyscall状态的P,以便让处于系统调用状态的P可以被继续执行,不至于饿死。除了检查这个以外,sysmon还检查处于Prunning状态的P,检查它的目的就是避免这里的某个g占用了过多的cpu时间,并在某个时刻剥夺其cpu运行时间。

// retake walks every P in runtime·allp (up to runtime·gomaxprocs) and, based
// on the P's status, decides whether it should be taken back or its running
// G asked to yield. `now` is the current timestamp; the 10*1000*1000
// threshold below is described as 10ms, so `now` is presumably in
// nanoseconds.
//
// Only the Prunning branch is shown in this excerpt; the Psyscall branch is
// elided ("......"). The return value n is initialised to 0 and is not
// modified by any code visible here, so it is presumably updated inside the
// elided Psyscall branch.
static uint32
retake(int64 now)
{
    uint32 i, s, n;
    int64 t;
    P *p;
    Pdesc *pd;

    n = 0;
    for(i = 0; i < runtime·gomaxprocs; i++) {
        p = runtime·allp[i];
        if(p==nil)
            continue;
        // pd holds the values observed for this P on the previous pass
        // (schedtick and the time it was last seen to change).
        pd = &pdesc[i];
        s = p->status;
        if(s == Psyscall) {
            ......
        } else if(s == Prunning) {
            // Preempt G if it's running for more than 10ms.
            t = p->schedtick;
            if(pd->schedtick != t) {
                // schedtick changed since the last pass: record the new
                // tick and restart the timer for this P.
                pd->schedtick = t;
                pd->schedwhen = now;
                continue;
            }
            // Same tick as last time, but the 10ms budget is not used up yet.
            if(pd->schedwhen + 10*1000*1000 > now)
                continue;
            // More than 10ms have passed since scheduling was last observed
            // on this P, so request preemption.
            preemptone(p);
        }
    }
    return n;
}

// preemptone requests preemption of the G currently running on P `p`.
// Preemption here only sets the G's preempt flag to true (and overwrites
// its stackguard0); the flag is checked only when the G makes a function
// call, which may then lead to a reschedule. This is therefore a very weak
// form of preemption.
//
// Returns false when nothing was requested: p has no M, the M is the one
// executing this function, or the M has no current G / is running on its g0.
// Returns true once the request has been recorded on the G.
static bool
preemptone(P *p)
{
    M *mp;
    G *gp;

    mp = p->m;
    // No M attached, or it is the caller's own M.
    if(mp == nil || mp == g->m)
        return false;
    gp = mp->curg;
    // No current G, or the M is running its g0.
    if(gp == nil || gp == mp->g0)
        return false;
    gp->preempt = true;
    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp->stackguard0 = StackPreempt;
    return true;
}

之前我们说过在函数调用时会进行堆栈检测,现在将gp->stackguard0设置为StackPreempt(即-1314,按无符号数解释是一个非常大的值,大于任何真实的栈指针),因此栈检测必然失败,肯定会调用一次runtime.morestack,逻辑如下:

// runtime·morestack is entered from a function f whose stack check failed.
// It refuses to run on m->g0 or m->gsignal (INT $3), records f's caller in
// m->morebuf and f's own context in g->sched, then switches g and SP to
// m->g0 and calls runtime·newstack, which must not return.
TEXT runtime·morestack(SB),NOSPLIT,$0-0
    // Cannot grow scheduler stack (m->g0).
    get_tls(CX)
    MOVQ    g(CX), BX
    MOVQ    g_m(BX), BX
    MOVQ    m_g0(BX), SI
    CMPQ    g(CX), SI
    JNE 2(PC)   // current g is not g0: skip the trap
    INT $3

    // Cannot grow signal stack (m->gsignal).
    MOVQ    m_gsignal(BX), SI
    CMPQ    g(CX), SI
    JNE 2(PC)   // current g is not gsignal: skip the trap
    INT $3

    // Called from f.
    // Set m->morebuf to f's caller.
    MOVQ    8(SP), AX   // f's caller's PC
    MOVQ    AX, (m_morebuf+gobuf_pc)(BX)
    LEAQ    16(SP), AX  // f's caller's SP
    MOVQ    AX, (m_morebuf+gobuf_sp)(BX)
    get_tls(CX)
    MOVQ    g(CX), SI
    MOVQ    SI, (m_morebuf+gobuf_g)(BX)

    // Set g->sched to context in f.
    MOVQ    0(SP), AX // f's PC
    MOVQ    AX, (g_sched+gobuf_pc)(SI)
    MOVQ    SI, (g_sched+gobuf_g)(SI)
    LEAQ    8(SP), AX // f's SP
    MOVQ    AX, (g_sched+gobuf_sp)(SI)
    MOVQ    DX, (g_sched+gobuf_ctxt)(SI)
    MOVQ    BP, (g_sched+gobuf_bp)(SI)

    // Call newstack on m->g0's stack.
    MOVQ    m_g0(BX), BX
    MOVQ    BX, g(CX)                   // the current g becomes g0
    MOVQ    (g_sched+gobuf_sp)(BX), SP  // switch to g0's saved stack pointer
    CALL    runtime·newstack(SB)
    MOVQ    $0, 0x1003  // crash if newstack returns
    RET

最终调用newstack来进行堆栈扩容:

// newstack is called from runtime·morestack after the stack check in some
// function failed. getg() here is the g executing newstack (morestack
// switched to m->g0 before the call), while gp (m.curg) is the goroutine
// whose check failed.
//
// Besides the stack-growth work (elided below as "......"), newstack also
// serves as the preemption entry point: when gp.stackguard0 equals
// stackPreempt, the failed check was a preemption request rather than a real
// overflow. In that case newstack either lets gp keep running (when the M is
// in a state where preemption is not allowed), performs a pending stack scan
// for the GC, or hands gp to gopreempt_m. None of these paths return.
func newstack() {
    thisg := getg()
    // TODO: double check all gp. shouldn't be getg().
    if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
        throw("stack growth after fork")
    }
    // morestack recorded the requesting g in m.morebuf.g; it must be the
    // M's current user goroutine.
    if thisg.m.morebuf.g.ptr() != thisg.m.curg {
        print("runtime: newstack called from g=", thisg.m.morebuf.g, "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
        morebuf := thisg.m.morebuf
        traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
        throw("runtime: wrong goroutine in newstack")
    }

    gp := thisg.m.curg
    // Take a copy of morebuf, then clear the fields on the M.
    morebuf := thisg.m.morebuf
    thisg.m.morebuf.pc = 0
    thisg.m.morebuf.lr = 0
    thisg.m.morebuf.sp = 0
    thisg.m.morebuf.g = 0
    rewindmorestack(&gp.sched)

    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    preempt := atomicloaduintptr(&gp.stackguard0) == stackPreempt
    if preempt {
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }

    ......
    // Reschedule.
    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }
        if gp.preemptscan {
            for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
                // Likely to be racing with the GC as
                // it sees a _Gwaiting and does the
                // stack scan. If so, gcworkdone will
                // be set and gcphasework will simply
                // return.
            }
            if !gp.gcscandone {
                scanstack(gp)
                gp.gcscandone = true
            }
            gp.preemptscan = false
            gp.preempt = false
            casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
            casgstatus(gp, _Gwaiting, _Grunning)
            // Restore the normal stack guard and resume gp where it left off.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }

        // Act like goroutine called runtime.Gosched.
        // NOTE(review): gp is expected to be in _Gwaiting here; presumably
        // the elided code above moved it from _Grunning to _Gwaiting.
        // Confirm against the full source.
        casgstatus(gp, _Gwaiting, _Grunning)
        // Give up the current goroutine and schedule another one to run.
        gopreempt_m(gp) // never return
    }
}

这里需要注意两个东西:

thisg := getg():这个代表当前正在执行newstack()函数的g,也就是当前线程(m)的g0,newstack()运行在g0的栈上(morestack在调用newstack之前已经把g和SP切换到了m->g0);

gp := thisg.m.curg:这个代表的是申请栈扩容的协程,与上面的thisg不是一个东西。

虽然这里调用了newstack,但是对于stackguard0==stackPreempt的协程来说,它的目的压根不是堆栈扩容,而是发起一次调度,所以直接进入了gopreempt_m,在这里让出当前协程,并发起一次schedule()。

results matching ""

    No results matching ""