一、概述
?
常見的容器如下圖,我們會挑選高并發(fā)中常用的容器進(jìn)行介紹。
二、ConcurrentHashMap
個ConcurrentHashMap提高效率主要提高在讀上面,由于它往里插的時候內(nèi)部又做了各種各樣的判斷,本來是鏈表的,到8之后又變成了紅黑樹,然后里面又做了各種各樣的cas的判斷,所以他往里插的數(shù)據(jù)是要更低一些的。HashMap和Hashtable雖然說讀的效率會稍微低一些,但是它往里插的時候檢查的東西特別的少,就加個鎖然后往里一插。所以,關(guān)于效率,還是看你實際當(dāng)中的需求。用幾個簡單的小程序來給大家列舉了這幾個不同的區(qū)別。
public class T04_TestConcurrentHashMap {
static Map<UUID, UUID> m = new ConcurrentHashMap<>();
static int count = Constants.COUNT;
static UUID[] keys = new UUID[count];
static UUID[] values = new UUID[count];
static final int THREAD_COUNT = Constants.THREAD_COUNT;
static {
for (int i = 0; i < count; i++) {
keys[i] = UUID.randomUUID();
values[i] = UUID.randomUUID();
}
}
static class MyThread extends Thread {
int start;
int gap = count/THREAD_COUNT;
public MyThread(int start) {
this.start = start;
}
@Override
public void run() {
for(int i=start; i<start+gap; i++) {
m.put(keys[i], values[i]);
}
}
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
Thread[] threads = new Thread[THREAD_COUNT];
for(int i=0; i<threads.length; i++) {
threads[i] =
new MyThread(i * (count/THREAD_COUNT));
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println(end - start);
System.out.println(m.size());
//-----------------------------------
start = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000000; j++) {
m.get(keys[10]);
}
});
}
for(Thread t : threads) {
t.start();
}
for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
end = System.currentTimeMillis();
System.out.println(end - start);
}
}
三、BlockingQueue
BlockingQueue,是我們后面講線程池需要用到的這方面的內(nèi)容,是給線程池來做準(zhǔn)備的。BlockingQueue的概念重點是在Blocking上,Blocking阻塞,Queue隊列,是阻塞隊列。他提供了一系列的方法,我們可以在這些方法的基礎(chǔ)之上做到讓線程實現(xiàn)自動的阻塞。
我們現(xiàn)在聊的就是這個Queue里面所提供的一些可以給多線程比較友好的接口。他提供了一些什么接口呢,第一個就是offer對應(yīng)的是原來的那個add,提供了poll取數(shù)據(jù),然后提供了peek拿出來這個數(shù)據(jù)。那么這個是什么意思呢,我們讀一下這個offer的概念,offer是往里頭添加,加進(jìn)去沒加進(jìn)去它會給你一個布爾類型的返回值,和原來的add是什么區(qū)別呢,add如果加不進(jìn)去了是會拋異常的。所以一般的情況下我們用的最多的Queue里面都用offer,它會給你一個返回值,peek的概念是去取并不是讓你remove掉,poll是取并且remove掉,而且這幾個對于BlockingQueue來說也確實是線程安全的一個操作。對于Queue經(jīng)常用的接口就這么幾個,大家了解就可以。
public class T04_ConcurrentQueue {
public static void main(String[] args) {
Queue<String> strs = new ConcurrentLinkedQueue<>();
for(int i=0; i<10; i++) {
strs.offer("a" + i); //add
}
System.out.println(strs);
System.out.println(strs.size());
System.out.println(strs.poll());
System.out.println(strs.size());
System.out.println(strs.peek());
System.out.println(strs.size());
//雙端隊列Deque
}
}
四、BlockingQueue
LinkedBlockingQueue,體現(xiàn)Concurrent的這個點在哪里呢,我們來看這個LinkedBlockingQueue,用鏈表實現(xiàn)的BlockingQueue,是一個無界隊列。就是它可以一直裝到你內(nèi)存滿了為止,一直添加。
來看一下這個小程序,這么一些線程,第一個線程是我往里頭加內(nèi)容,加put。BlockingQueue在Queue的基礎(chǔ)上又添加了兩個方法,這兩個方法一個叫put,一個叫take。這兩個方法是真真正正的實現(xiàn)了阻塞。put往里裝如果滿了的話我這個線程會阻塞住,take往外取如果空了的話線程會阻塞住。所以這個BlockingQueue就實現(xiàn)了生產(chǎn)者消費者里面的那個容器。這個小程序是往里面裝了100個字符串,a開頭i結(jié)尾,每裝一個的時候睡1秒鐘。然后,后面又啟動了5個線程不斷的從里面take,空了我就等著,什么時候新加了我就馬上給它取出來。這是BlockingQueue和Queue的一個基本的概念。
public class T05_LinkedBlockingQueue {
static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
static Random r = new Random();
public static void main(String[] args) {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
try {
strs.put("a" + i); //如果滿了,就會等待
TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "p1").start();
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (;;) {
try {
System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就會等待
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "c" + i).start();
}
}
}
五、ArrayBlockingQueue
ArrayBlockingQueue是有界的,你可以指定它一個固定的值10,它容器就是10,那么當(dāng)你往里面扔容器的時候,一旦他滿了這個put方法就會阻塞住。然后你可以看看用add方法滿了之后他會報異常。offer用返回值來判斷到底加沒加成功,offer還有另外一個寫法你可以指定一個時間嘗試著往里面加1秒鐘,1秒鐘之后如果加不進(jìn)去它就返回了。
回到那個面試經(jīng)常被問到的問題,Queue和List的區(qū)別到底在哪里,主要就在這里,添加了offer、peek、poll、put、take這些個對線程友好的或者阻塞,或者等待方法。
public class T06_ArrayBlockingQueue {
static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
static Random r = new Random();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
strs.put("a" + i);
}
//strs.put("aaa"); //滿了就會等待,程序阻塞
//strs.add("aaa");
//strs.offer("aaa");
strs.offer("aaa", 1, TimeUnit.SECONDS);
System.out.println(strs);
}
}
六、DelayQueue
DelayQueue可以實現(xiàn)在時間上的排序,這個DelayQueue能實現(xiàn)按照在里面等待的時間來進(jìn)行排序。這里我們new了一個DelayQueue,他是BlockingQueue的一種也是用于阻塞的隊列,這個阻塞隊列裝任務(wù)的時候要求你必須實現(xiàn)Delayed接口,Delayed往后拖延推遲,Delayed需要做一個比較compareTo,最后這個隊列的實現(xiàn),這個時間等待越短的就會有優(yōu)先的得到運(yùn)行,所以你需要做一個比較 ,這里面他就有一個排序了,這個排序是按時間來排的,所以去做好,哪個時間返回什么樣的值,不同的內(nèi)容比較的時候可以按照時間來排序??偠灾阋獙崿F(xiàn)Comparable接口重寫 compareTo方法來確定你這個任務(wù)之間是怎么排序的。getDelay去拿到你Delay多長時間了。往里頭裝任務(wù)的時候首先拿到當(dāng)前時間,在當(dāng)前時間的基礎(chǔ)之上指定在多長時間之后這個任務(wù)要運(yùn)行,添加順序參看代碼,但是當(dāng)我們?nèi)ツ玫臅r候,一般的隊列是先加那個先往外拿那個,先進(jìn)先出。這個隊列是不一樣的,按時間進(jìn)行排序(按緊迫程度進(jìn)行排序)。DelayQueue就是按照時間進(jìn)行任務(wù)調(diào)度。
public class T07_DelayQueue {
static BlockingQueue<MyTask> tasks = new DelayQueue<>();
static Random r = new Random();
static class MyTask implements Delayed {
String name;
long runningTime;
MyTask(String name, long rt) {
this.name = name;
this.runningTime = rt;
}
@Override
public int compareTo(Delayed o) {
if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
return -1;
else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
return 1;
else
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public String toString() {
return name + " " + runningTime;
}
}
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
MyTask t1 = new MyTask("t1", now + 1000);
MyTask t2 = new MyTask("t2", now + 2000);
MyTask t3 = new MyTask("t3", now + 1500);
MyTask t4 = new MyTask("t4", now + 2500);
MyTask t5 = new MyTask("t5", now + 500);
tasks.put(t1);
tasks.put(t2);
tasks.put(t3);
tasks.put(t4);
tasks.put(t5);
System.out.println(tasks);
for(int i=0; i<5; i++) {
System.out.println(tasks.take());
}
}
}
DelayQueue本質(zhì)上用的是一個PriorityQueue,PriorityQueue是從AbstractQueue繼承的。PriorityQueue特點是它內(nèi)部你往里裝的時候并不是按順序往里裝的,而是內(nèi)部進(jìn)行了一個排序。按照優(yōu)先級,最小的優(yōu)先。它內(nèi)部實現(xiàn)的結(jié)構(gòu)是一個二叉樹,這個二叉樹可以認(rèn)為是堆排序里面的那個最小堆值排在最上面。
public class T07_01_PriorityQueque {
public static void main(String[] args) {
PriorityQueue<String> q = new PriorityQueue<>();
q.add("c");
q.add("e");
q.add("a");
q.add("d");
q.add("z");
for (int i = 0; i < 5; i++) {
System.out.println(q.poll());
}
}
}
七、SynchronousQueue
SynchronousQueue容量為0,就是這個東西它不是用來裝內(nèi)容的,SynchronousQueue是專門用來兩個線程之間傳內(nèi)容的,給線程下達(dá)任務(wù)的,之前講過一個容器叫Exchanger,本質(zhì)上這個容器的概念是一樣的??聪旅娲a,有一個線程起來等著take,里面沒有值一定是take不到的,然后就等著。然后當(dāng)put的時候能取出來,take到了之后能打印出來,最后打印這個容器的size一定是0,打印出aaa來這個沒問題。那當(dāng)把線程注釋掉,在運(yùn)行一下程序就會在這阻塞,永遠(yuǎn)等著。如果add方法直接就報錯,原因是滿了,這個容器為0,你不可以往里面扔?xùn)|西。這個Queue和其他的很重要的區(qū)別就是你不能往里頭裝東西,只能用來阻塞式的put調(diào)用,要求是前面得有人等著拿這個東西的時候你才可以往里裝,但容量為0,其實說白了就是我要遞到另外一個的手里才可以。這個SynchronousQueue看似沒有用,其實不然,SynchronousQueue在線程池里用處特別大,很多的線程取任務(wù),互相之間進(jìn)行任務(wù)的一個調(diào)度的時候用的都是它。文章來源:http://www.zghlxwxcb.cn/news/detail-479260.html
public class T08_SynchronusQueue { //容量為0
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strs = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.put("aaa"); //阻塞等待消費者消費
//strs.put("bbb");
//strs.add("aaa");
System.out.println(strs.size());
}
}
八、TransferQueue
TransferQueue傳遞,實際上是前面這各種各樣Queue的一個組合,它可以給線程來傳遞任務(wù),以此同時不像是SynchronousQueue只能傳遞一個,TransferQueue做成列表可以傳好多個。比較牛X的是它添加了一個方法叫transfer,如果我們用put就相當(dāng)于一個線程來了往里一裝它就走了。transfer就是裝完在這等著,阻塞等有人把它取走我這個線程才回去干我自己的事情。一般使用場景:是我做了一件事情,我這個事情要求有一個結(jié)果,有了這個結(jié)果之后我可以繼續(xù)進(jìn)行我下面的這個事情的時候,比方說我付了錢,這個訂單我付賬完成了,但是我一直要等這個付賬的結(jié)果完成才可以給客戶反饋。文章來源地址http://www.zghlxwxcb.cn/news/detail-479260.html
public class T09_TransferQueue {
public static void main(String[] args) throws InterruptedException {
LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
strs.transfer("aaa");
//strs.put("aaa");
/*new Thread(() -> {
try {
System.out.println(strs.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();*/
}
}
到了這里,關(guān)于高并發(fā)編程:并發(fā)容器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!