您的位置:首頁(yè) > 軟件教程 > 教程 > Go runtime 調(diào)度器精講(五):調(diào)度策略

Go runtime 調(diào)度器精講(五):調(diào)度策略

來(lái)源:好特整理 | 時(shí)間:2024-09-15 09:57:02 | 閱讀:175 |  標(biāo)簽: T GO TIM 策略   | 分享到:

原創(chuàng)文章,歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明出處,謝謝。 0. 前言 在 第四講 我們介紹了 main goroutine 是如何運(yùn)行的。其中針對(duì) main goroutine 介紹了調(diào)度函數(shù) schedule 是怎么工作的,對(duì)于整個(gè)調(diào)度器的調(diào)度策略并沒(méi)有介紹,這點(diǎn)是不完整的,這一講會(huì)完善調(diào)度器的調(diào)度策略部分。

原創(chuàng)文章,歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明出處,謝謝。


0. 前言

在 第四講 我們介紹了 main goroutine 是如何運(yùn)行的。其中針對(duì) main goroutine 介紹了調(diào)度函數(shù) schedule 是怎么工作的,對(duì)于整個(gè)調(diào)度器的調(diào)度策略并沒(méi)有介紹,這點(diǎn)是不完整的,這一講會(huì)完善調(diào)度器的調(diào)度策略部分。

1. 調(diào)度時(shí)間點(diǎn)

runtime.schedule 實(shí)現(xiàn)了調(diào)度器的調(diào)度策略。那么對(duì)于調(diào)度時(shí)間點(diǎn),查看哪些函數(shù)調(diào)用的 runtime.schedule 即可順藤摸瓜理出調(diào)度器的調(diào)度時(shí)間點(diǎn),如下圖:

Go runtime 調(diào)度器精講(五):調(diào)度策略

調(diào)度時(shí)間點(diǎn)不是本講的重點(diǎn),這里有興趣的同學(xué)可以順藤摸瓜,摸摸觸發(fā)調(diào)度時(shí)間點(diǎn)的路徑,這里就跳過(guò)了。

2. 調(diào)度策略

調(diào)度策略才是我們的重點(diǎn),進(jìn)到 runtime.schedule

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    mp := getg().m                  // 獲取當(dāng)前執(zhí)行線(xiàn)程

top:
	pp := mp.p.ptr()                // 獲取執(zhí)行線(xiàn)程綁定的 P
	pp.preempt = false

    // Safety check: if we are spinning, the run queue should be empty.
	// Check this before calling checkTimers, as that might call
	// goready to put a ready goroutine on the local run queue.
    if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}

    gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available

    ...
    execute(gp, inheritTime)        // 執(zhí)行找到的 goroutine
}

runtime.schedule 的重點(diǎn)在 findRunnable() 。 findRunnable() 函數(shù)很長(zhǎng),為避免影響可讀性,這里對(duì)大部分流程做了注釋?zhuān)竺嬖谟兄攸c(diǎn)的加以介紹。進(jìn)入 findRunnable()

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
// tryWakeP indicates that the returned goroutine is not normal (GC worker, trace
// reader) so the caller should try to wake a P.
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
	mp := getg().m                                      // 獲取當(dāng)前執(zhí)行線(xiàn)程

top:
	pp := mp.p.ptr()                                    // 獲取線(xiàn)程綁定的 P
	...
	
    // Check the global runnable queue once in a while to ensure fairness.
	// Otherwise two goroutines can completely occupy the local runqueue
	// by constantly respawning each other.
	if pp.schedtick%61 == 0 && sched.runqsize > 0 {
		lock(&sched.lock)
		gp := globrunqget(pp, 1)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false, false
		}
	}

    // local runq
	if gp, inheritTime := runqget(pp); gp != nil {      // 從 P 的本地隊(duì)列中獲取 goroutine
		return gp, inheritTime, false
	}

    // global runq
	if sched.runqsize != 0 {                            // 如果本地隊(duì)列獲取不到就判斷全局隊(duì)列中有無(wú) goroutine
		lock(&sched.lock)                               // 如果有的話(huà),為全局變量加鎖
		gp := globrunqget(pp, 0)                        // 從全局隊(duì)列中拿 goroutine
		unlock(&sched.lock)                             // 為全局變量解鎖
		if gp != nil {
			return gp, false, false
		}
	}

    // 如果全局隊(duì)列中沒(méi)有 goroutine 則從 network poller 中取 goroutine
    if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
		...
	}

    // 如果 network poller 中也沒(méi)有 goroutine,那么嘗試從其它 P 中偷 goroutine
    // Spinning Ms: steal work from other Ps.
	//
	// Limit the number of spinning Ms to half the number of busy Ps.
	// This is necessary to prevent excessive CPU consumption when
	// GOMAXPROCS>>1 but the program parallelism is low.
    // 如果下面兩個(gè)條件至少有一個(gè)滿(mǎn)足,則進(jìn)入偷 goroutine 邏輯
    // 條件 1: 當(dāng)前線(xiàn)程是 spinning 自旋狀態(tài)
    // 條件 2: 當(dāng)前活躍的 P 要遠(yuǎn)大于自旋的線(xiàn)程,說(shuō)明需要線(xiàn)程去分擔(dān)活躍線(xiàn)程的壓力,不要睡覺(jué)了
	if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
        if !mp.spinning {                                       // 因?yàn)槭莾蓚(gè)條件至少滿(mǎn)足一個(gè)即可,這里首先判斷當(dāng)前線(xiàn)程是不是自旋狀態(tài)
			mp.becomeSpinning()                                 // 如果不是,更新線(xiàn)程的狀態(tài)為自旋狀態(tài)
		}

        gp, inheritTime, tnow, w, newWork := stealWork(now)     // 偷 goroutine
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime, false                       // 如果 gp 不等于 nil,表示偷到了,返回偷到的 goroutine
		}
		if newWork {                
			// There may be new timer or GC work; restart to
			// discover.
			goto top                                            // 如果 gp 不等于 nil,且 network 為 true,則跳到 top 標(biāo)簽重新找 goroutine
		}

		now = tnow
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}

    ...
    if sched.runqsize != 0 {                                    // 偷都沒(méi)偷到,還要在找一遍全局隊(duì)列,防止偷的過(guò)程中,全局隊(duì)列又有 goroutine 了
		gp := globrunqget(pp, 0)
		unlock(&sched.lock)
		return gp, false, false
	}

    if !mp.spinning && sched.needspinning.Load() == 1 {         // 在判斷一遍,如果 mp 不是自旋狀態(tài),且 sched.needspinning == 1 則更新 mp 為自旋,調(diào)用 top 重新找一遍 goroutine
		// See "Delicate dance" comment below.
		mp.becomeSpinning()
		unlock(&sched.lock)
		goto top
	}

    // 實(shí)在找不到 goroutine,表明當(dāng)前線(xiàn)程多, goroutine 少,準(zhǔn)備掛起線(xiàn)程
    // 首先,調(diào)用 releasep 取消線(xiàn)程和 P 的綁定
    if releasep() != pp {                                       
		throw("findrunnable: wrong p")
	}

    ...
    now = pidleput(pp, now)                                     // 將解綁的 P 放到全局空閑隊(duì)列中
    unlock(&sched.lock)

    wasSpinning := mp.spinning                                  // 到這里 mp.spinning == true,線(xiàn)程處于自旋狀態(tài)
	if mp.spinning {
		mp.spinning = false                                     // 設(shè)置 mp.spinning = false,這是要準(zhǔn)備休眠了
		if sched.nmspinning.Add(-1) < 0 {                       // 將全局變量的自旋線(xiàn)程數(shù)減 1,因?yàn)楫?dāng)前線(xiàn)程準(zhǔn)備休眠,不偷 goroutine 了
			throw("findrunnable: negative nmspinning")
		}
        ...
    }
    stopm()                                                     // 線(xiàn)程休眠,直到喚醒
	goto top                                                    // 能執(zhí)行到這里,說(shuō)明線(xiàn)程已經(jīng)被喚醒了,繼續(xù)找一遍 goroutine
}

看完線(xiàn)程的調(diào)度策略我都要被感動(dòng)到了,何其的敬業(yè),窮盡一切方式去找活干,找不到活,休眠之前還要在找一遍,真的是勞模啊。

大致流程是比較清楚的,我們把其中一些值得深挖的部分在單拎出來(lái)。

首先,從本地隊(duì)列中找 goroutine,如果找不到則進(jìn)入全局隊(duì)列找,這里如果看 gp := globrunqget(pp, 0) 可能會(huì)覺(jué)得疑惑,從全局隊(duì)列中拿 goroutine 為什么要把 P 傳進(jìn)去,我們看這個(gè)函數(shù)在做什么:

// Try get a batch of G's from the global runnable queue.
// sched.lock must be held.											// 注釋說(shuō)的挺清晰了,把全局隊(duì)列的 goroutine 放到 P 的本地隊(duì)列
func globrunqget(pp *p, max int32) *g {
	assertLockHeld(&sched.lock)										

	if sched.runqsize == 0 {
		return nil
	}

	n := sched.runqsize/gomaxprocs + 1								// 全局隊(duì)列是線(xiàn)程共享的,這里要除 gomaxprocs 平攤到每個(gè)線(xiàn)程綁定的 P
	if n > sched.runqsize {
		n = sched.runqsize											// 執(zhí)行到這里,說(shuō)明 gomaxprocs == 1
	}
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(pp.runq))/2 {									
		n = int32(len(pp.runq)) / 2									// 如果 n 比本地隊(duì)列長(zhǎng)度的一半要長(zhǎng),則 n == len(P.runq)/2
	}

	sched.runqsize -= n												// 全局隊(duì)列長(zhǎng)度減 n,準(zhǔn)備從全局隊(duì)列中拿 n 個(gè) goroutine 到 P 中

	gp := sched.runq.pop()											// 把全局隊(duì)列隊(duì)頭的 goroutine 拿出來(lái),這個(gè) goroutine 是要返回的 goroutine
	n--																// 拿出了一個(gè)隊(duì)頭的 goroutine,這里 n 要減 1
	for ; n > 0; n-- {				
		gp1 := sched.runq.pop()										// 循環(huán)拿全局隊(duì)列中的 goroutine 出來(lái)
		runqput(pp, gp1, false)										// 將拿出的 goroutine 放到全局隊(duì)列中
	}
	return gp
}

調(diào)用 globrunqget 說(shuō)明本地隊(duì)列沒(méi)有 goroutine 要從全局隊(duì)列拿,那么就可以把全局隊(duì)列中的 goroutine 放到 P 中,提高了全局隊(duì)列 goroutine 的優(yōu)先級(jí)。

如果全局隊(duì)列也沒(méi)找到 goroutine,在從 network poller 找,如果 network poller 也沒(méi)找到,則準(zhǔn)備進(jìn)入自旋,從別的線(xiàn)程的 P 那里偷活干。我們看線(xiàn)程是怎么偷活的:

// stealWork attempts to steal a runnable goroutine or timer from any P.
//
// If newWork is true, new work may have been readied.
//
// If now is not 0 it is the current time. stealWork returns the passed time or
// the current time if now was passed as 0.
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
	pp := getg().m.p.ptr()																// pp 是當(dāng)前線(xiàn)程綁定的 P

	ranTimer := false

	const stealTries = 4																// 線(xiàn)程偷四次,每次都要隨機(jī)循環(huán)一遍所有 P
	for i := 0; i < stealTries; i++ {
		stealTimersOrRunNextG := i == stealTries-1

		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {			// 為保證偷的隨機(jī)性,隨機(jī)開(kāi)始偷 P。隨機(jī)開(kāi)始,后面每個(gè) P 都可以輪到
			...
			p2 := allp[enum.position()]													// 從 allp 中獲取 P
			if pp == p2 {
				continue																// 如果獲取的是當(dāng)前線(xiàn)程綁定的 P,則繼續(xù)循環(huán)下一個(gè) P
			}
			...
			// Don't bother to attempt to steal if p2 is idle.
			if !idlepMask.read(enum.position()) {										// 判斷拿到的 P 是不是 idle 狀態(tài),如果是,表明 P 還沒(méi)有 goroutine,跳過(guò)它,偷下一家
				if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {			// P 不是 idle,調(diào)用 runqsteal 偷它!
					return gp, false, now, pollUntil, ranTimer
				}
			}
		}
	}

	// No goroutines found to steal. Regardless, running a timer may have
	// made some goroutine ready that we missed. Indicate the next timer to
	// wait for.
	return nil, false, now, pollUntil, ranTimer
}

線(xiàn)程隨機(jī)的偷一個(gè)可偷的 P,偷 P 的實(shí)現(xiàn)在 runqsteal ,查看 runqsteal 怎么偷的:

// Steal half of elements from local runnable queue of p2
// and put onto local runnable queue of p.
// Returns one of the stolen elements (or nil if failed).						// 給寶寶餓壞了,直接偷一半的 goroutine 啊,夠狠的!
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
	t := pp.runqtail															// t 指向當(dāng)前 P 本地隊(duì)列的隊(duì)尾
	n := runqgrab(p2, &pp.runq, t, stealRunNextG)								// runqgrab 把 P2 本地隊(duì)列的一半 goroutine 拿到 P 的 runq 隊(duì)列中
	if n == 0 {
		return nil
	}
	n--
	gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()								// 把偷到的本地隊(duì)列隊(duì)尾的 goroutine 拿出來(lái)
	if n == 0 {
		return gp																// 如果只偷到了這一個(gè),則直接返回。有總比沒(méi)有好
	}
	h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with consumers
	if t-h+n >= uint32(len(pp.runq)) {
		throw("runqsteal: runq overflow")										// 如果 t-h+n >= len(p.runq) 表示偷多了...
	}
	atomic.StoreRel(&pp.runqtail, t+n) 											// 更新 P 的本地隊(duì)列的隊(duì)尾
	return gp
}

這個(gè)偷就是把“地主家”(P2)的余糧 (goroutine) 給它搶一半過(guò)來(lái),沒(méi)辦法我也要吃飯啊。

如果連偷都沒(méi)偷到(好吧,太慘了點(diǎn)...),那就準(zhǔn)備休眠了,不干活了還不行嘛。不干活之前在去看看全局隊(duì)列有沒(méi)有 goroutine 了(口是心非的 M 人)。還是沒(méi)活,好吧,準(zhǔn)備休眠了。

準(zhǔn)備休眠,首先解除和 P 的綁定:

func releasep() *p {
	gp := getg()

	if gp.m.p == 0 {
		throw("releasep: invalid arg")
	}
	pp := gp.m.p.ptr()
	if pp.m.ptr() != gp.m || pp.status != _Prunning {
		print("releasep: m=", gp.m, " m->p=", gp.m.p.ptr(), " p->m=", hex(pp.m), " p->status=", pp.status, "\n")
		throw("releasep: invalid p state")
	}
	...
	gp.m.p = 0
	pp.m = 0
	pp.status = _Pidle
	return pp
}

就是指針的解綁操作,代碼很清晰,連注釋都不用,我們也不講了。

解綁之后, pidleput 把空閑的 P 放到全局空閑隊(duì)列中。

接著,更新線(xiàn)程的狀態(tài),從自旋更新為非自旋,調(diào)用 stopm 準(zhǔn)備休眠:

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
	gp := getg()							// 當(dāng)前線(xiàn)程執(zhí)行的 goroutine

	...

	lock(&sched.lock)
	mput(gp.m)								// 將線(xiàn)程放到全局空閑線(xiàn)程隊(duì)列中
	unlock(&sched.lock)
	mPark()
	acquirep(gp.m.nextp.ptr())
	gp.m.nextp = 0
}

stopm 將線(xiàn)程放到全局空閑線(xiàn)程隊(duì)列,接著調(diào)用 mPark 休眠線(xiàn)程:

// mPark causes a thread to park itself, returning once woken.
//
//go:nosplit
func mPark() {
	gp := getg()
	notesleep(&gp.m.park)					// notesleep 線(xiàn)程休眠
	noteclear(&gp.m.park)
}

func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
	for atomic.Load(key32(&n.key)) == 0 {					// 這里通過(guò) n.key 判斷線(xiàn)程是否喚醒,如果等于 0,表示未喚醒,線(xiàn)程繼續(xù)休眠
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns)					// 調(diào)用 futex 休眠線(xiàn)程,線(xiàn)程會(huì)“阻塞”在這里,直到被喚醒
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false								// “喚醒”,設(shè)置線(xiàn)程的 blocked 標(biāo)記為 false
	}
}

// One-time notifications.
func noteclear(n *note) {									
	n.key = 0												// 執(zhí)行到 noteclear 說(shuō)明,線(xiàn)程已經(jīng)被喚醒了,這時(shí)候線(xiàn)程重置 n.key 標(biāo)志位為 0
}

線(xiàn)程休眠是通過(guò)調(diào)用 futex 進(jìn)入操作系統(tǒng)內(nèi)核完成線(xiàn)程休眠的,關(guān)于 futex 的內(nèi)容可以參考 這里 。

線(xiàn)程的 n.key 是休眠的標(biāo)志位,當(dāng) n.key 不等于 0 時(shí)表示有線(xiàn)程在喚醒休眠線(xiàn)程,線(xiàn)程從休眠狀態(tài)恢復(fù)到正常狀態(tài)。喚醒休眠線(xiàn)程通過(guò)調(diào)用 notewakeup(&nmp.park) 函數(shù)實(shí)現(xiàn):

func notewakeup(n *note) {
	old := atomic.Xchg(key32(&n.key), 1)
	if old != 0 {
		print("notewakeup - double wakeup (", old, ")\n")
		throw("notewakeup - double wakeup")
	}
	futexwakeup(key32(&n.key), 1)					// 調(diào)用 futexwakeup 喚醒休眠線(xiàn)程
}

首先,線(xiàn)程是怎么找到休眠線(xiàn)程的?線(xiàn)程通過(guò)全局空閑線(xiàn)程隊(duì)列找到空閑的線(xiàn)程,并且將空閑線(xiàn)程的休眠標(biāo)志位 m.park 傳給 notewakeup ,最后調(diào)用 futexwakeup 喚醒休眠線(xiàn)程。

值得一提的是,喚醒的線(xiàn)程在喚醒之后還是會(huì)繼續(xù)找可運(yùn)行的 goroutine 直到找到:

func stopm() {
	...
	mPark()								// 如果 mPark 返回,表示線(xiàn)程被喚醒,開(kāi)始正常工作
	acquirep(gp.m.nextp.ptr())			// 前面休眠前,線(xiàn)程已經(jīng)和 P 解綁了。這里在給線(xiàn)程找一個(gè) P 綁定
	gp.m.nextp = 0						// 線(xiàn)程已經(jīng)綁定到 P 了,重置 nextp
}

基本這就是調(diào)度策略中很重要的一部分,線(xiàn)程如何找 goroutine。找到 goroutine 之后調(diào)用 gogo 執(zhí)行該 goroutine。

3. 小結(jié)

本講繼續(xù)豐富了調(diào)度器的調(diào)度策略,下一講,我們開(kāi)始非 main goroutine 的介紹。


小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認(rèn)同期限觀點(diǎn)或證實(shí)其描述。

Go v1.62
Go v1.62
類(lèi)型:動(dòng)作冒險(xiǎn)  運(yùn)營(yíng)狀態(tài):正式運(yùn)營(yíng)  語(yǔ)言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動(dòng)

GoEscape是一款迷宮逃脫休閑闖關(guān)游戲。在這款游戲中,玩家可以挑戰(zhàn)大量關(guān)卡,通過(guò)旋轉(zhuǎn)屏幕的方式幫助球球

相關(guān)視頻攻略

更多

掃二維碼進(jìn)入好特網(wǎng)手機(jī)版本!

掃二維碼進(jìn)入好特網(wǎng)微信公眾號(hào)!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請(qǐng)發(fā)郵件[email protected]

湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)