一、Lock鎖
1. 可重入鎖 ReentrantLock
我們可以使用?ReentrantLock 來替代 Synchronized鎖,實(shí)現(xiàn)方法為:
Lock lock = new ReentrantLock();
void m2 (){
try{
// 加鎖
lock.lock();
for(int i=0;i<10000;i++){
count.incrementAndGet(); // 相當(dāng)于線程安全的 count++
}
}catch (Exception e){
e.printStackTrace();
}finally {
// 釋放鎖
lock.unlock();
}
}
使用Synchronized是自動(dòng)解鎖的。但是使用lock鎖,必須使用try-catch-finally包裹,在try中加鎖,在finally中釋放鎖。
2. ReadWriteLock 讀寫鎖
允許同一時(shí)刻多個(gè)讀線程訪問,但所有寫線程均被阻塞。讀寫分離,并發(fā)性提升。
java中實(shí)現(xiàn)類的讀寫鎖為 ReentrantReadWriteLock 類。
import java.util.Random;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read (Lock lock){
try {
lock.lock();
// 模擬讀操作
Thread.sleep(1000);
System.out.println("read over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write (Lock lock,int v){
try {
lock.lock();
// 模擬寫操作
Thread.sleep(1000);
value = v;
System.out.println("write over!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
// 使用 互斥鎖(排他鎖) lock,需要一個(gè)一個(gè)執(zhí)行線程
Runnable readR = () -> read(lock);
Runnable writeR = () -> write(lock,new Random().nextInt());
for(int i=0;i<18;i++) new Thread(readR).start();
for(int i=0;i<2;i++) new Thread(writeR).start();
}
}
這里我們使用 互斥鎖 lock 來去模擬讀寫操作時(shí)會(huì)發(fā)現(xiàn),每個(gè)線程一個(gè)一個(gè)執(zhí)行,只有當(dāng)前線程執(zhí)行完畢,下一個(gè)線程搶到資源才繼續(xù)執(zhí)行。
如果我們使用讀寫鎖進(jìn)行模擬操作,會(huì)發(fā)現(xiàn),讀操作幾乎是在一瞬間全部執(zhí)行完畢,沒有等待。
public static void main(String[] args) {
// 使用 讀寫鎖 readWriteLock,所有的讀寫操作并發(fā)執(zhí)行,讀寫不需要等待
Runnable readLockR = () -> read(readLock);
Runnable writeLockR = () -> write(writeLock,new Random().nextInt());
for(int i=0;i<18;i++) new Thread(readLockR).start();
for(int i=0;i<2;i++) new Thread(writeLockR).start();
}
3. Lock提供了Synchronized 不具備的特性:
(1)我們可以使用 tryLock 進(jìn)行嘗試鎖定,不管鎖定與否,方法都將繼續(xù)執(zhí)行可以根據(jù) tryLock 的返回值來判定是否鎖定,也可以指定tryLock的時(shí)間,由于tryLock(time)拋出異常,所以要注意unclock的釋放。
(2)使用 lock.lockInterruptibly(); 方法,可以將當(dāng)前獲得鎖的線程中斷,拋出異常并釋放鎖,需要配合 ti.interrupt(); 方法使用。
Thread ti = new Thread(() -> {
try {
lock.lockInterruptibly();
index.compareAndSet(10, 11);
index.compareAndSet(11, 10);
System.out.println(Thread.currentThread().getName()+": 10->11->10");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
ti.interrupt();
(3)初始化一個(gè)公平鎖
static Lock lock = new ReentrantLock(true);
參數(shù)為 true表示為公平鎖。當(dāng)前所有線程在等待隊(duì)列中,這個(gè)時(shí)候來了一個(gè)新的線程,如果是公平鎖,則它會(huì)去查看是否有隊(duì)列在等待,如果有,則讓這個(gè)新的線程也加入隊(duì)列等待。如果不是公平鎖,這個(gè)線程會(huì)去搶占資源。
二 、CountDownLath
CountDownLath:允許其他多個(gè)線程等待當(dāng)前l(fā)atch線程,只有當(dāng)前線程的latch計(jì)數(shù)器為0時(shí),才放行,讓其他線程執(zhí)行。
latch.countDown():每執(zhí)行一次 latch.countDown,計(jì)算器減一,直到為0。
latch.await(): latch.await相當(dāng)于一道門,只有 latch 的計(jì)數(shù)器為0的時(shí)候才放行。
import java.util.concurrent.CountDownLatch;
public class CountDownLathTest {
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
// 定義一個(gè)CountDownLatch,給定計(jì)數(shù)器為 threads.length
CountDownLatch latch = new CountDownLatch(threads.length);
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 100; j++) result += j;
// 每執(zhí)行一次 latch.countDown,計(jì)算器減一,直到為0
latch.countDown();
System.out.println(Thread.currentThread().getName());
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
// latch.await相當(dāng)于一道門,只有 latch 的計(jì)數(shù)器為0的時(shí)候才放行
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("latch計(jì)數(shù)器為0,放行了") ;
}
public static void main(String[] args) {
usingCountDownLatch();
}
}
CountDownLath 的作用和 join() 作用一樣,都是讓其他線程等待,先執(zhí)行指定線程。
private static void usingJoin() {
Thread[] threads = new Thread[100];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
int result = 0;
for (int j = 0; j < 100; j++) result += j;
System.out.println(Thread.currentThread().getName());
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
// 將線程循環(huán)放入,確保執(zhí)行順序
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("join順序線程執(zhí)行完畢,到我了") ;
}
三、CyclicBarrier
CyclicBarrier:可循環(huán)使用屏障,定義一個(gè)指定瓶頸的CyclicBarrier,當(dāng)調(diào)用 await方法到達(dá)這個(gè)瓶頸后,就去執(zhí)行相應(yīng)的瓶頸方法。
cyclicBarrier.await():如果沒有達(dá)到計(jì)數(shù)器瓶頸,就等待
cyclicBarrier.reset():使計(jì)數(shù)器重置
public static void main(String[] args) {
// CyclicBarrier 第二個(gè)參數(shù)作用是,當(dāng)達(dá)到瓶頸 20 后,去做一些事情
CyclicBarrier cyclicBarrier = new CyclicBarrier(20, new Runnable() {
@Override
public void run() {
System.out.println("達(dá)到了瓶頸,重置");
}
});
for (int i=0;i<100;i++){
int finalI = i;
new Thread(() -> {
try {
// 每執(zhí)行一次cyclicBarrier.await(),相當(dāng)與 cyclicBarrier的計(jì)數(shù)器+1,直到等于20,執(zhí)行瓶頸方法
cyclicBarrier.await();
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
CyclicBarrier使用場(chǎng)景:
? ? ? ? 1. 限流,當(dāng)請(qǐng)求數(shù)量達(dá)到一定數(shù)量時(shí),進(jìn)行限流。
? ? ? ? 2. 復(fù)雜操作數(shù)據(jù)庫、網(wǎng)絡(luò)、文件
? ? ? ? 3. 并發(fā)執(zhí)行線程-操作線程-操作
?四、Semaphore 信號(hào)量
Semaphore :用來控制同時(shí)訪問資源的線程數(shù)量。
可以指定是否為公平鎖。
公平鎖:獲取鎖時(shí),如果獲取的順序符合請(qǐng)求的絕對(duì)時(shí)間順序,則為公平鎖,F(xiàn)IFO。
// 允許 n 個(gè)線程同時(shí)執(zhí)行
Semaphore semaphore = new Semaphore(1);
// 可以指定是否為公平鎖
Semaphore semaphore2 = new Semaphore(2,true);
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("T1 running ...");
Thread.sleep(200);
System.out.println("T1 running ...");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("T2 running ...");
Thread.sleep(200);
System.out.println("T2 running ...");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
T1 running ...
T1 running ...
T2 running ...
T2 running ...
如果我們把??new Semaphore(1); 改為?new Semaphore(2); 執(zhí)行結(jié)果:
T1 running ...
T2 running ...
T1 running ...
T2 running ...
可以看到,為1時(shí),T2只能等 T1執(zhí)行完畢,才開始執(zhí)行。為2時(shí),T1、T2可以同時(shí)執(zhí)行。
主要使用場(chǎng)景為限流,類比高速收費(fèi)站,5個(gè)車道2個(gè)收費(fèi)站。
?五、Exchanger 交換器
?Exchanger:用于線程通信(線程交換數(shù)據(jù))的工具類,提供一個(gè)同步點(diǎn),當(dāng)?shù)谝粋€(gè)線程執(zhí)行了 exchanger()方法,它會(huì)一直等待下一個(gè)線程也執(zhí)行 exchanger()方法,然后進(jìn)行交換數(shù)據(jù)??梢栽O(shè)置最大等待時(shí)間。
import java.util.concurrent.Exchanger;
public class ExchangerTest {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(() ->{
String s = "T1";
try {
// 設(shè)置多大等待時(shí)間
// s = exchanger.exchange(s,2000, TimeUnit.SECONDS);
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+s);
},"t1").start();
new Thread(() ->{
String s = "T2";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" "+s);
},"t2").start();
}
}
t2 T1
t1 T2
使用場(chǎng)景如:游戲中倆個(gè)人交換裝備。?
?六、LockSupport 工具
?LockSupport:定義了一組靜態(tài)方法,提供了最基本的線程阻塞和喚醒功能。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class LockSupportTest {
public static void main(String[] args) {
Thread t = new Thread(() ->{
for(int i =0;i<10;i++){
System.out.println(i);
if(i == 5){
// 阻塞當(dāng)前線程
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
// 喚醒指定線程
// LockSupport.unpark(t);
}
}
0
1
2
3
4
5
?由于 LockSupport.park(); 當(dāng)循環(huán)到5的時(shí)候,阻塞當(dāng)前線程。
將 LockSupport.unpark(t); 打開后,喚醒線程,完整打印。
0
1
2
3
4
5
6
7
8
9
當(dāng)我們查看 part()方法的實(shí)現(xiàn)后發(fā)現(xiàn),這個(gè)方法是 Unsafe 類實(shí)現(xiàn)的。還記得之前 CAS 中說到的這個(gè)類嗎?
????????所有以 AtomXXX開頭的類,底層都是使用cas方法,并是通過 Unsafe類實(shí)現(xiàn)的。
Unsafe類的出現(xiàn)等于c/c++的指針,給 java語言賦予了原來 C/C++實(shí)現(xiàn)的指針方法。
/**
* Disables the current thread for thread scheduling purposes unless the
* permit is available.
*
* <p>If the permit is available then it is consumed and the call
* returns immediately; otherwise the current thread becomes disabled
* for thread scheduling purposes and lies dormant until one of three
* things happens:
*
* <ul>
*
* <li>Some other thread invokes {@link #unpark unpark} with the
* current thread as the target; or
*
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
*
* <li>The call spuriously (that is, for no reason) returns.
* </ul>
*
* <p>This method does <em>not</em> report which of these caused the
* method to return. Callers should re-check the conditions which caused
* the thread to park in the first place. Callers may also determine,
* for example, the interrupt status of the thread upon return.
*/
public static void park() {
UNSAFE.park(false, 0L);
}
翻譯:文章來源:http://www.zghlxwxcb.cn/news/detail-461735.html
出于線程調(diào)度目的禁用當(dāng)前線程,除非允許可用。
如果許可證可用,則將其使用,呼叫立即返回;否則,當(dāng)前線程將出于線程調(diào)度目的而被禁用并處于休眠狀態(tài),直到發(fā)生以下三種情況之一:
其他一些線程以當(dāng)前線程作為目標(biāo)進(jìn)行調(diào)用 unpark ;或者
其他線程 中斷 當(dāng)前線程;或
虛假調(diào)用(即無緣無故)返回。
此方法 不會(huì) 報(bào)告哪些導(dǎo)致該方法返回。調(diào)用方應(yīng)重新檢查導(dǎo)致線程首先停放的條件。例如,調(diào)用方還可以確定線程在返回時(shí)的中斷狀態(tài)文章來源地址http://www.zghlxwxcb.cn/news/detail-461735.html
到了這里,關(guān)于多線程基礎(chǔ)(三)JUC并發(fā)包:Lock鎖、CountDownLath、CyclicBarrier、Semaphore、LockSupport的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!