@
-
概述
- 基礎(chǔ)理論
- 并發(fā)原語
- 協(xié)程-Goroutine
- 通道-Channel
- 多路復(fù)用-Select
-
通道使用
- 超時(shí)-Timeout
- 非阻塞通道操作
- 關(guān)閉通道
- 通道迭代
- 定時(shí)器-TimerAndTicker
- 工作池-Worker Pools
- 等待組-WaitGroup
- 原子操作-Atomic
- 互斥鎖-Mutex
- 讀寫互斥鎖-RWMutex
- 有狀態(tài)協(xié)程
- 單執(zhí)行-Once
- 條件-Cond
- 上下文-Context
- 信號(hào)-signal
- Pool
- 線程安全Map
概述
基礎(chǔ)理論
Do not communicate by sharing memory; instead, share memory by communicating
也即是不要通過共享內(nèi)存來通信,相反的要通過通信來實(shí)現(xiàn)內(nèi)存共享;使用通道來控制訪問可以更容易地編寫清晰、正確的程序。
簡(jiǎn)單來說所謂并發(fā)編程是指在一個(gè)處理器上“同時(shí)”處理多個(gè)任務(wù);宏觀上并發(fā)是指在一段時(shí)間內(nèi),有多個(gè)程序在同時(shí)運(yùn)行;在微觀上 并發(fā)是指在同一時(shí)刻只能有一條指令執(zhí)行,但多個(gè)程序指令被快速的輪換執(zhí)行,使得在宏觀上具有多個(gè)進(jìn)程同時(shí)執(zhí)行的效果,但在微觀上并不是同時(shí)執(zhí)行的,只是把時(shí)間分成若干段,使多個(gè)程序快速交替的執(zhí)行。
在許多環(huán)境中,實(shí)現(xiàn)對(duì)共享變量的正確訪問使得并發(fā)編程變得困難。Go鼓勵(lì)通過共享值在通道上傳遞,實(shí)際上沒有被單獨(dú)的執(zhí)行線程主動(dòng)共享。在任何給定時(shí)刻只有一個(gè)線程可以訪問該值,因此在數(shù)據(jù)競(jìng)爭(zhēng)在設(shè)計(jì)上是不會(huì)發(fā)生的。單線程程序不需要同步原語,也不需要同步。如果通信是同步器,則仍然不需要其他同步。例如,Unix管道就非常適合這個(gè)模型;盡管Go的并發(fā)方法起源于Hoare的通信順序進(jìn)程(CSP),但也可以被視為Unix管道的類型安全泛化。
并發(fā)原語
在操作系統(tǒng)中,往往設(shè)計(jì)一些完成特定功能的、不可中斷的過程,這些不可中斷的過程稱為原語。并發(fā)原語就是在編程語言設(shè)計(jì)之初以及后續(xù)的擴(kuò)展過程中,專門為并發(fā)設(shè)計(jì)而開發(fā)的關(guān)鍵詞或代碼片段或一部分功能,進(jìn)而能夠?yàn)樵撜Z言實(shí)現(xiàn)并發(fā)提供更好的支持。
- Go官方提供并發(fā)原語:goroutine、sync包下的Mutex、RWMutex、Once、WaitGroup、Cond、channel、Pool、Context、Timer、atomic等等。
- 擴(kuò)展并發(fā)原語:Semaphore、SingleFlight、CyclicBarrier、ReentrantLock等等。
協(xié)程-Goroutine
在Go語言中,每一個(gè)并發(fā)的執(zhí)行單元叫作一個(gè)goroutine,它是一個(gè)輕量級(jí)的執(zhí)行線程,被稱為協(xié)程,有別于線程、進(jìn)程程等。協(xié)程以簡(jiǎn)單的模型運(yùn)行,在同一地址空間中與其他運(yùn)行協(xié)程并發(fā)執(zhí)行的函數(shù);只需要分配堆??臻g。堆棧開始時(shí)很小因此開銷很低,并按需分配實(shí)現(xiàn)堆空間申請(qǐng)和釋放。線程被多路復(fù)用到多個(gè)操作系統(tǒng)線程上,所以如果一個(gè)線程阻塞了,比如在等待I/O時(shí),其他線程會(huì)繼續(xù)運(yùn)行。Goroutines設(shè)計(jì)隱藏了線程創(chuàng)建和管理的許多復(fù)雜性。在Go語言開啟協(xié)程非常簡(jiǎn)單,在函數(shù)或方法調(diào)用前加上go關(guān)鍵字,例如有一個(gè)函數(shù)調(diào)用f(s),這種調(diào)用它的方式是同步,而在程序中使用go f(s)調(diào)用,則會(huì)新開協(xié)程將與調(diào)用協(xié)程并發(fā)執(zhí)行。
package main
import (
"fmt"
"time"
)
func f(from string) {
for i := 0; i < 3; i++ {
fmt.Println(from, ":", i)
}
}
func main() {
f("direct")
go f("goroutine")
go func(msg string) {
fmt.Println(msg)
}("going")
time.Sleep(time.Second)
fmt.Println("done")
}
通道-Channel
Channels是一種編程結(jié)構(gòu),允許在代碼的不同部分之間移動(dòng)數(shù)據(jù),通常來自不同的 goroutine。與映射一樣,Channels通道也使用make分配,返回對(duì)底層數(shù)據(jù)結(jié)構(gòu)的引用。如果提供了一個(gè)可選的整數(shù)參數(shù)則可設(shè)置通道的緩沖區(qū)大小。對(duì)于非緩沖通道或同步通道,默認(rèn)值為零。無緩沖通道將通信(值的交換)與同步結(jié)合起來,保證兩個(gè)計(jì)算(例程)處于已知狀態(tài)。
通道是連接并發(fā)程序的管道,可以從一個(gè)運(yùn)行協(xié)程向通道發(fā)送值,并從另一個(gè)運(yùn)行協(xié)程接收這些值。默認(rèn)情況下,通道是無緩沖的,這意味著只有當(dāng)有相應(yīng)的接收(<- chan)準(zhǔn)備接收發(fā)送的值時(shí),通道才會(huì)接受發(fā)送(chan <-)。緩沖通道接受有限數(shù)量的值,而沒有相應(yīng)的接收器接收這些值。還可以使用通道來同步跨程序的執(zhí)行,使用阻塞接收來等待程序完成,而需要等待多個(gè)協(xié)程完成時(shí)可能更多會(huì)使用WaitGroup,后面再介紹;當(dāng)使用通道作為函數(shù)參數(shù)時(shí),可以指定通道是只發(fā)送還是接收值,也叫做定向通道,其增加了程序的類型安全性。
package main
import (
"fmt"
"time"
)
func worker(done chan bool) {
fmt.Print("working...")
time.Sleep(time.Second)
fmt.Println("done")
done <- true
}
func ping(pings chan<- string, msg string) {
pings <- msg
}
func pong(pings <-chan string, pongs chan<- string) {
msg := <-pings
pongs <- msg
}
func main() {
messages := make(chan string)
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
messagesBuf := make(chan string, 2)
messagesBuf <- "buffered"
messagesBuf <- "channel"
fmt.Println(<-messagesBuf)
fmt.Println(<-messagesBuf)
done := make(chan bool)
go worker(done)
<-done
pings := make(chan string, 1)
pongs := make(chan string, 1)
ping(pings, "passed message")
pong(pings, pongs)
fmt.Println(<-pongs)
}
多路復(fù)用-Select
-
select是一種go可以處理多個(gè)通道之間的機(jī)制,看起來和switch語句很相似,但是select其實(shí)和IO機(jī)制中的select一樣,多路復(fù)用通道,隨機(jī)選取一個(gè)進(jìn)行執(zhí)行,如果說通道(channel)實(shí)現(xiàn)了多個(gè)goroutine之間的同步或者通信,那么select則實(shí)現(xiàn)了多個(gè)通道(channel)的同步或者通信,并且select具有阻塞的特性。
-
select 是 Go 中的一個(gè)控制結(jié)構(gòu),類似于用于通信的 switch 語句。每個(gè) case 必須是一個(gè)通信操作,要么是發(fā)送要么是接收。
-
select 隨機(jī)執(zhí)行一個(gè)可運(yùn)行的 case,如果沒有 case 可運(yùn)行,它將阻塞,直到有 case 可運(yùn)行。一個(gè)默認(rèn)的子句應(yīng)該總是可運(yùn)行的。
-
當(dāng)有多個(gè)通道等待接收信息時(shí),可以使用該select語句,并且希望在其中任何一個(gè)通道首先完成時(shí)執(zhí)行一個(gè)動(dòng)作。Go的select允許等待多個(gè)通道操作,將gooutine和channel與select結(jié)合是Go的一個(gè)強(qiáng)大功能。
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
c1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
通道使用
超時(shí)-Timeout
對(duì)于連接到外部資源或需要限制執(zhí)行時(shí)間的程序來說超時(shí)非常重要。在Go 通道和select中實(shí)現(xiàn)超時(shí)是簡(jiǎn)單且優(yōu)雅的。
package main
import (
"fmt"
"time"
)
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
case <-time.After(1 * time.Second):
fmt.Println("timeout 1")
}
c2 := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second)
c2 <- "result 2"
}()
select {
case res := <-c2:
fmt.Println(res)
case <-time.After(3 * time.Second):
fmt.Println("timeout 2")
}
}
非阻塞通道操作
通道上的基本發(fā)送和接收阻塞,但可以使用帶有默認(rèn)子句的select來實(shí)現(xiàn)非阻塞發(fā)送、接收,甚至非阻塞多路選擇。無阻塞的接收如果消息上有一個(gè)可用的值,那么select將使用該值的<-messages情況;如果沒有可用的值則立即采用默認(rèn)情況。非阻塞發(fā)送的工作原理類似這里不能將msg發(fā)送到消息通道,因?yàn)樵撏ǖ罌]有緩沖區(qū),也沒有接收器,因此選擇默認(rèn)情況。可以在默認(rèn)子句之上使用多種情況來實(shí)現(xiàn)多路非阻塞選擇,對(duì)消息和信號(hào)進(jìn)行非阻塞接收。
package main
import "fmt"
func main() {
messages := make(chan string)
signals := make(chan bool)
select {
case msg := <-messages:
fmt.Println("received message", msg)
default:
fmt.Println("no message received")
}
msg := "hi"
select {
case messages <- msg:
fmt.Println("sent message", msg)
default:
fmt.Println("no message sent")
}
select {
case msg := <-messages:
fmt.Println("received message", msg)
case sig := <-signals:
fmt.Println("received signal", sig)
default:
fmt.Println("no activity")
}
}
關(guān)閉通道
關(guān)閉通道表示不再在該通道上發(fā)送任何值,可用于完成通信發(fā)送給信道的接收器。
package main
import "fmt"
func main() {
jobs := make(chan int, 5)
done := make(chan bool)
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
}
close(jobs)
fmt.Println("sent all jobs")
<-done
}
通道迭代
上一篇基礎(chǔ)實(shí)戰(zhàn)中介紹使用for和range如何提供對(duì)基本數(shù)據(jù)結(jié)構(gòu)的迭代,在這里可以使用該range語法迭代從通道接收的值。
package main
import "fmt"
func main() {
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
fmt.Println(elem)
}
}
定時(shí)器-TimerAndTicker
經(jīng)常實(shí)際項(xiàng)目有不少需求需要使用在將來的某個(gè)時(shí)間點(diǎn)執(zhí)行Go代碼,或者在某個(gè)時(shí)間間隔重復(fù)執(zhí)行;Go內(nèi)置的定時(shí)器就能很簡(jiǎn)單實(shí)現(xiàn)這個(gè)功能。GO標(biāo)準(zhǔn)庫中的定時(shí)器主要有兩種,一種為Timer定時(shí)器,一種為Ticker定時(shí)器。Timer計(jì)時(shí)器使用一次后,就失效了,需要Reset()才能再次生效,而Ticker計(jì)時(shí)器會(huì)一直生效。在一個(gè)GO進(jìn)程中,其中的所有計(jì)時(shí)器都是由一個(gè)運(yùn)行著 timerproc() 函數(shù)的 goroutine 來保護(hù)。它使用時(shí)間堆(最小堆)的算法來保護(hù)所有的 Timer,其底層的數(shù)據(jù)結(jié)構(gòu)基于數(shù)組的最小堆,堆頂?shù)脑厥情g隔超時(shí)最近的 Timer,這個(gè) goroutine 會(huì)定期 wake up,讀取堆頂?shù)?Timer,執(zhí)行對(duì)應(yīng)的 f 函數(shù)或者 sendtime()函數(shù),而后將其從堆頂移除。Timer數(shù)據(jù)結(jié)構(gòu)如下:
package main
import (
"fmt"
"time"
)
func main() {
timer1 := time.NewTimer(2 * time.Second)
<-timer1.C
fmt.Println("Timer 1 fired")
timer2 := time.NewTimer(time.Second)
go func() {
<-timer2.C
fmt.Println("Timer 2 fired")
}()
stop2 := timer2.Stop()
if stop2 {
fmt.Println("Timer 2 stopped")
}
time.Sleep(2 * time.Second)
ticker := time.NewTicker(500 * time.Millisecond)
done := make(chan bool)
go func() {
for {
select {
case <-done:
return
case t := <-ticker.C:
fmt.Println("Tick at", t)
}
}
}()
time.Sleep(1600 * time.Millisecond)
ticker.Stop()
done <- true
fmt.Println("Ticker stopped")
}
速率限制是控制資源利用和保持服務(wù)質(zhì)量的重要機(jī)制。Go優(yōu)雅地支持用 goroutines、channels和tickers來實(shí)現(xiàn)限制速率。
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("request", req, time.Now())
}
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("request", req, time.Now())
}
}
工作池-Worker Pools
工作池是一種常用的并發(fā)設(shè)計(jì)模式,它利用一組固定數(shù)量的 goroutine 來處理一組任務(wù)。任務(wù)可以被異步地添加到工作池中,等待可用的 worker goroutine 來處理。當(dāng)沒有更多的任務(wù)需要處理時(shí),worker goroutine 將會(huì)保持空閑狀態(tài),等待新的任務(wù)到來。 在 Go 中,我們可以使用通道和 Goroutine 來實(shí)現(xiàn)這種模式
package main
import (
"fmt"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "started job", j)
time.Sleep(time.Second)
fmt.Println("worker", id, "finished job", j)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}
等待組-WaitGroup
在Go語言中,sync包下的WaitGroup結(jié)構(gòu)體對(duì)象用于等待一組線程的結(jié)束;WaitGroup是go并發(fā)中最常用的工具,可以通過WaitGroup來表達(dá)這一組協(xié)程的任務(wù)是否完成,以決定是否繼續(xù)往下走,或者取任務(wù)結(jié)果。WaitGroup數(shù)據(jù)結(jié)構(gòu)如下:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
worker(i)
}()
}
wg.Wait()
}
原子操作-Atomic
- 定義:原子操作即是進(jìn)行過程中不能被中斷的操作;也就是說,針對(duì)某個(gè)值的原子操作在被進(jìn)行的過程當(dāng)中,CPU絕不會(huì)再去進(jìn)行其它的針對(duì)該值的操作。為了實(shí)現(xiàn)這樣的嚴(yán)謹(jǐn)性,原子操作僅會(huì)由一個(gè)獨(dú)立的CPU指令代表和完成。只有這樣才能夠在并發(fā)環(huán)境下保證原子操作的絕對(duì)安全。
- Go支持操作類型:int32、int64、uint32、uint64、uintptr和unsafe.Pointer類型。
- Go原子操作:
- 增或減:被用于進(jìn)行增或減的原子操作(以下簡(jiǎn)稱原子增/減操作)的函數(shù)名稱都以“Add”為前綴,并后跟針對(duì)的具體類型的名稱。例如,實(shí)現(xiàn)針對(duì)uint32類型的原子增/減操作的函數(shù)的名稱為AddUint32。事實(shí)上,sync/atomic包中的所有函數(shù)的命名都遵循此規(guī)則。
- 比較并交換:Compare And Swap,簡(jiǎn)稱CAS。在sync/atomic包中,這類原子操作由名稱以“CompareAndSwap”為前綴的若干個(gè)函數(shù)代表。
- 載入:為了原子的讀取某個(gè)值,sync/atomic代碼包同樣為我們提供了一系列的函數(shù)。這些函數(shù)的名稱都以“Load”為前綴,意為載入。
- 存儲(chǔ):與讀取操作相對(duì)應(yīng)的是寫入操作。而sync/atomic包也提供了與原子的值載入函數(shù)相對(duì)應(yīng)的原子的值存儲(chǔ)函數(shù)。這些函數(shù)的名稱均以“Store”為前綴。
- 交換:在sync/atomic代碼包中還存在著一類函數(shù)。它們的功能與前文所講的CAS操作和原子載入操作都有些類似。這樣的功能可以被稱為原子交換操作。這類函數(shù)的名稱都以“Swap”為前綴。
Go語言提供的原子操作都是非侵入式的。它們由標(biāo)準(zhǔn)庫代碼包sync/atomic中的眾多函數(shù)代表??梢酝ㄟ^調(diào)用這些函數(shù)對(duì)幾種簡(jiǎn)單的類型的值進(jìn)行原子操作。Go中管理狀態(tài)的主要機(jī)制是通過通道進(jìn)行通信,下面演示使用sync/atomic包來處理由多個(gè)線程例程訪問的原子計(jì)數(shù)器。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var ops uint64
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
for c := 0; c < 1000; c++ {
atomic.AddUint64(&ops, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Println("ops:", ops)
}
互斥鎖-Mutex
Go sync包提供了兩種鎖類型:互斥鎖sync.Mutex 和 讀寫互斥鎖sync.RWMutex,都屬于悲觀鎖。Mutex是互斥鎖,當(dāng)一個(gè) goroutine 獲得了鎖后,其他 goroutine 不能獲取鎖(只能存在一個(gè)寫或讀,不能同時(shí)讀和寫)。應(yīng)用于多個(gè)線程同時(shí)訪問臨界區(qū)],為保證數(shù)據(jù)的安全,鎖住一些共享資源, 以防止并發(fā)訪問這些共享數(shù)據(jù)時(shí)可能導(dǎo)致的數(shù)據(jù)不一致問題。數(shù)據(jù)結(jié)構(gòu)如下:
state表示鎖的狀態(tài),有鎖定、被喚醒、饑餓模式等,并且是用state的二進(jìn)制位來標(biāo)識(shí)的,不同模式下會(huì)有不同的處理方式。sema表示信號(hào)量,mutex阻塞隊(duì)列的定位是通過這個(gè)變量來實(shí)現(xiàn)的,從而實(shí)現(xiàn)goroutine的阻塞和喚醒。鎖的實(shí)現(xiàn)一般會(huì)依賴于原子操作、信號(hào)量,通過atomic 包中的一些原子操作來實(shí)現(xiàn)鎖的鎖定,通過信號(hào)量來實(shí)現(xiàn)線程的阻塞與喚醒。
package main
import (
"fmt"
"sync"
)
type Container struct {
mu sync.Mutex
counters map[string]int
}
func (c *Container) inc(name string) {
c.mu.Lock()
defer c.mu.Unlock()
c.counters[name]++
}
func main() {
c := Container{
counters: map[string]int{"a": 0, "b": 0},
}
var wg sync.WaitGroup
doIncrement := func(name string, n int) {
for i := 0; i < n; i++ {
c.inc(name)
}
wg.Done()
}
wg.Add(3)
go doIncrement("a", 10000)
go doIncrement("a", 10000)
go doIncrement("b", 10000)
wg.Wait()
fmt.Println(c.counters)
}
讀寫互斥鎖-RWMutex
讀寫互斥鎖RWMutex,是對(duì)Mutex的一個(gè)擴(kuò)展,當(dāng)一個(gè) goroutine 獲得了讀鎖后,其他 goroutine可以獲取讀鎖,但不能獲取寫鎖;當(dāng)一個(gè) goroutine 獲得了寫鎖后,其他 goroutine既不能獲取讀鎖也不能獲取寫鎖(只能存在一個(gè)寫或多個(gè)讀,可以同時(shí)讀)。常用于讀多于寫的情況(既保證線程安全,又保證性能不太差)。數(shù)據(jù)結(jié)構(gòu)如下:
- 讀寫鎖區(qū)分讀者和寫者,而互斥鎖不區(qū)分。
- 互斥鎖同一時(shí)間只允許一個(gè)線程訪問該對(duì)象,無論讀寫;讀寫鎖同一時(shí)間內(nèi)只允許一個(gè)寫者,但是允許多個(gè)讀者同時(shí)讀對(duì)象
package main
import (
"fmt"
"sync"
"time"
)
type Counter struct {
value int
rwMutex sync.RWMutex
}
func (c *Counter) GetValue() int {
c.rwMutex.RLock()
defer c.rwMutex.RUnlock()
return c.value
}
func (c *Counter) Increment() {
c.rwMutex.Lock()
defer c.rwMutex.Unlock()
c.value++
}
func main() {
counter := Counter{value: 0}
// 讀操作
for i := 0; i < 10; i++ {
go func() {
for {
fmt.Println("Value: ", counter.GetValue())
time.Sleep(time.Millisecond)
}
}()
}
// 寫操作
for {
counter.Increment()
time.Sleep(time.Second)
}
}
有狀態(tài)協(xié)程
在前面的例子中,我們使用了帶有互斥鎖的顯式鎖來同步跨多個(gè)線程對(duì)共享狀態(tài)的訪問。而另一種選擇是使用程序和通道的內(nèi)置同步特性來實(shí)現(xiàn)相同的結(jié)果。這種基于通道的方法符合Go的思想,即通過通信共享內(nèi)存,并使每個(gè)數(shù)據(jù)塊由一個(gè)線程程序擁有。
package main
import (
"fmt"
"math/rand"
"sync/atomic"
"time"
)
type readOp struct {
key int
resp chan int
}
type writeOp struct {
key int
val int
resp chan bool
}
func main() {
var readOps uint64
var writeOps uint64
reads := make(chan readOp)
writes := make(chan writeOp)
go func() {
var state = make(map[int]int)
for {
select {
case read := <-reads:
read.resp <- state[read.key]
case write := <-writes:
state[write.key] = write.val
write.resp <- true
}
}
}()
for r := 0; r < 100; r++ {
go func() {
for {
read := readOp{
key: rand.Intn(5),
resp: make(chan int)}
reads <- read
<-read.resp
atomic.AddUint64(&readOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
for w := 0; w < 10; w++ {
go func() {
for {
write := writeOp{
key: rand.Intn(5),
val: rand.Intn(100),
resp: make(chan bool)}
writes <- write
<-write.resp
atomic.AddUint64(&writeOps, 1)
time.Sleep(time.Millisecond)
}
}()
}
time.Sleep(time.Second)
readOpsFinal := atomic.LoadUint64(&readOps)
fmt.Println("readOps:", readOpsFinal)
writeOpsFinal := atomic.LoadUint64(&writeOps)
fmt.Println("writeOps:", writeOpsFinal)
}
單執(zhí)行-Once
Once 是 Go 內(nèi)置庫 sync 中一個(gè)比較簡(jiǎn)單的并發(fā)原語;顧名思義,它的作用就是執(zhí)行那些只需要執(zhí)行一次的動(dòng)作。
package main
import (
"fmt"
"sync"
)
func main() {
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
}
Once 最典型的使用場(chǎng)景就是單例對(duì)象的初始化,類似思想如在 MySQL 或者 Redis 這種頻繁訪問數(shù)據(jù)的場(chǎng)景中,建立連接的代價(jià)遠(yuǎn)遠(yuǎn)高于數(shù)據(jù)讀寫的代價(jià),因此我們會(huì)用單例模式來實(shí)現(xiàn)一次建立連接,多次訪問數(shù)據(jù),從而提升服務(wù)性能。
package main
import (
"net"
"sync"
"time"
)
// 使用互斥鎖保證線程(goroutine)安全
var connMu sync.Mutex
var conn net.Conn
func getConn() net.Conn {
connMu.Lock()
defer connMu.Unlock()
// 返回已創(chuàng)建好的連接
if conn != nil {
return conn
}
// 創(chuàng)建連接
conn, _ = net.DialTimeout("tcp", "baidu.com:80", 10*time.Second)
return conn
}
// 使用連接
func main() {
conn := getConn()
if conn == nil {
panic("conn is nil")
}
}
條件-Cond
Go 標(biāo)準(zhǔn)庫提供 Cond 原語的目的是,為等待 / 通知場(chǎng)景下的并發(fā)問題提供支持。Cond通常應(yīng)用于等待某個(gè)條件的一組goroutine,等條件變?yōu)閠rue的時(shí)候,其中一個(gè)goroutine或者所有的goroutine都會(huì)被喚醒執(zhí)行。開發(fā)實(shí)踐中使用到Cond場(chǎng)景比較少,且Cond場(chǎng)景一般也能用Channel方式實(shí)現(xiàn),所以更多人會(huì)選擇使用Channel。
package main
import (
"fmt"
"sync"
"time"
)
var (
// 1. 定義一個(gè)互斥鎖
mu sync.Mutex
cond *sync.Cond
count int
)
func init() {
// 2.將互斥鎖和sync.Cond進(jìn)行關(guān)聯(lián)
cond = sync.NewCond(&mu)
}
func worker(id int) {
// 消費(fèi)者
for {
// 3. 在需要等待的地方,獲取互斥鎖,調(diào)用Wait方法等待被通知
mu.Lock()
// 這里會(huì)不斷循環(huán)判斷 是否有待消費(fèi)的任務(wù)
for count == 0 {
cond.Wait() // 等待任務(wù)
}
count--
fmt.Printf("worker %d: 處理了一個(gè)任務(wù)", id)
// 5. 最后釋放鎖
mu.Unlock()
}
}
func main() {
// 啟動(dòng)5個(gè)消費(fèi)者
for i := 1; i <= 5; i++ {
go worker(i)
}
for {
// 生產(chǎn)者
time.Sleep(1 * time.Second)
mu.Lock()
count++
// 4. 在需要等待的地方,獲取互斥鎖,調(diào)用BroadCast/Singal方法進(jìn)行通知
cond.Broadcast()
mu.Unlock()
}
}
上下文-Context
定義:Golang 的 Context 應(yīng)用開發(fā)常用的并發(fā)控制工具,用于在程序中的 API 層或進(jìn)程之間共享請(qǐng)求范圍的數(shù)據(jù)、取消信號(hào)以及超時(shí)或截止日期。Context 又被稱為上下文,與 WaitGroup 不同的是,context 對(duì)于派生 goroutine 有更強(qiáng)的控制力,可以管理多級(jí)的 goroutine。context包的核心原理,鏈?zhǔn)絺鬟fcontext,基于context構(gòu)造新的context。下面是http的上下文示例:
package main
import (
"fmt"
"net/http"
"time"
)
func hello(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
fmt.Println("server: hello handler started")
defer fmt.Println("server: hello handler ended")
select {
case <-time.After(10 * time.Second):
fmt.Fprintf(w, "hello\n")
case <-ctx.Done():
err := ctx.Err()
fmt.Println("server:", err)
internalError := http.StatusInternalServerError
http.Error(w, err.Error(), internalError)
}
}
func main() {
http.HandleFunc("/hello", hello)
http.ListenAndServe(":8090", nil)
}
# 訪問http的接口
curl http://localhost:8090/hello
信號(hào)-signal
信號(hào)是事件發(fā)生時(shí)對(duì)進(jìn)程的通知機(jī)制。有時(shí)也稱之為軟件中斷。信號(hào)與硬件中斷的相似之處在于打斷了程序執(zhí)行的正常流程,大多數(shù)情況下,無法預(yù)測(cè)信號(hào)到達(dá)的精確時(shí)間。有時(shí)希望Go程序能夠智能地處理Unix信號(hào);例如希望服務(wù)器在接收到SIGTERM時(shí)優(yōu)雅地關(guān)閉,或者希望命令行工具在接收到SIGINT時(shí)停止處理輸入。Go程序無法捕獲信號(hào) SIGKILL 和 SIGSTOP (終止和暫停進(jìn)程),因此 os/signal
包對(duì)這兩個(gè)信號(hào)無效。
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
)
func main() {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
done := make(chan bool, 1)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
done <- true
}()
fmt.Println("awaiting signal")
<-done
fmt.Println("exiting")
}
Pool
go提供的sync.Pool是為了對(duì)象的復(fù)用,如果某些對(duì)象的創(chuàng)建比較頻繁,就把他們放入Pool中緩存起來以便使用,這樣重復(fù)利用內(nèi)存,減少GC的壓力,Go同步包中,sync.Pool
提供了保存和訪問一組臨時(shí)對(duì)象并復(fù)用它們的能力。
對(duì)于一些創(chuàng)建成本昂貴、頻繁使用的臨時(shí)對(duì)象,使用sync.Pool
可以減少內(nèi)存分配,降低GC壓力。因?yàn)?code>Go的gc算法是根據(jù)標(biāo)記清除改進(jìn)的三色標(biāo)記法,如果頻繁創(chuàng)建大量臨時(shí)對(duì)象,勢(shì)必給GC標(biāo)記帶來負(fù)擔(dān),CPU也很容易出現(xiàn)毛刺現(xiàn)象。當(dāng)然需要注意的是:存儲(chǔ)在Pool
中的對(duì)象隨時(shí)都可能在不被通知的情況下被移除。所以并不是所有頻繁使用、創(chuàng)建昂貴的對(duì)象都適用,比如DB連接、線程池。
package main
import "sync"
type Person struct {
Age int
}
// 初始化pool
var personPool = sync.Pool{
New: func() interface{} {
return new(Person)
},
}
func main() {
// 獲取一個(gè)實(shí)例
newPerson := personPool.Get().(*Person)
// 回收對(duì)象 以備其他協(xié)程使用
defer personPool.Put(newPerson)
newPerson.Age = 25
}
線程安全Map
Go中自己通過make創(chuàng)建的map不是線程安全的,Go為了解決這個(gè)問題,專門給我們提供了一個(gè)并發(fā)安全的map,這個(gè)并發(fā)安全的map不用通過make創(chuàng)建,拿來即可用,并且提供了一些不同于普通map的操作方法。文章來源:http://www.zghlxwxcb.cn/news/detail-433713.html
package main
import (
"fmt"
"sync"
)
// 創(chuàng)建一個(gè)sync包下的線程安全map對(duì)象
var myConcurrentMap = sync.Map{}
// 遍歷數(shù)據(jù)用的
var myRangeMap = sync.Map{}
func main() {
//存儲(chǔ)數(shù)據(jù)
myConcurrentMap.Store(1, "li_ming")
//取出數(shù)據(jù)
name, ok := myConcurrentMap.Load(1)
if !ok {
fmt.Println("不存在")
return
}
//打印值 li_ming
fmt.Println(name)
//該key有值,則ok為true,返回它原來存在的值,不做任何操作;該key無值,則執(zhí)行添加操作,ok為false,返回新添加的值
name2, ok2 := myConcurrentMap.LoadOrStore(1, "xiao_hong")
//因?yàn)閗ey=1存在,所以打印是 li_ming true
fmt.Println(name2, ok2)
name3, ok3 := myConcurrentMap.LoadOrStore(2, "xiao_hong")
//因?yàn)閗ey=2不存在,所以打印是 xiao_hong false
fmt.Println(name3, ok3)
//標(biāo)記刪除值
myConcurrentMap.Delete(1)
//取出數(shù)據(jù)
//name4,ok4 := myConcurrentMap.Load(1)
//if(!ok4) {
// fmt.Println("name4=不存在")
// return
//}
//fmt.Println(name4)
//遍歷數(shù)據(jù)
rangeFunc()
}
// 遍歷
func rangeFunc() {
myRangeMap.Store(1, "xiao_ming")
myRangeMap.Store(2, "xiao_li")
myRangeMap.Store(3, "xiao_ke")
myRangeMap.Store(4, "xiao_lei")
myRangeMap.Range(func(k, v interface{}) bool {
fmt.Println("data_key_value = :", k, v)
//return true代表繼續(xù)遍歷下一個(gè),return false代表結(jié)束遍歷操作
return true
})
}
文章來源地址http://www.zghlxwxcb.cn/news/detail-433713.html
- 本人博客網(wǎng)站IT小神 www.itxiaoshen.com
到了這里,關(guān)于云原生時(shí)代崛起的編程語言Go并發(fā)編程實(shí)戰(zhàn)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!