国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

golang并發(fā)安全-select

這篇具有很好參考價值的文章主要介紹了golang并發(fā)安全-select。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前面說了golang的channel,?今天我們看看golang select 是怎么實現(xiàn)的。

數(shù)據(jù)結(jié)構(gòu)

type scase struct {
	c    *hchan         // chan
	elem unsafe.Pointer // 數(shù)據(jù)
}

select 非默認的case 中都是處理channel 的 接受和發(fā)送,所有scase 結(jié)構(gòu)體中c是用來存儲select 的case中使用的channel

處理流程

golang并發(fā)安全-select,golang,開發(fā)語言,后端

select case 場景

編譯器在中間代碼生成期間會根據(jù) select 中 case 的不同對控制語句進行優(yōu)化,這一過程都發(fā)生在cmd/compile/internal/walk/select.go 中,下面會根據(jù)不同的場景進行分析代碼。

沒有case

代碼示例

func main() {
    select {}
}

如果是空的select語句,程序會被阻塞,golang 帶有死鎖監(jiān)測機制:如果當(dāng)前寫成無法被喚醒,則會panic

golang并發(fā)安全-select,golang,開發(fā)語言,后端

源碼解讀

在runtime/select.go中可以看到:如果cases為空直接調(diào)用gopark函數(shù)以waitReasonSelectNoCases的原因掛起當(dāng)前的協(xié)程,并且無法被喚醒,golang監(jiān)測到直接panic。

golang并發(fā)安全-select,golang,開發(fā)語言,后端

同樣我們在walk/select.go的walkSelectCases函數(shù)中可以看到,如果case為空直接調(diào)用runtime.block函數(shù)

golang并發(fā)安全-select,golang,開發(fā)語言,后端

只有一個case

代碼示例

func main() {
	ch := make(chan int)
	go func() {
		ch <- 1
	}()
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	}
}

如果有輸入直接打印ch data : 1 , 沒有的話會被檢測出all goroutines are asleep - deadlock!(和沒有case的一樣)

源碼解讀

如果一個非default case ,將讀寫轉(zhuǎn)換成 ch <- 或 <- ch, 正常的channel讀寫

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	// optimization: one-case select: single op.
	if ncas == 1 {
		cas := cases[0] //獲取case
		ir.SetPos(cas)
		l := cas.Init()
		if cas.Comm != nil { // 不是默認
			n := cas.Comm // 獲取case的條件語句
			l = append(l, ir.TakeInit(n)...)
			switch n.Op() {
			default:
				base.Fatalf("select %v", n.Op())

			case ir.OSEND: // 如果是 send, 無須處理
				// already ok

			case ir.OSELRECV2:
				r := n.(*ir.AssignListStmt)
                // 如果不是 data, ok := <- ch 類型,處理成<- ch
				if ir.IsBlank(r.Lhs[0]) && ir.IsBlank(r.Lhs[1]) {
					n = r.Rhs[0]
					break
				}
                // 是的話, op設(shè)置成data, ok := <- ch形式
				r.SetOp(ir.OAS2RECV)
			}

			l = append(l, n)
		}
        // 將case 條件后要執(zhí)行的語句加入帶執(zhí)行的列表
		l = append(l, cas.Body...)
        // 加入 break類型,跳出select-case 
		l = append(l, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		return l
	}
    // convert case value arguments to addresses.
	// this rewrite is used by both the general code and the next optimization.
	var dflt *ir.CommClause
	for _, cas := range cases {
		ir.SetPos(cas)
		n := cas.Comm
		if n == nil {
			dflt = cas
			continue
		}
		switch n.Op() {
		case ir.OSEND:
			n := n.(*ir.SendStmt)
			n.Value = typecheck.NodAddr(n.Value)
			n.Value = typecheck.Expr(n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[0]) {
				n.Lhs[0] = typecheck.NodAddr(n.Lhs[0])
				n.Lhs[0] = typecheck.Expr(n.Lhs[0])
			}
		}
	}
}

兩個case(一個default)

代碼示例

func main() {
	ch := make(chan int)
	select {
	case data := <-ch:
		fmt.Println("ch data:", data)
	default:
		fmt.Println("default")
	}
}

如果寫入就走<- 讀取,反之走默認

源碼解讀

如果是兩個case,其中一個是default,非default的會根據(jù)send還是recv 調(diào)用channel的selectnbsend和 selectnbrecv。這兩個方法是非阻塞的

func walkSelectCases(cases []*ir.CommClause) []ir.Node){
	// optimization: two-case select but one is default: single non-blocking op.
	if ncas == 2 && dflt != nil {
		cas := cases[0]
		if cas == dflt { // 如果是default 放在 cases[1]
			cas = cases[1]
		}

		n := cas.Comm
		ir.SetPos(n)
		r := ir.NewIfStmt(base.Pos, nil, nil, nil)
		r.SetInit(cas.Init())
		var cond ir.Node
		switch n.Op() {
		default:
			base.Fatalf("select %v", n.Op())

		case ir.OSEND:
            // 調(diào)用selectnbsend(c, v)
			// if selectnbsend(c, v) { body } else { default body }
			n := n.(*ir.SendStmt)
			ch := n.Chan
			cond = mkcall1(chanfn("selectnbsend", 2, ch.Type()), types.Types[types.TBOOL], r.PtrInit(), ch, n.Value)

		case ir.OSELRECV2:
			n := n.(*ir.AssignListStmt)
			recv := n.Rhs[0].(*ir.UnaryExpr)
			ch := recv.X
			elem := n.Lhs[0]
			if ir.IsBlank(elem) { //空的話 elem= NodNil
				elem = typecheck.NodNil()
			}
			cond = typecheck.Temp(types.Types[types.TBOOL])
            // 調(diào)用 selectnbrecv
			fn := chanfn("selectnbrecv", 2, ch.Type())
			call := mkcall1(fn, fn.Type().Results(), r.PtrInit(), elem, ch)
			as := ir.NewAssignListStmt(r.Pos(), ir.OAS2, []ir.Node{cond, n.Lhs[1]}, []ir.Node{call})
			r.PtrInit().Append(typecheck.Stmt(as))
		}

		r.Cond = typecheck.Expr(cond)
		r.Body = cas.Body
		r.Else = append(dflt.Init(), dflt.Body...)
		return []ir.Node{r, ir.NewBranchStmt(base.Pos, ir.OBREAK, nil)}
	}    
}

每次嘗試從channel讀/寫值,如果不成功則直接返回,不會阻塞。從selectnbsend和selectnbrecv看出,最后轉(zhuǎn)換成if-else

// compiler implements
//
//	select {
//	case c <- v:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selectnbsend(c, v) {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // block:false
    // chan將 select準(zhǔn)換if-else
	return chansend(c, elem, false, getcallerpc())
}

// compiler implements
//
//	select {
//	case v, ok = <-c:
//		... foo
//	default:
//		... bar
//	}
//
// as
//
//	if selected, ok = selectnbrecv(&v, c); selected {
//		... foo
//	} else {
//		... bar
//	}
//
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
	// block:false
    // chan將 select準(zhǔn)換if-else
    return chanrecv(c, elem, false)
}

多個case

代碼示例

func main() {
	ch := make(chan int)
	go func() {
		tempArr := []int{1,2,3,4,5,6}
		for i := range tempArr {
			ch <- i
		}
	}()
	go func() {
		for {
			select {
			case i := <-ch:
				println("first: ", i)
			case i := <-ch:
				println("second", i)
			}
		}
	}()
	time.Sleep(3 * time.Second)

}

golang并發(fā)安全-select,golang,開發(fā)語言,后端

可以看到多個case,會隨機選取一個case執(zhí)行

源碼解讀

func walkSelectCases(cases []*ir.CommClause) []ir.Node {
	ncas := len(cases)
	sellineno := base.Pos
	if dflt != nil {
		ncas--
	}
    // 定義casorder為ncas大小的case語句的數(shù)組
	casorder := make([]*ir.CommClause, ncas)
    // 分別定義nsends為發(fā)送channel的case個數(shù),nrecvs為接收channel的case個數(shù)
	nsends, nrecvs := 0, 0
    // 多case編譯后待執(zhí)行的語句列表
	var init []ir.Node

	// generate sel-struct
	base.Pos = sellineno
    // 定義selv為長度為ncas的scase類型的數(shù)組
    // scasetype()函數(shù)返回的就是scase結(jié)構(gòu)體,包含c和elem兩個字段
	selv := typecheck.Temp(types.NewArray(scasetype(), int64(ncas)))
	init = append(init, typecheck.Stmt(ir.NewAssignStmt(base.Pos, selv, nil)))

	// No initialization for order; runtime.selectgo is responsible for that.
	// 定義order為2倍的ncas長度的TUINT16類型的數(shù)組
    // 注意:selv和order作為runtime.selectgo()函數(shù)的入?yún)?,前者存放scase列表內(nèi)存地址,后者用來做scase排序使用,排序是為了便于挑選出待執(zhí)行的case
    order := typecheck.Temp(types.NewArray(types.Types[types.TUINT16], 2*int64(ncas)))

	var pc0, pcs ir.Node
	if base.Flag.Race {
		pcs = typecheck.Temp(types.NewArray(types.Types[types.TUINTPTR], int64(ncas)))
		pc0 = typecheck.Expr(typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(0))))
	} else {
		pc0 = typecheck.NodNil()
	}

	// register cases 遍歷case生成scase對象放到selv中
	for _, cas := range cases {
		ir.SetPos(cas)

		init = append(init, ir.TakeInit(cas)...)

		n := cas.Comm
		if n == nil { // default:
			continue
		}

		var i int
		var c, elem ir.Node
		switch n.Op() { // 根據(jù)類型獲取chan, elem的值
		default:
			base.Fatalf("select %v", n.Op())
		case ir.OSEND: // 發(fā)送chan類型,i從0開始遞增
			n := n.(*ir.SendStmt)
			i = nsends
			nsends++
			c = n.Chan
			elem = n.Value
		case ir.OSELRECV2: // 接收chan,i從ncas開始遞減
			n := n.(*ir.AssignListStmt)
			nrecvs++
			i = ncas - nrecvs
			recv := n.Rhs[0].(*ir.UnaryExpr)
			c = recv.X
			elem = n.Lhs[0]
		}

		casorder[i] = cas
       // 定義一個函數(shù),寫入c或elem到selv數(shù)組
		setField := func(f string, val ir.Node) {
            // 放到selv數(shù)組
			r := ir.NewAssignStmt(base.Pos, ir.NewSelectorExpr(base.Pos, ir.ODOT, ir.NewIndexExpr(base.Pos, selv, ir.NewInt(int64(i))), typecheck.Lookup(f)), val)
            // 添加到帶執(zhí)行列表
			init = append(init, typecheck.Stmt(r))
		}

		c = typecheck.ConvNop(c, types.Types[types.TUNSAFEPTR])
		setField("c", c)
		if !ir.IsBlank(elem) {
			elem = typecheck.ConvNop(elem, types.Types[types.TUNSAFEPTR])
			setField("elem", elem)
		}

		// TODO(mdempsky): There should be a cleaner way to
		// handle this.
		if base.Flag.Race {
			r := mkcallstmt("selectsetpc", typecheck.NodAddr(ir.NewIndexExpr(base.Pos, pcs, ir.NewInt(int64(i)))))
			init = append(init, r)
		}
	}
    // 如果發(fā)送chan和接收chan的個數(shù)不等于ncas,直接報錯
	if nsends+nrecvs != ncas {
		base.Fatalf("walkSelectCases: miscount: %v + %v != %v", nsends, nrecvs, ncas)
	}

	// run the select  開始執(zhí)行select動作
	base.Pos = sellineno
    // 定義chosen, recvOK作為selectgo()函數(shù)的兩個返回值
    // chosen 表示被選中的case的索引,recvOK表示對于接收操作,是否成功接收
	chosen := typecheck.Temp(types.Types[types.TINT])
	recvOK := typecheck.Temp(types.Types[types.TBOOL])
	r := ir.NewAssignListStmt(base.Pos, ir.OAS2, nil, nil)
	r.Lhs = []ir.Node{chosen, recvOK}
    // 調(diào)用runtime.selectgo()函數(shù)作為運行時實際執(zhí)行多case的select動作的函數(shù)
	fn := typecheck.LookupRuntime("selectgo")
	var fnInit ir.Nodes
	r.Rhs = []ir.Node{mkcall1(fn, fn.Type().Results(), &fnInit, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), pc0, ir.NewInt(int64(nsends)), ir.NewInt(int64(nrecvs)), ir.NewBool(dflt == nil))}
	init = append(init, fnInit...)
	init = append(init, typecheck.Stmt(r))

	// selv and order are no longer alive after selectgo.
    // 執(zhí)行完selectgo()函數(shù)后,銷毀selv和order數(shù)組.
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, selv))
	init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, order))
	if base.Flag.Race {
		init = append(init, ir.NewUnaryExpr(base.Pos, ir.OVARKILL, pcs))
	}

	// dispatch cases 
    //定義一個函數(shù),根據(jù)chosen確定的case分支生成if語句,執(zhí)行該分支的語句
	dispatch := func(cond ir.Node, cas *ir.CommClause) {
		cond = typecheck.Expr(cond)
		cond = typecheck.DefaultLit(cond, nil)

		r := ir.NewIfStmt(base.Pos, cond, nil, nil)

		if n := cas.Comm; n != nil && n.Op() == ir.OSELRECV2 {
			n := n.(*ir.AssignListStmt)
			if !ir.IsBlank(n.Lhs[1]) {
				x := ir.NewAssignStmt(base.Pos, n.Lhs[1], recvOK)
				r.Body.Append(typecheck.Stmt(x))
			}
		}

		r.Body.Append(cas.Body.Take()...)
		r.Body.Append(ir.NewBranchStmt(base.Pos, ir.OBREAK, nil))
		init = append(init, r)
	}
    // 如果多case中有default分支,并且chosen小于0,執(zhí)行該default分支
	if dflt != nil {
		ir.SetPos(dflt)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OLT, chosen, ir.NewInt(0)), dflt)
	}
    // 如果有chosen選中的case分支,即chosen等于i,則執(zhí)行該分支
	for i, cas := range casorder {
		ir.SetPos(cas)
		dispatch(ir.NewBinaryExpr(base.Pos, ir.OEQ, chosen, ir.NewInt(int64(i))), cas)
	}

	return init
}

從上面代碼可以看出:

1- 初始化過程: 生成scase數(shù)組,定義selv 存放scase數(shù)組內(nèi)存地址,定義order 來給scase排序

2- 遍歷所有的case ,將case放到帶執(zhí)行列表(不包括default)

3- 調(diào)用runtime。selectgo并將selv和order作為入?yún)魅雜electgo

4- 根據(jù)selectgo返回的chosen來生成if語句,執(zhí)行對應(yīng)的case

解鎖加鎖

加鎖的順序和解鎖的順序相反。


func sellock(scases []scase, lockorder []uint16) {
	var c *hchan
	for _, o := range lockorder {
		c0 := scases[o].c
		if c0 != c {
			c = c0
			lock(&c.lock)
		}
	}
}

func selunlock(scases []scase, lockorder []uint16) {
	// 我們必須非常小心,在解鎖最后一把鎖后不要觸摸sel,因為sel可以在最后一次解鎖后立即釋放。
    //考慮以下情況。第一個M調(diào)用runtime·park()在runtime·selectgo()中傳遞sel。
    //一旦runtime·park()解鎖了最后一個鎖,另一個M會使調(diào)用select的G再次可運行,
    //并安排其執(zhí)行。當(dāng)G在另一個M上運行時,它鎖定所有鎖并釋放sel?,F(xiàn)在,如果第一個M觸摸sel,它將訪問釋放的內(nèi)存。
	for i := len(lockorder) - 1; i >= 0; i-- {
		c := scases[lockorder[i]].c
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
		unlock(&c.lock)
	}
}

selectgo

selectgo 處理邏輯

golang并發(fā)安全-select,golang,開發(fā)語言,后端

// cas0指向[ncases]scase類型的數(shù)組,order0指向[2*ncases]uint16類型的數(shù)組(其中ncases必須<=65536)。
// 返回值有兩個, chosen 和 recvOK,分別表示選中的case的序號,和對接收操作是否接收成功的布爾值
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
	if debugSelect {
		print("select: cas0=", cas0, "\n")
	}
    //==== 執(zhí)行必要的初始化操作,并生成處理case的兩種順序:輪詢順序polIorder和加鎖順序lockorder。
  // 為了將scase分配到棧上,這里直接給cas1分配了64KB大小的數(shù)組,同理, 給order1分配了128KB大小的數(shù)組
	// NOTE: In order to maintain a lean stack size, the number of scases
	// is capped at 65536.
	cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
	order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
    // ncases個數(shù) = 發(fā)送chan個數(shù)+ 接收chan個數(shù)
	ncases := nsends + nrecvs
    // scases是cas1數(shù)組的前ncases個元素
	scases := cas1[:ncases:ncases]
    // 順序列表pollorder是order1的0- ncases個元素
	pollorder := order1[:ncases:ncases]
    // 加鎖列表lockorder是order1的ncase到 2 ncases 個元素
	lockorder := order1[ncases:][:ncases:ncases]
	// NOTE: 編譯器初始化的pollorder/lockorder的基礎(chǔ)數(shù)組不是零。
	// Even when raceenabled is true, there might be select
	// statements in packages compiled without -race (e.g.,
	// ensureSigM in runtime/signal_unix.go).
	var pcs []uintptr
	if raceenabled && pc0 != nil {
		pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
		pcs = pc1[:ncases:ncases]
	}
	casePC := func(casi int) uintptr {
		if pcs == nil {
			return 0
		}
		return pcs[casi]
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

    // 生成排列順序
    norder := 0
	for i := range scases {
		cas := &scases[i]

		// Omit cases without channels from the poll and lock orders.
        // 處理case中channel為空的情況
		if cas.c == nil {
			cas.elem = nil // 便于GC
			continue
		}
       // 通過fastrandn函數(shù)引入隨機性,確定pollorder列表中case的隨機順序索引
		j := fastrandn(uint32(norder + 1))
		pollorder[norder] = pollorder[j]
		pollorder[j] = uint16(i)
		norder++
	}
	// 重新生成列表
	pollorder = pollorder[:norder]
	lockorder = lockorder[:norder]

    // 根據(jù)chan地址確定lockorder加鎖排序列表的順序
    // 簡單的堆排序,以保證nlogn時間復(fù)雜度完成排序
	for i := range lockorder {
		j := i
        // 從輪詢順序開始,在同一channel上排序。
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	for i := len(lockorder) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}

	if debugSelect {
		for i := 0; i+1 < len(lockorder); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	}

    // 鎖定select中涉及的所有channel
	sellock(scases, lockorder)

	var (
		gp     *g
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

	// === pass 1 - 查找可以等待處理的channel
	var casi int
	var cas *scase
	var caseSuccess bool
	var caseReleaseTime int64 = -1
	var recvOK bool
	for _, casei := range pollorder {
		casi = int(casei) // case的索引
		cas = &scases[casi]  
		c = cas.c
		if casi >= nsends { // 處理接收channel的case
			sg = c.sendq.dequeue()
			if sg != nil {
                // 如果當(dāng)前channel的sendq上有等待的goroutine,
                // 跳到recv代碼 并從緩沖區(qū)讀取數(shù)據(jù)后將等待goroutine中的數(shù)據(jù)放入到緩沖區(qū)中相同的位置
				goto recv
			}
			if c.qcount > 0 {
                //如果當(dāng)前channel的緩沖區(qū)不為空,就會跳到bufrecv標(biāo)簽處從緩沖區(qū)獲取數(shù)據(jù);
				goto bufrecv
			}
			if c.closed != 0 {
                //如果當(dāng)前channel已經(jīng)被關(guān)閉,就會跳到rclose讀取末尾數(shù)據(jù)和收尾工作;
				goto rclose
			}
		} else { // 處理發(fā)送channel的case
			if raceenabled {
				racereadpc(c.raceaddr(), casePC(casi), chansendpc)
			}
			if c.closed != 0 {
                // 如果當(dāng)前channel已經(jīng)被關(guān)閉就會直接跳到sclose標(biāo)簽(panic中止程序)
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
                // 如果當(dāng)前channel的recvq上有等待的goroutine,就會跳到 send標(biāo)簽向channel發(fā)送數(shù)據(jù);
				goto send
			}
			if c.qcount < c.dataqsiz {
                // 如果當(dāng)前channel的緩沖區(qū)存在空閑位置,就會將待發(fā)送的數(shù)據(jù)存入緩沖區(qū);
				goto bufsend
			}
		}
	}

	if !block { // 如果是非阻塞,即包含default分支,解鎖所有channel并返回
		selunlock(scases, lockorder)
		casi = -1
		goto retc
	}

	// === pass 2 - 將當(dāng)前goroutine根據(jù)需要掛在chan的sendq或recvq上
	gp = getg() // 獲取當(dāng)前的groutine
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting // 正在等待的sudog結(jié)構(gòu);按鎖定順序
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		c = cas.c
		sg := acquireSudog()
        // 獲取sudog,將當(dāng)前goroutine綁定到sudog上
		sg.g = gp
		sg.isSelect = true
        // 在分配elem和在gp.waiting上排隊sg之間沒有堆棧分割,copystack可以找到它。
		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
        // 按鎖定順序構(gòu)建waiting list 。
		*nextp = sg
		nextp = &sg.waitlink
        // 加入相應(yīng)等待隊列
		if casi < nsends {
			c.sendq.enqueue(sg)
		} else {
			c.recvq.enqueue(sg)
		}
	}

	// 被喚醒后會根據(jù) param 來判斷是否是由 close 操作喚醒的,所以先置為 nil
	gp.param = nil
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
    // 掛起當(dāng)前goroutine
	gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
	gp.activeStackChans = false
    // 加鎖所有的channel
	sellock(scases, lockorder)

	gp.selectDone = 0
	sg = (*sudog)(gp.param)
     // param 存放喚醒 goroutine 的 sudog,如果是關(guān)閉操作喚醒的,那么就為 nil
	gp.param = nil

	// === pass 3 - 當(dāng)前 Goroutine 被喚醒之后找到滿足條件的 Channel 并進行處理
    //dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
    // 從不成功的通道中退出隊列,否則它們會堆積在安靜的通道上,記錄成功的案例(如果有的話)。我們單獨將SudoG按鎖定順序連接起來。


	casi = -1
	cas = nil
	caseSuccess = false
    // 當(dāng)前goroutine 的 waiting 鏈表按照lockorder順序存放著case的sudog
	sglist = gp.waiting
    // 在從 gp.waiting 取消case的sudog鏈接之前清除所有元素,便于GC
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.isSelect = false
		sg1.elem = nil
		sg1.c = nil
	}
    // 清楚當(dāng)前goroutine的waiting鏈表,因為被sg代表的協(xié)程喚醒了
	gp.waiting = nil

	for _, casei := range lockorder {
		k = &scases[casei]
        // 如果相等說明,goroutine是被當(dāng)前case的channel收發(fā)操作喚醒的
		if sg == sglist {
            // sg喚醒了當(dāng)前goroutine, 則當(dāng)前G已經(jīng)從sg的隊列中出隊,這里不需要再次出隊
			casi = int(casei)
			cas = k
			caseSuccess = sglist.success
			if sglist.releasetime > 0 {
				caseReleaseTime = sglist.releasetime
			}
		} else {
            // 不是此case喚醒當(dāng)前goroutine, 將goroutine從case對應(yīng)的隊列(發(fā)送或接收)出隊
			c = k.c
			if int(casei) < nsends {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
         // 釋放當(dāng)前case的sudog,然后處理下一個case的sudog
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}

	if cas == nil {
		throw("selectgo: bad wakeup")
	}

	c = cas.c

	if debugSelect {
		print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
	}

	if casi < nsends {
		if !caseSuccess {
			goto sclose
		}
	} else {
		recvOK = caseSuccess
	}

	if raceenabled {
		if casi < nsends {
			raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
		} else if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
	}
	if msanenabled {
		if casi < nsends {
			msanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		}
	}
	if asanenabled {
		if casi < nsends {
			asanread(cas.elem, c.elemtype.size)
		} else if cas.elem != nil {
			asanwrite(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
    // 能從buffer獲取數(shù)據(jù)
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
		}
		racenotify(c, c.recvx, nil)
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if asanenabled && cas.elem != nil {
		asanwrite(cas.elem, c.elemtype.size)
	}
	recvOK = true
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// 發(fā)送數(shù)據(jù)到緩存
	if raceenabled {
		racenotify(c, c.sendx, nil)
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
    // 從休眠sender(sg)接收
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: cas0=", cas0, " c=", c, "\n")
	}
	recvOK = true
	goto retc

rclose:
    // 讀取結(jié)束的channel
	selunlock(scases, lockorder)
	recvOK = false
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(c.raceaddr())
	}
	goto retc

send:
	// 想休眠的接收房發(fā)送數(shù)據(jù)
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	if asanenabled {
		asanread(cas.elem, c.elemtype.size)
	}
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: cas0=", cas0, " c=", c, "\n")
	}
	goto retc

retc:
	if caseReleaseTime > 0 {
		blockevent(caseReleaseTime-t0, 1)
	}
	return casi, recvOK

sclose:
	// 向關(guān)閉的channel發(fā)送數(shù)據(jù)
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

總結(jié)

簡單總結(jié)下select對case處理邏輯:

1- 空的case 會被golang監(jiān)聽到無法喚醒的協(xié)程,會panic

2- 如果只有一個case, 根據(jù)操作類型轉(zhuǎn)換成 <- ch 或 成ch <- () (會跳用channel 的 chansend , chanrecv)

3- 如果一個default 一個非default 的case,非default會走 selectnbsend 和 selectnbrecv 非阻塞的方法(最后轉(zhuǎn)換成if-else 語句)

4- 多個case 的情況下, cmd/compile/internal/walk/select.go 優(yōu)化程序中:

4.1 對 scase 數(shù)組, selv ,order數(shù)組初始化,將case放在帶執(zhí)行列表中

4.2 調(diào)用selectgo函數(shù),根據(jù)返回的chosen 結(jié)果來生成if語句,執(zhí)行對應(yīng)的case

selectgo 函數(shù):

1- 隨機生成一個便利case 的 輪詢 poollorder, 根據(jù)channel 地址生成一個枷鎖順序的lockorder。(隨機順序保證公平性,加鎖順序能夠避免思索)

2- 根據(jù)pollorder順序查找cases是否包含立即處理的chan, 如果有就處理。沒有處理的話,創(chuàng)建 sudo 結(jié)構(gòu),將當(dāng)前的G 加入各case的channel 對應(yīng)的 接收發(fā)送隊列,等待其他G喚醒

3- 當(dāng)調(diào)度器 喚醒當(dāng)前的G,會按照lockorder ,訪問所有的case。從中找到需要處理的case進行讀寫處理,同時從所有的case 的發(fā)送姐搜隊列中移除當(dāng)前的文章來源地址http://www.zghlxwxcb.cn/news/detail-819410.html

到了這里,關(guān)于golang并發(fā)安全-select的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Golang 中的 map 為什么是并發(fā)不安全的?

    Golang 中的 map 為什么是并發(fā)不安全的?

    ??golang 中的 map 是并發(fā)不安全的,多個 go 協(xié)程同時對同一個 map 進行讀寫操作時,會導(dǎo)致數(shù)據(jù)競爭(data race)問題,程序會 panic。 ??如果一個協(xié)程正在寫入 map,而另一個協(xié)程正在讀取或?qū)懭?map,那么就有可能出現(xiàn)一些未定義的行為,例如:讀取到的值可能是過期的、不

    2024年02月05日
    瀏覽(25)
  • golang實現(xiàn)webgis后端開發(fā)

    golang實現(xiàn)webgis后端開發(fā)

    目錄 前言 二、實現(xiàn)步驟 1.postgis數(shù)據(jù)庫和model的綁定 2.將pg庫中的要素轉(zhuǎn)換為geojson (1)幾何定義 (2)將wkb解析為幾何類型 (3)定義geojson類型 (4)數(shù)據(jù)轉(zhuǎn)換 (5)數(shù)據(jù)返回 ?2.前端傳入的geojson儲存到數(shù)據(jù)庫 3、其他功能實現(xiàn) 總結(jié) ????????停更了接近一個月都在研究一門新語言gola

    2024年02月08日
    瀏覽(26)
  • Go 語言 map 是并發(fā)安全的嗎?

    原文鏈接: Go 語言 map 是并發(fā)安全的嗎? Go 語言中的 map 是一個非常常用的數(shù)據(jù)結(jié)構(gòu),它允許我們快速地存儲和檢索鍵值對。然而,在并發(fā)場景下使用 map 時,還是有一些問題需要注意的。 本文將探討 Go 語言中的 map 是否是并發(fā)安全的,并提供三種方案來解決并發(fā)問題。 先來

    2024年02月06日
    瀏覽(16)
  • 建站系列(六)--- 后端開發(fā)語言

    建站系列(一)— 網(wǎng)站基本常識 建站系列(二)— 域名、IP地址、URL、端口詳解 建站系列(三)— 網(wǎng)絡(luò)協(xié)議 建站系列(四)— Web服務(wù)器之Apache、Nginx 建站系列(五)— 前端開發(fā)語言之HTML、CSS、JavaScript 建站系列(六)— 后端開發(fā)語言 建站系列(七)— 常用前后端框架

    2024年02月09日
    瀏覽(24)
  • 【Golang】go編程語言適合哪些項目開發(fā)?

    【Golang】go編程語言適合哪些項目開發(fā)?

    前言 在當(dāng)今數(shù)字化時代,軟件開發(fā)已成為各行各業(yè)的核心需求之一。 而選擇適合的編程語言對于項目的成功開發(fā)至關(guān)重要。 本文將重點探討Go編程語言適合哪些項目開發(fā),以幫助讀者在選擇合適的編程語言時做出明智的決策。 Go 編程語言適合哪些項目開發(fā)? Go是由Google開發(fā)

    2024年02月04日
    瀏覽(29)
  • select實現(xiàn)服務(wù)器并發(fā)
  • 后端開發(fā)必知的11個線程安全小技巧

    后端開發(fā)必知的11個線程安全小技巧

    ? 對于從事后端開發(fā)的同學(xué)來說,線程安全問題是我們每天都需要考慮的問題。 ? 線程安全問題通俗地講主要是在多線程的環(huán)境下,不同線程同時讀和寫公共資源(臨界資源)導(dǎo)致的數(shù)據(jù)異常問題。 ? 比如:變量a=0,線程1給該變量+1,線程2也給該變量+1。此時,線程3獲取

    2024年02月15日
    瀏覽(21)
  • Go學(xué)習(xí)圣經(jīng):Go語言實現(xiàn)高并發(fā)CRUD業(yè)務(wù)開發(fā)

    Go學(xué)習(xí)圣經(jīng):Go語言實現(xiàn)高并發(fā)CRUD業(yè)務(wù)開發(fā)

    現(xiàn)在 拿到offer超級難 ,甚至連面試電話,一個都搞不到。 尼恩的技術(shù)社群中(50+),很多小伙伴憑借 “左手云原生+右手大數(shù)據(jù)”的絕活,拿到了offer,并且是非常優(yōu)質(zhì)的offer, 據(jù)說年終獎都足足18個月 。 第二個案例就是:前段時間,一個2年小伙伴希望漲薪到18K, 尼恩把

    2024年02月11日
    瀏覽(20)
  • 【Golang】VsCode下開發(fā)Go語言的環(huán)境配置(超詳細圖文詳解)

    【Golang】VsCode下開發(fā)Go語言的環(huán)境配置(超詳細圖文詳解)

    ??推薦網(wǎng)站(不斷完善中):個人博客 ??個人主頁:個人主頁 ??相關(guān)專欄:CSDN專欄、個人專欄 ??立志賺錢,干活想躺,瞎分享的摸魚工程師一枚 ? 話說在前,Go語言的編碼方式是 UTF-8 ,理論上你直接使用文本進行編輯也是可以的,當(dāng)然為了提升我們的開發(fā)效率我們還是需

    2024年02月07日
    瀏覽(26)
  • 后端開發(fā)有哪幾種語言? - 易智編譯EaseEditing

    后端開發(fā)有哪幾種語言? - 易智編譯EaseEditing

    后端開發(fā)是構(gòu)建應(yīng)用程序的一部分,負責(zé)處理服務(wù)器端的邏輯、數(shù)據(jù)庫交互和數(shù)據(jù)處理。有許多編程語言可用于后端開發(fā),以下是一些常見的后端開發(fā)語言: Java: Java是一種廣泛使用的面向?qū)ο缶幊陶Z言,具有強大的跨平臺能力。在后端開發(fā)中,Java通常與Java EE(Java Platfor

    2024年02月11日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包