Java 常見并發(fā)容器
JDK 提供的這些容器大部分在 java.util.concurrent
包中:
- ConcurrentHashMap : 線程安全的 HashMap
- CopyOnWriteArrayList : 線程安全的 List,在讀多寫少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于 Vector
- ConcurrentLinkedQueue : 高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。可以看做一個(gè)線程安全的 LinkedList,這是一個(gè)非阻塞隊(duì)列
- BlockingQueue : 這是一個(gè)接口,JDK 內(nèi)部通過鏈表、數(shù)組等方式實(shí)現(xiàn)了這個(gè)接口。表示阻塞隊(duì)列,非常適合用于作為數(shù)據(jù)共享的通道,解決生產(chǎn)者消費(fèi)者問題
- ConcurrentSkipListMap : 跳表的實(shí)現(xiàn)。這是一個(gè) Map,使用跳表的數(shù)據(jù)結(jié)構(gòu)進(jìn)行快速查找
ConcurrentHashMap
我們知道 HashMap 不是線程安全的,在并發(fā)場(chǎng)景下如果要保證一種可行的方式是使用 Collections.synchronizedMap()
方法來包裝 HashMap。但這是通過使用一個(gè)全局的鎖來同步不同線程間的并發(fā)訪問,因此會(huì)帶來不可忽視的性能問題。
所以就有了 HashMap 的線程安全版本 ConcurrentHashMap。
實(shí)現(xiàn)原理
在 ConcurrentHashMap 中,無論是讀操作還是寫操作都能保證很高的性能:在進(jìn)行讀操作時(shí)(幾乎)不需要加鎖,而在寫操作時(shí)通過鎖分段技術(shù)只對(duì)所操作的段加鎖而不影響客戶端對(duì)其它段的訪問。
JDK1.7
底層數(shù)據(jù)結(jié)構(gòu):Segments 數(shù)組 + HashEntry 數(shù)組 + 鏈表,采用分段鎖保證安全性。
容器中有多把鎖,每一把鎖鎖一段數(shù)據(jù),這樣在多線程訪問時(shí)不同段的數(shù)據(jù)時(shí),就不會(huì)存在鎖競(jìng)爭(zhēng)了,這樣便可以有效地提高并發(fā)效率。
JDK1.8
底層數(shù)據(jù)結(jié)構(gòu):Synchronized + CAS + Node + 紅黑樹,Node 的 val 和 next 都用 volatile 保證,保證可見性。查找、替換、賦值操作都使用 CAS。
CAS 是樂觀鎖,在一些場(chǎng)景中(并發(fā)不激烈的情況下)它比 Synchronized 和 ReentrentLock 的效率要高,當(dāng) CAS 保障不了線程安全的情況下(擴(kuò)容、hash 沖突)轉(zhuǎn)成Synchronized 來保證線程安全,大大提高了低并發(fā)下的性能。
讀取操作的實(shí)現(xiàn)
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get 操作全程無鎖。get 操作可以無鎖是由于 Node 元素的 val 和指針 next 是用 volatile 修飾的,在多線程環(huán)境下線程 A 修改節(jié)點(diǎn)的 val 或者新增節(jié)點(diǎn)的時(shí)候是對(duì)線程 B 可見的。
寫入操作的實(shí)現(xiàn)
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 根據(jù)key的進(jìn)行hash操作,找到Node數(shù)組中的位置
int hash = spread(key.hashCode());
int binCount = 0;
// 這邊加了一個(gè)循環(huán),就是不斷的嘗試,因?yàn)樵趖able的初始化和casTabAt用到了compareAndSwapInt、compareAndSwapObject
// 因?yàn)槿绻渌€程正在修改tab,那么嘗試就會(huì)失敗,所以這邊要加一個(gè)for循環(huán),不斷的嘗試
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 先判斷Node數(shù)組有沒有初始化,如果沒有初始化先初始化initTable();
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 不存在hash沖突,即該位置是null,直接用CAS插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
// 如果tab[i]不為空并且hash值為MOVED(-1),說明該鏈表正在進(jìn)行transfer操作,返回?cái)U(kuò)容完成后的table。
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 存在hash沖突,對(duì)鏈表的頭節(jié)點(diǎn)或者紅黑樹的頭節(jié)點(diǎn)加synchronized鎖,進(jìn)一步減少線程沖突
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
// 如果是鏈表,就遍歷鏈表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 如果key相同就執(zhí)行覆蓋操作
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 如果key不同就將元素插入到鏈表的尾部
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
// 如果是紅黑樹,就按照紅黑樹的結(jié)構(gòu)進(jìn)行插入。
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 鏈表長(zhǎng)度大于8, Node數(shù)組的長(zhǎng)度超過64時(shí),會(huì)將鏈表的轉(zhuǎn)化為紅黑樹
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 計(jì)數(shù)增加1,有可能觸發(fā)transfer操作(擴(kuò)容)。
addCount(1L, binCount);
return null;
}
- 根據(jù) key 的進(jìn)行 hash 操作,找到 Node 數(shù)組中的位置
- 然后加了一個(gè)死循環(huán),意思是不斷的嘗試,因?yàn)樵?table 的初始化和 casTabAt 用到了 compareAndSwapInt、compareAndSwapObject,因?yàn)槿绻渌€程正在修改table,那么嘗試就會(huì)失敗,所以要加一個(gè) fo 循環(huán),不斷的嘗試
- 之后判斷 Node 數(shù)組有沒有初始化,如果沒有初始化先初始化 initTable
- 如果不存在 hash 沖突,即該位置是 null,直接用 CAS 插入,之后跳出循環(huán)
- 如果存在了沖突并且 hash 值為 MOVED(-1),說明該鏈表正在進(jìn)行 transfer 操作,獲取到擴(kuò)容完成后的 table,進(jìn)入下一次的循環(huán)
- 存在產(chǎn)生 hash 沖突,那么對(duì)鏈表的頭節(jié)點(diǎn)或者紅黑樹的頭節(jié)點(diǎn)加 synchronized 鎖,進(jìn)一步減少線程沖突。
- 如果是鏈表,就遍歷鏈表,如果 key 相同就執(zhí)行覆蓋操作,key 不同就追加,之后跳出循環(huán)
- 如果是紅黑樹,就按照紅黑樹的結(jié)構(gòu)進(jìn)行插入,之后跳出循環(huán)
- 循環(huán)結(jié)束后判斷鏈表長(zhǎng)度是否大于8,當(dāng)同時(shí)滿足 Node 數(shù)組的長(zhǎng)度超過 64 時(shí),會(huì)將鏈表的轉(zhuǎn)化為紅黑樹
- 最后計(jì)數(shù)增加 1,同時(shí)有可能觸發(fā) transfer 操作(擴(kuò)容)
CopyOnWriteArrayList
在很多應(yīng)用場(chǎng)景中,讀操作可能會(huì)遠(yuǎn)遠(yuǎn)大于寫操作。由于讀操作根本不會(huì)修改原有的數(shù)據(jù),因此對(duì)于每次讀取都進(jìn)行加鎖其實(shí)是一種資源浪費(fèi)。我們應(yīng)該允許多個(gè)線程同時(shí)訪問 List 的內(nèi)部數(shù)據(jù),畢竟讀取操作是安全的。
這和我們之前在多線程章節(jié)講過 ReentrantReadWriteLock 讀寫鎖的思想非常類似,也就是讀讀共享、寫寫互斥、讀寫互斥、寫讀互斥。JDK 中提供了 CopyOnWriteArrayList 類比相比于在讀寫鎖的思想又更進(jìn)一步。為了將讀取的性能發(fā)揮到極致,CopyOnWriteArrayList 讀取是完全不用加鎖的,并且更厲害的是:寫入也不會(huì)阻塞讀取操作。只有寫入和寫入之間需要進(jìn)行同步等待。這樣一來,讀操作的性能就會(huì)大幅度提升。
實(shí)現(xiàn)原理
CopyOnWriteArrayList 類的所有可變操作(add,set 等等)都是通過創(chuàng)建底層數(shù)組的新副本來實(shí)現(xiàn)的。當(dāng) List 需要被修改的時(shí)候,我并不修改原有內(nèi)容,而是對(duì)原有數(shù)據(jù)進(jìn)行一次復(fù)制,將修改的內(nèi)容寫入副本。寫完之后,再將修改完的副本替換原來的數(shù)據(jù),這樣就可以保證寫操作不會(huì)影響讀操作了。
從 CopyOnWriteArrayList 的名字就能看出 CopyOnWriteArrayList 是滿足 CopyOnWrite 的。所謂 CopyOnWrite 也就是說:在計(jì)算機(jī),如果你想要對(duì)一塊內(nèi)存進(jìn)行修改時(shí),我們不在原有內(nèi)存塊中進(jìn)行寫操作,而是將內(nèi)存拷貝一份,在新的內(nèi)存中進(jìn)行寫操作,寫完之后呢,就將指向原來內(nèi)存指針指向新的內(nèi)存,原來的內(nèi)存就可以被回收掉了。
讀取操作的實(shí)現(xiàn)
private transient volatile Object[] array;
public E get(int index) {
return get(getArray(), index);
}
@SuppressWarnings("unchecked")
private E get(Object[] a, int index) {
return (E) a[index];
}
final Object[] getArray() {
return array;
}
讀取操作沒有任何同步控制和鎖操作,理由就是內(nèi)部數(shù)組 array 不會(huì)發(fā)生修改,只會(huì)被另外一個(gè) array 替換,因此可以保證數(shù)據(jù)安全。
寫入操作的實(shí)現(xiàn)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷貝新數(shù)組
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
// 釋放鎖
lock.unlock();
}
}
CopyOnWriteArrayList 寫入操作在添加元素的時(shí)候加了鎖,保證了同步,避免了多線程寫的時(shí)候會(huì) copy 出多個(gè)副本出來。
ConcurrentLinkedQueue
Java 提供的線程安全的 Queue 可以分為阻塞隊(duì)列和非阻塞隊(duì)列,其中阻塞隊(duì)列的典型例子是 BlockingQueue,非阻塞隊(duì)列的典型例子是 ConcurrentLinkedQueue,在實(shí)際應(yīng)用中要根據(jù)實(shí)際需要選用阻塞隊(duì)列或者非阻塞隊(duì)列。 阻塞隊(duì)列可以通過加鎖來實(shí)現(xiàn),非阻塞隊(duì)列可以通過 CAS 操作實(shí)現(xiàn)。
從名字可以看出,ConcurrentLinkedQueue 這個(gè)隊(duì)列使用鏈表作為其數(shù)據(jù)結(jié)構(gòu),它在高并發(fā)環(huán)境中性能表現(xiàn)得非常好。它之所有能有很好的性能,是因?yàn)槠鋬?nèi)部復(fù)雜的實(shí)現(xiàn)。
ConcurrentLinkedQueue 底層是通過 CAS 非阻塞算法來實(shí)現(xiàn)線程的。
阻塞隊(duì)列
阻塞隊(duì)列(BlockingQueue)被廣泛使用在“生產(chǎn)者-消費(fèi)者”問題中,其原因是 BlockingQueue 提供了可阻塞的插入和移除的方法。當(dāng)試圖向隊(duì)列添加元素而隊(duì)列已滿,或是想從隊(duì)列移出元素而隊(duì)列為空的時(shí)候,阻塞隊(duì)列導(dǎo)致線程阻塞。
BlockingQueue 是一個(gè)接口,繼承自 Queue,所以其實(shí)現(xiàn)類也可以作為 Queue 的實(shí)現(xiàn)來使用,而 Queue 又繼承自 Collection 接口。
阻塞隊(duì)列的相關(guān)方法如下:
- 如果將隊(duì)列當(dāng)作線程管理工具來使用,將要用到 put 和 take 方法。
- 當(dāng)試圖向滿的隊(duì)列中添加或從空的隊(duì)列中移出元素時(shí),add、remove 和 element 操作拋出異常。
- 在一個(gè)多線程程序中, 隊(duì)列會(huì)在任何時(shí)候空或滿,因此,一定要使用 offer、poll 和 peek 方法作為替代。這些方法如果不能完成任務(wù),只是給出一個(gè)錯(cuò)誤提示而不會(huì)拋出異常。poll 和 peek 方法通過返回空來指示失敗。因此,向這些隊(duì)列中插入 null 值是非法的。
java.util.concurrent
包提供了阻塞隊(duì)列的幾個(gè)變種。下面主要介紹一下 3 個(gè)常見的實(shí)現(xiàn)類:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。
ArrayBlockingQueue
ArrayBlockingQueue 是 BlockingQueue 接口的有界隊(duì)列實(shí)現(xiàn)類,底層采用數(shù)組來實(shí)現(xiàn)。ArrayBlockingQueue 在構(gòu)造時(shí)需要指定容量,并且有一個(gè)可選的參數(shù)來指定是否需要公平性。若設(shè)置了公平參數(shù),則那么等待了最長(zhǎng)時(shí)間的線程會(huì)優(yōu)先得到處理。通常,公平性會(huì)降低性能,只有在確實(shí)非常需要時(shí)才使用它。
ArrayBlockingQueue 一旦創(chuàng)建,容量不能改變。其并發(fā)控制采用可重入鎖 ReentrantLock,不管是插入操作還是讀取操作,都需要獲取到鎖才能進(jìn)行操作。當(dāng)隊(duì)列容量滿時(shí),嘗試將元素放入隊(duì)列將導(dǎo)致操作阻塞,嘗試從一個(gè)空隊(duì)列中取一個(gè)元素也會(huì)同樣阻塞。
/**
* 指定容量構(gòu)造
*/
public ArrayBlockingQueue(int capacity) {
// 不設(shè)置公平參數(shù)
this(capacity, false);
}
/**
* 指定容量和公平參數(shù)的構(gòu)造,fair為true則表示公平,等待越長(zhǎng)越優(yōu)先
* 底層通過公平鎖實(shí)現(xiàn)
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 添加一個(gè)元素,如果隊(duì)列滿,則阻塞
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
// 可重入鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 如果隊(duì)列滿,則阻塞
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
/**
* 移除并返回頭元素,如果隊(duì)列空,則阻塞
*/
public E take() throws InterruptedException {
// 可重入鎖
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果隊(duì)列空,則阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
LinkedBlockingQueue
LinkedBlockingQueue 底層基于單向鏈表實(shí)現(xiàn)的阻塞隊(duì)列,可以當(dāng)做無界隊(duì)列也可以當(dāng)做有界隊(duì)列來使用,同樣滿足先進(jìn)先出的特性,與 ArrayBlockingQueue 相比起來具有更高的吞吐量,為了防止 LinkedBlockingQueue 容量迅速增,損耗大量?jī)?nèi)存。通常在創(chuàng)建 LinkedBlockingQueue 對(duì)象時(shí),會(huì)指定其大小,如果未指定,容量等于 Integer.MAX_VALUE
。
其并發(fā)控制采用可重入鎖 ReentrantLock,不管是插入操作還是讀取操作,都需要獲取到鎖才能進(jìn)行操作。當(dāng)隊(duì)列容量滿時(shí),嘗試將元素放入隊(duì)列將導(dǎo)致操作阻塞,嘗試從一個(gè)空隊(duì)列中取一個(gè)元素也會(huì)同樣阻塞。
LinkedBlockingDeque 相當(dāng)于是 LinkedBlockingQueue 的變種,底層基于雙向鏈表實(shí)現(xiàn)。
/**
* 某種意義上的無界隊(duì)列,容量是Integer.MAX_VALUE
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 指定容量構(gòu)造,有界隊(duì)列
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
/**
* 添加一個(gè)元素,如果隊(duì)列滿,則阻塞
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
/**
* 移除并返回頭元素,如果隊(duì)列空,則阻塞
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
PriorityBlockingQueue
PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的無界阻塞隊(duì)列,它不是一個(gè)先進(jìn)先出的隊(duì)列。默認(rèn)情況下元素采用自然順序進(jìn)行排序,也可以通過自定義類實(shí)現(xiàn) compareTo()
方法來指定元素排序規(guī)則,或者初始化時(shí)通過構(gòu)造器參數(shù) Comparator 來指定排序規(guī)則。
PriorityBlockingQueue 底層基于數(shù)組實(shí)現(xiàn),默認(rèn)容量為 11。擴(kuò)容機(jī)制:每次新增檢測(cè)當(dāng)前容量,已滿的時(shí)候進(jìn)行擴(kuò)容,假設(shè)當(dāng)前容量為 n,如果 n < 64,那么新容量是 2n+2,否則是 1.5n。
PriorityBlockingQueue 并發(fā)控制采用的是可重入鎖 ReentrantLock,隊(duì)列為無界隊(duì)列(ArrayBlockingQueue 是有界隊(duì)列,LinkedBlockingQueue 也可以通過在構(gòu)造函數(shù)中傳入 capacity 指定隊(duì)列最大的容量,但是 PriorityBlockingQueue 只能指定初始的隊(duì)列大小,后面插入元素的時(shí)候,如果空間不夠的話會(huì)自動(dòng)擴(kuò)容)。
簡(jiǎn)單地說,它就是 PriorityQueue 的線程安全版本。不可以插入 null 值,同時(shí),插入隊(duì)列的對(duì)象必須是可比較大小的(comparable),否則報(bào) ClassCastException 異常。它的插入操作 put 方法不會(huì) block,因?yàn)樗菬o界隊(duì)列(take 方法在隊(duì)列為空的時(shí)候會(huì)阻塞)。文章來源:http://www.zghlxwxcb.cn/news/detail-803190.html
DelayQueue:基于 PriorityQueue 的一種特殊隊(duì)列,里面的元素只有在其到期時(shí)才能從隊(duì)列中取出。文章來源地址http://www.zghlxwxcb.cn/news/detail-803190.html
到了這里,關(guān)于安全無憂:Java并發(fā)集合容器的應(yīng)用與實(shí)踐的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!