go語(yǔ)言提供的數(shù)據(jù)類(lèi)型中,只有channel是并發(fā)安全的,基礎(chǔ)map并不是并發(fā)安全的。以下三種方案實(shí)現(xiàn)了并發(fā)安全的map。
方案一:讀寫(xiě)鎖+map
實(shí)現(xiàn)原理:
給map添加一把讀寫(xiě)鎖,讀操作加讀鎖進(jìn)行讀??;添加,更新,刪除,遍歷,獲取長(zhǎng)度這些操作加寫(xiě)鎖后在進(jìn)行操作。
代碼實(shí)現(xiàn):
以下代碼是并發(fā)map的實(shí)現(xiàn)演示:
type RWMap struct {
sync.RWMutex
m map[any]any
}
func NewGRWMap() *RWMap {
return &RWMap{
m: make(map[any]any),
}
}
func (m *RWMap) Get(k int) (any, bool) {
m.RLock()
defer m.RUnlock()
v, existed := m.m[k]
return v, existed
}
func (m *RWMap) Set(k any, v any) {
m.Lock()
defer m.Unlock()
m.m[k] = v
}
func (m *RWMap) Delete(k any) {
m.Lock()
defer m.Unlock()
delete(m.m, k)
}
func (m *RWMap) Len() int {
m.RLock()
defer m.RUnlock()
return len(m.m)
}
func (m *RWMap) Each(f func(k, v any) bool) {
m.RLock()
defer m.RUnlock()
for k, v := range m.m {
if !f(k, v) {
return
}
}
}
上述代碼在讀的時(shí)候加了個(gè)讀鎖,這個(gè)讀鎖在sync.RWMutex中并沒(méi)有使用鎖,只是將 readerCount
這個(gè)字段+1。增刪改是加了寫(xiě)鎖,寫(xiě)鎖在sync.RWMutex中每次都需要加鎖。
以上可知,讀寫(xiě)鎖的加鎖力度很大,當(dāng)需要讀多寫(xiě)少的情況下可以使用讀寫(xiě)鎖加map實(shí)現(xiàn)并發(fā)安全。
方案二:分片加鎖
實(shí)現(xiàn)原理:
分片加鎖的原理就如其名字一樣,將一個(gè)map分成多個(gè)片段(一個(gè)片段是一個(gè)map),每個(gè)片段有自己的鎖。
代碼實(shí)現(xiàn):
concurrent-map
提供了一種高性能的解決方案:通過(guò)對(duì)內(nèi)部map
進(jìn)行分片,降低鎖粒度,從而達(dá)到最少的鎖等待時(shí)間(鎖沖突)
以下是分片加鎖中重要的數(shù)據(jù)類(lèi)型的結(jié)構(gòu)體:
var SHARD_COUNT = 32
type ConcurrentMap[K comparable, V any] struct {
shards []*ConcurrentMapShared[K, V]
sharding func(key K) uint32
}
type ConcurrentMapShared[K comparable, V any] struct {
items map[K]V
sync.RWMutex
}
結(jié)構(gòu)如下圖所示:
上述代碼使用泛型實(shí)現(xiàn)了不同類(lèi)型的map[comparable]any
。調(diào)用NewWithCustomShardingFunction
函數(shù),傳入泛型的類(lèi)型參數(shù)和哈希函數(shù),就可以得到一個(gè)并發(fā)安全的map了。
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
m := ConcurrentMap[K, V]{
sharding: sharding,
shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT),
}
for i := 0; i < SHARD_COUNT; i++ {
m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)}
}
return m
}
func New[V any]() ConcurrentMap[string, V] {
return create[string, V](fnv32)
}
func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
return create[K, V](sharding)
}
添加過(guò)程
func (m ConcurrentMap[K, V]) Set(key K, value V) {
shard := m.GetShard(key)
shard.Lock()
shard.items[key] = value
shard.Unlock()
}
func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}
大致流程如上圖所示:
- 調(diào)用sharding得到哈希值。
- 對(duì)哈希值取模于切片長(zhǎng)度,得到對(duì)應(yīng)的分片map。
- 對(duì)map加寫(xiě)鎖進(jìn)行操作。
其他流程大致一樣,大致都是先找“坑”,再進(jìn)行讀寫(xiě)操作。也可以將分片加鎖方案簡(jiǎn)單的理解為是對(duì)讀寫(xiě)加鎖的方案的升級(jí)。
方案三:sync.Map
我們先看一下1.21版本的sync.Map的結(jié)構(gòu)體:
type Map struct {
mu Mutex
read atomic.Pointer[readOnly]
dirty map[any]*entry
misses int
}
type readOnly struct {
m map[any]*entry
amended bool
}
type entry struct {
p atomic.Pointer[any]
}
這個(gè)結(jié)構(gòu)的關(guān)系如下圖所示:
應(yīng)對(duì)特殊場(chǎng)景的 sync.Map
在官方文檔中指出,在以下兩個(gè)場(chǎng)景中使用sync.Map,會(huì)比使用讀寫(xiě)鎖+map,的性能好:
- 只會(huì)增長(zhǎng)的緩存系統(tǒng)中,一個(gè)key只寫(xiě)入一次而被讀很多次;
- 多個(gè)goroutine為不相交的讀,寫(xiě)和重寫(xiě)的鍵值對(duì)。
sync.Map的操作流程
讀Load()
func (x *Pointer[T]) Load() *T { return (*T)(LoadPointer(&x.v)) }
func (m *Map) loadReadOnly() readOnly {
if p := m.read.Load(); p != nil {
return *p
}
return readOnly{}
}
func (m *Map) Load(key any) (value any, ok bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
return e.load()
}
func (e *entry) load() (value any, ok bool) {
p := e.p.Load()
if p == nil || p == expunged {
return nil, false
}
return *p, true
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
m.read.Store(&readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
讀流程:
先去read中讀,有數(shù)據(jù)直接讀取e.load()
結(jié)束;沒(méi)有加鎖,去dirty中讀,e換成dirty的,調(diào)用m.missLocked()
,判斷dirty是否存在這個(gè)key,不存在return nil, false
;存在e.load()
.
!ok && read.amended
這個(gè)判斷是在read不存在key并且dirty存在read中沒(méi)有的數(shù)據(jù)時(shí)為true。 m.missLocked()
記錄miss的次數(shù),當(dāng)miss的次數(shù)大于m.dirty
的長(zhǎng)度時(shí)將dirty數(shù)據(jù)給read,dirty清空,miss重置為0。
寫(xiě)Store()
func (m *Map) Store(key, value any) {
_, _ = m.Swap(key, value)
}
func (m *Map) Swap(key, value any) (previous any, loaded bool) {
read := m.loadReadOnly()
if e, ok := read.m[key]; ok {
if v, ok := e.trySwap(&value); ok {
if v == nil {
return nil, false
}
return *v, true
}
}
m.mu.Lock()
read = m.loadReadOnly()
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// The entry was previously expunged, which implies that there is a
// non-nil dirty map and this entry is not in it.
m.dirty[key] = e
}
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else if e, ok := m.dirty[key]; ok {
if v := e.swapLocked(&value); v != nil {
loaded = true
previous = *v
}
} else {
if !read.amended {
// We're adding the first new key to the dirty map.
// Make sure it is allocated and mark the read-only map as incomplete.
m.dirtyLocked()
m.read.Store(&readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
m.mu.Unlock()
return previous, loaded
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read := m.loadReadOnly()
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
寫(xiě)的流程:
先去read看key是否存在;存在:如果key的value值為expunged
,返回false,走dirty操作;否則,使用cas原子操作直接賦值,結(jié)束流程。
返回false,走dirty操作:先加鎖,再走一次read,看是否存在key。
- read存在,使用
e.unexpungeLocked()
使用cas將entry設(shè)置為nil,若cas成功,將dirty中的entry設(shè)置為nil。使用cas設(shè)置value。 - read不存在,dirty存在,使用cas設(shè)置value。
- 以上都不滿(mǎn)足(read,dirty都不存在),判斷read中是否缺少數(shù)據(jù),缺少時(shí)給dirty添加key-value;不缺少時(shí)調(diào)用
m.dirtyLocked()
,將read中的數(shù)據(jù)更新到dirty中,將其中刪除的數(shù)據(jù)設(shè)置為expunged
,之后將read的amended
設(shè)置為true,最后給dirty添加key-value。
解鎖,結(jié)束。
刪Delete()
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read := m.loadReadOnly()
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read = m.loadReadOnly()
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
delete(m.dirty, key)
m.missLocked()
}
m.mu.Unlock()
}
if ok {
return e.delete()
}
return nil, false
}
func (e *entry) delete() (value any, ok bool) {
for {
p := e.p.Load()
if p == nil || p == expunged {
return nil, false
}
if e.p.CompareAndSwap(p, nil) {
return *p, true
}
}
}
刪除流程:
先看read中是否存在,存在,直接調(diào)用e.delete()
結(jié)束
不存在,且read中缺少數(shù)據(jù),加鎖,再次查看read,存在:解鎖,調(diào)用e.delete()
結(jié)束;不存在:刪除dirty中的key,再調(diào)用m.missLocked(),解鎖,若dirty中存在并刪除了,還需要調(diào)用e.delete()
,若dirty不存在key,return結(jié)束。
遍歷Range()
func (m *Map) Range(f func(key, value any) bool) {
read := m.loadReadOnly()
if read.amended {
m.mu.Lock()
read = m.loadReadOnly()
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(&read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
遍歷流程:
獲取read,若read的數(shù)據(jù)全,遍歷read,若數(shù)據(jù)不全,加鎖,將dirty數(shù)據(jù)更新到read中,并將dirty值為nil,misses置0,再遍歷read。
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-774944.html
sync.Map安全并發(fā)實(shí)現(xiàn)
sync.Map在實(shí)現(xiàn)并發(fā)問(wèn)題的同時(shí)提升性能的幾個(gè)優(yōu)化:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-774944.html
- 用空間換時(shí)間,使用兩個(gè)map,一個(gè)無(wú)鎖,一個(gè)有鎖,減少加鎖對(duì)性能的影響。
- 優(yōu)先從無(wú)鎖的map中讀取,更新,刪除。
- 動(dòng)態(tài)調(diào)整,miss次數(shù)多之后,將加鎖map數(shù)據(jù)給無(wú)鎖map。
- 延遲刪除,刪除一個(gè)鍵值只是進(jìn)行了軟刪除,在動(dòng)態(tài)調(diào)整時(shí)會(huì)進(jìn)行硬刪除。
- double check,訪(fǎng)問(wèn)有鎖map,加鎖后再次檢查無(wú)鎖map,是否有數(shù)據(jù)。
到了這里,關(guān)于【Go進(jìn)階】怎么實(shí)現(xiàn)并發(fā)安全的map的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!