1、概述
GoRoutineMap 定義了一種類型,可以運(yùn)行具有名稱的 goroutine 并跟蹤它們的狀態(tài)。它防止創(chuàng)建具有相同名稱的多個(gè)goroutine,并且在上一個(gè)具有該名稱的 goroutine 完成后的一段退避時(shí)間內(nèi)可能阻止重新創(chuàng)建 goroutine。
使用GoRoutineMap場(chǎng)景:
- 使用協(xié)程的方式運(yùn)行函數(shù)邏輯,如果函數(shù)成功執(zhí)行,則退出該協(xié)程;如果函數(shù)執(zhí)行報(bào)錯(cuò),在指數(shù)退避的時(shí)間內(nèi)禁止再次執(zhí)行該函數(shù)邏輯。
使用GoRoutineMap大體步驟如下:
1)通過goRoutineMap.NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {....}方法創(chuàng)建GoRoutineMap結(jié)構(gòu)體對(duì)象,用于管理goroutine 并跟蹤它們的狀態(tài);
2)調(diào)用GoRoutineMap結(jié)構(gòu)體對(duì)象Run(operationName, operation)方法,其能夠防止創(chuàng)建具有相同名稱的多個(gè)goroutine,并使用協(xié)程的方式運(yùn)行函數(shù)邏輯,如果函數(shù)成功執(zhí)行,則退出該協(xié)程;如果函數(shù)執(zhí)行報(bào)錯(cuò),在指數(shù)退避的時(shí)間內(nèi)禁止再次執(zhí)行該函數(shù)邏輯。
注意 1:本文代碼基于Kubernetes 1.24.10版本,包路徑kubernetes-1.24.10/pkg/util/goroutinemap/goroutinemap.go。
注意 2:概述中涉及的代碼會(huì)在下文進(jìn)行詳細(xì)解釋。
2、goroutinemap工具包代碼詳解
2.1 相關(guān)類型詳解
GoRoutineMap工具包接口定義:
type GoRoutineMap interface { // Run adds operation name to the list of running operations and spawns a // new go routine to execute the operation. // If an operation with the same operation name already exists, an // AlreadyExists or ExponentialBackoff error is returned. // Once the operation is complete, the go routine is terminated and the // operation name is removed from the list of executing operations allowing // a new operation to be started with the same operation name without error. Run(operationName string, operationFunc func() error) error // Wait blocks until operations map is empty. This is typically // necessary during tests - the test should wait until all operations finish // and evaluate results after that. Wait() // WaitForCompletion blocks until either all operations have successfully completed // or have failed but are not pending. The test should wait until operations are either // complete or have failed. WaitForCompletion() IsOperationPending(operationName string) bool }
goRoutineMap結(jié)構(gòu)體實(shí)現(xiàn)GoRoutineMap接口,定義如下:
// goRoutineMap結(jié)構(gòu)體實(shí)現(xiàn)GoRoutineMap接口, type goRoutineMap struct { // 用于記錄goRoutineMap維護(hù)協(xié)程的狀態(tài) operations map[string]operation // 發(fā)生錯(cuò)誤時(shí)是否指數(shù)級(jí)補(bǔ)償 exponentialBackOffOnError bool // 用在多個(gè) Goroutine 等待,一個(gè) Goroutine 通知(事件發(fā)生)的場(chǎng)景 cond *sync.Cond lock sync.RWMutex } // operation結(jié)構(gòu)體對(duì)象維護(hù)單個(gè)goroutine的狀態(tài)。 type operation struct { // 是否操作掛起 operationPending bool // 單個(gè)goroutine執(zhí)行邏輯報(bào)錯(cuò)時(shí),實(shí)現(xiàn)以指數(shù)退避方式 expBackoff exponentialbackoff.ExponentialBackoff }
ExponentialBackoff結(jié)構(gòu)體包含最后一次出現(xiàn)的錯(cuò)誤、最后一次出現(xiàn)錯(cuò)誤的時(shí)間以及不允許重試的持續(xù)時(shí)間。
// ExponentialBackoff contains the last occurrence of an error and the duration // that retries are not permitted. type ExponentialBackoff struct { lastError error lastErrorTime time.Time durationBeforeRetry time.Duration }
2.2?GoRoutineMap結(jié)構(gòu)體對(duì)象初始化
通過goRoutineMap.NewGoRoutineMap方法創(chuàng)建GoRoutineMap結(jié)構(gòu)體對(duì)象,用于管理goroutine 并跟蹤它們的狀態(tài);
// NewGoRoutineMap returns a new instance of GoRoutineMap. func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap { g := &goRoutineMap{ operations: make(map[string]operation), exponentialBackOffOnError: exponentialBackOffOnError, } g.cond = sync.NewCond(&g.lock) return g }
2.3? GoRoutineMap.Run方法代碼詳解
調(diào)用GoRoutineMap結(jié)構(gòu)體對(duì)象Run(operationName, operation)方法,其能夠防止創(chuàng)建具有相同名稱的多個(gè)goroutine,并使用協(xié)程的方式運(yùn)行函數(shù)邏輯,如果函數(shù)成功執(zhí)行,則退出該協(xié)程;如果函數(shù)執(zhí)行報(bào)錯(cuò),在指數(shù)退避的時(shí)間內(nèi)禁止再次執(zhí)行該函數(shù)邏輯。
// Run函數(shù)是外部函數(shù),是goRoutineMap核心方法,其能夠防止創(chuàng)建具有相同名稱的多個(gè)goroutine,并使用協(xié)程的方式運(yùn)行函數(shù)邏輯 // 如果函數(shù)成功執(zhí)行,則退出該協(xié)程;如果函數(shù)執(zhí)行報(bào)錯(cuò),在指數(shù)退避的時(shí)間內(nèi)禁止再次執(zhí)行該函數(shù)邏輯。 func (grm *goRoutineMap) Run( operationName string, operationFunc func() error) error { grm.lock.Lock() defer grm.lock.Unlock() // 判斷grm.operations這個(gè)map中是否存在具有operationName名稱的協(xié)程 existingOp, exists := grm.operations[operationName] if exists { // 如果grm.operations這個(gè)map中已經(jīng)存在operationName名稱的協(xié)程,并且existingOp.operationPending==true,說明grm.operations中operationName名稱這個(gè)協(xié)程正在執(zhí)行函數(shù)邏輯,在這期間又有一個(gè)同名的 // operationName希望加入grm.operations這個(gè)map,此時(shí)加入map失敗并報(bào)AlreadyExistsError錯(cuò)誤 if existingOp.operationPending { return NewAlreadyExistsError(operationName) } // 到這步說明名稱為operationName名稱的協(xié)程執(zhí)行函數(shù)邏輯失敗,此時(shí)判斷此協(xié)程最后一次失敗時(shí)間 + 指數(shù)退避的時(shí)間 >= 當(dāng)前時(shí)間,如果不符合條件的話禁止執(zhí)行該協(xié)程函數(shù)邏輯。 if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil { return err } } // 如果grm.operations這個(gè)map中不存在operationName名稱的協(xié)程 或者 此協(xié)程最后一次失敗時(shí)間 + 指數(shù)退避的時(shí)間 < 當(dāng)前時(shí)間,則在grm.operations這個(gè)map中重新維護(hù)此協(xié)程(注意,operationPending=true) grm.operations[operationName] = operation{ operationPending: true, expBackoff: existingOp.expBackoff, } // 以協(xié)程方式執(zhí)行函數(shù)邏輯operationFunc() go func() (err error) { // 捕獲崩潰并記錄錯(cuò)誤,默認(rèn)不傳參的話,在程序發(fā)送崩潰時(shí),在控制臺(tái)打印一下崩潰日志后再崩潰,方便技術(shù)人員排查程序錯(cuò)誤。 defer k8sRuntime.HandleCrash() // 如果執(zhí)行operationFunc()函數(shù)邏輯不報(bào)錯(cuò)或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個(gè)map中移除此operationName名稱協(xié)程; // 如果執(zhí)行operationFunc()函數(shù)邏輯報(bào)錯(cuò)并且grm.exponentialBackOffOnError=true,則將產(chǎn)生指數(shù)級(jí)補(bǔ)償,到達(dá)補(bǔ)償時(shí)間后才能再調(diào)用此operationName名稱協(xié)程的函數(shù)邏輯 // Handle completion of and error, if any, from operationFunc() defer grm.operationComplete(operationName, &err) // 處理operationFunc()函數(shù)發(fā)生的panic錯(cuò)誤,以便defer grm.operationComplete(operationName, &err)能執(zhí)行 // Handle panic, if any, from operationFunc() defer k8sRuntime.RecoverFromPanic(&err) return operationFunc() }() return nil }
如果給定lastErrorTime的durationBeforeRetry周期尚未過期,則SafeToRetry返回錯(cuò)誤。否則返回零。
// SafeToRetry returns an error if the durationBeforeRetry period for the given // lastErrorTime has not yet expired. Otherwise it returns nil. func (expBackoff *ExponentialBackoff) SafeToRetry(operationName string) error { if time.Since(expBackoff.lastErrorTime) <= expBackoff.durationBeforeRetry { return NewExponentialBackoffError(operationName, *expBackoff) } return nil }
operationComplete是一個(gè)內(nèi)部函數(shù),用于處理在goRoutineMap中已經(jīng)運(yùn)行完函數(shù)邏輯的協(xié)程。
// operationComplete是一個(gè)內(nèi)部函數(shù),用于處理在goRoutineMap中已經(jīng)運(yùn)行完函數(shù)邏輯的協(xié)程 // 如果執(zhí)行operationFunc()函數(shù)邏輯不報(bào)錯(cuò)或者grm.exponentialBackOffOnError=false的話,將從grm.operations這個(gè)map中移除此operationName名稱協(xié)程; // 如果執(zhí)行operationFunc()函數(shù)邏輯報(bào)錯(cuò)并且grm.exponentialBackOffOnError=true,則將產(chǎn)生指數(shù)級(jí)補(bǔ)償,到達(dá)補(bǔ)償時(shí)間后才能再調(diào)用此operationName名稱協(xié)程的函數(shù)邏輯 // operationComplete handles the completion of a goroutine run in the // goRoutineMap. func (grm *goRoutineMap) operationComplete( operationName string, err *error) { // Defer operations are executed in Last-In is First-Out order. In this case // the lock is acquired first when operationCompletes begins, and is // released when the method finishes, after the lock is released cond is // signaled to wake waiting goroutine. defer grm.cond.Signal() grm.lock.Lock() defer grm.lock.Unlock() if *err == nil || !grm.exponentialBackOffOnError { // 函數(shù)邏輯執(zhí)行完成無錯(cuò)誤或已禁用錯(cuò)誤指數(shù)級(jí)補(bǔ)償,將從grm.operations這個(gè)map中移除此operationName名稱協(xié)程; // Operation completed without error, or exponentialBackOffOnError disabled delete(grm.operations, operationName) if *err != nil { // Log error klog.Errorf("operation for %q failed with: %v", operationName, *err) } } else { // 函數(shù)邏輯執(zhí)行完成有錯(cuò)誤則將產(chǎn)生指數(shù)級(jí)補(bǔ)償,到達(dá)補(bǔ)償時(shí)間后才能再調(diào)用此operationName名稱協(xié)程的函數(shù)邏輯(注意,指數(shù)補(bǔ)充的協(xié)程,operationPending=false) // Operation completed with error and exponentialBackOffOnError Enabled existingOp := grm.operations[operationName] existingOp.expBackoff.Update(err) existingOp.operationPending = false grm.operations[operationName] = existingOp // Log error klog.Errorf("%v", existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName)) } }
Update是一個(gè)外部函數(shù),用于計(jì)算指數(shù)級(jí)別的退避時(shí)間。
const ( initialDurationBeforeRetry time.Duration = 500 * time.Millisecond maxDurationBeforeRetry time.Duration = 2*time.Minute + 2*time.Second ) func (expBackoff *ExponentialBackoff) Update(err *error) { if expBackoff.durationBeforeRetry == 0 { expBackoff.durationBeforeRetry = initialDurationBeforeRetry } else { expBackoff.durationBeforeRetry = 2 * expBackoff.durationBeforeRetry if expBackoff.durationBeforeRetry > maxDurationBeforeRetry { expBackoff.durationBeforeRetry = maxDurationBeforeRetry } } expBackoff.lastError = *err expBackoff.lastErrorTime = time.Now() }
3、總結(jié)
本文對(duì)Kubernetes GoRoutineMap工具包代碼進(jìn)行了詳解,通過?GoRoutineMap工具包能夠防止創(chuàng)建具有相同名稱的多個(gè)goroutine,并使用協(xié)程的方式運(yùn)行函數(shù)邏輯,如果函數(shù)成功執(zhí)行,則退出該協(xié)程;如果函數(shù)執(zhí)行報(bào)錯(cuò),在指數(shù)退避的時(shí)間內(nèi)禁止再次執(zhí)行該函數(shù)邏輯。使用Kubernetes GoRoutineMap包的好處包括以下幾點(diǎn):
-
減輕負(fù)載:當(dāng)出現(xiàn)錯(cuò)誤時(shí),使用指數(shù)退避時(shí)間可以避免過于頻繁地重新嘗試操作,從而減輕系統(tǒng)的負(fù)載。指數(shù)退避時(shí)間通過逐漸增加重試之間的等待時(shí)間,有效地減少了對(duì)系統(tǒng)資源的過度使用。
-
提高穩(wěn)定性:通過逐漸增加重試之間的等待時(shí)間,指數(shù)退避時(shí)間可以幫助應(yīng)對(duì)瞬時(shí)的故障或錯(cuò)誤。這種策略使得系統(tǒng)能夠在短時(shí)間內(nèi)自動(dòng)恢復(fù),并逐漸增加重試頻率,直到操作成功為止。這有助于提高應(yīng)用程序的穩(wěn)定性和可靠性。
-
降低網(wǎng)絡(luò)擁塞:當(dāng)網(wǎng)絡(luò)出現(xiàn)擁塞時(shí),頻繁地進(jìn)行重試可能會(huì)加重?fù)砣麊栴}并影響其他任務(wù)的正常運(yùn)行。指數(shù)退避時(shí)間通過增加重試之間的等待時(shí)間,可以降低對(duì)網(wǎng)絡(luò)的額外負(fù)載,有助于緩解網(wǎng)絡(luò)擁塞問題。
-
避免過早放棄:某些錯(cuò)誤可能是瞬時(shí)的或暫時(shí)性的,因此過早放棄重試可能會(huì)導(dǎo)致不必要的失敗。指數(shù)退避時(shí)間確保了在錯(cuò)誤發(fā)生時(shí)進(jìn)行適當(dāng)?shù)闹卦嚕员阆到y(tǒng)有更多機(jī)會(huì)恢復(fù)并成功完成操作。文章來源:http://www.zghlxwxcb.cn/news/detail-463636.html
綜上所述,使用Kubernetes GoRoutineMap工具包以協(xié)程方式處理函數(shù)邏輯可以提高系統(tǒng)的可靠性、穩(wěn)定性和性能,減輕負(fù)載并有效應(yīng)對(duì)錯(cuò)誤和故障情況。這是在Kubernetes中實(shí)施的一種常見的重試策略,常用于處理容器化應(yīng)用程序中的操作錯(cuò)誤。文章來源地址http://www.zghlxwxcb.cn/news/detail-463636.html
到了這里,關(guān)于Kubernetes GoRoutineMap工具包代碼詳解的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!