synchronized
原理概述
synchronized實(shí)現(xiàn)原理的關(guān)鍵字由淺入深依次為
字節(jié)碼層面:monitorenter/monitorexit 指令
java對(duì)象層面: Mark Word 對(duì)象頭
JVM層面: CAS、自旋 、 ObjectMonitor(MESA管層模型:cxq,entryList,wait三個(gè)隊(duì)列)
操作系統(tǒng)層面: mutex 鎖
其中 mark word 對(duì)象頭如下圖:
鎖升級(jí)
說(shuō)到鎖升級(jí),我相信很多人都錯(cuò)誤的認(rèn)為升級(jí)過(guò)程是這樣的:初始狀態(tài)無(wú)鎖,第一個(gè)線程進(jìn)來(lái)升級(jí)成偏向鎖,假如偏向鎖還沒(méi)釋放又再有線程進(jìn)來(lái)就會(huì)cas+自旋去獲取輕量級(jí)鎖,如果自旋超過(guò)一定次數(shù)就膨脹成重量級(jí)鎖 。但其實(shí)這種說(shuō)法是不正確的。
這其中有三個(gè)常見的誤區(qū):
誤區(qū)一:初始狀態(tài)不是無(wú)鎖,而是偏向鎖(匿名偏向鎖)。
誤區(qū)二:無(wú)鎖不會(huì)升級(jí)成偏向鎖,只能升級(jí)成輕量級(jí)鎖或者重量級(jí)鎖。
誤區(qū)三:輕量級(jí)獲鎖沒(méi)有自旋,只要一次CAS失敗就會(huì)膨脹成重量級(jí)鎖。自旋是重量級(jí)鎖為了盡可能的不阻塞線程,在實(shí)際阻塞之前做的一些重試操作。
實(shí)際的鎖升級(jí)操作是:初始為偏向鎖(匿名偏向鎖),A線程進(jìn)來(lái)則偏向該線程(即使線程退出了對(duì)象頭仍然偏向A線程)。后面B線程進(jìn)來(lái)就會(huì)發(fā)現(xiàn)該對(duì)象鎖已經(jīng)偏向線程A了,就會(huì)撤銷該對(duì)象鎖的偏向并升級(jí)成輕量級(jí)鎖,輕量級(jí)鎖釋放的時(shí)候又變成無(wú)鎖狀態(tài)。后續(xù)再有線程C進(jìn)來(lái),就由無(wú)鎖直接變成輕量級(jí)鎖,如果在C線程輕量級(jí)鎖釋放鎖之前再有線程D進(jìn)來(lái)就膨脹成重量級(jí)鎖,直到最后都沒(méi)有線程占用鎖就恢復(fù)成無(wú)鎖狀態(tài)。
在理解上面單個(gè)對(duì)象鎖升級(jí)過(guò)程后再來(lái)理解 JVM 對(duì)偏向鎖做的一些優(yōu)化(因?yàn)槠蜴i撤銷是有一定性能開銷的,需要等到另一個(gè)線程到達(dá)安全點(diǎn)才能撤銷):在多個(gè)對(duì)象鎖的情況下,如果所有對(duì)象鎖撤銷偏向總數(shù)達(dá)到批量重偏向閾值(默認(rèn)20)就會(huì)觸發(fā)批量重偏向(將該類epoch +1,且當(dāng)前正在生效的偏向鎖epoch也同步+1,表示偏向鎖進(jìn)入下一代。之前舊的 epoch 則說(shuō)明已過(guò)期,過(guò)期epoch的鎖對(duì)象下次獲鎖時(shí)可以重偏向)。當(dāng)撤銷偏向總數(shù)達(dá)到批量撤銷閾值(默認(rèn)40)就會(huì)觸發(fā)批量偏向撤銷(將該類是否偏向鎖標(biāo)記為0,標(biāo)記該 class 為不可偏向,并且撤銷當(dāng)前正在持有的偏向鎖,后續(xù)new的對(duì)象鎖也不再是偏向鎖,而是無(wú)鎖狀態(tài),表示對(duì)于該 class 直接執(zhí)行輕/重量級(jí)鎖的邏輯。)
口說(shuō)無(wú)憑,下面就結(jié)合案例+源碼來(lái)看看上面說(shuō)的對(duì)不對(duì)。
初始狀態(tài)
前面說(shuō)到初始狀態(tài)是匿名偏向鎖,而不是無(wú)鎖,這里來(lái)驗(yàn)證一下:
可以看到鎖標(biāo)記是101,說(shuō)明這是一個(gè)偏向鎖,再觀察仔細(xì)一點(diǎn)會(huì)發(fā)現(xiàn)這個(gè)偏向鎖沒(méi)有偏向任何一個(gè)線程。相信看到這里大家也明白了匿名偏向鎖就是不偏向任何線程的偏向鎖。
是否開啟偏向鎖是可以配置(jdk6之后默認(rèn)開啟):
XX:+UseBiasedLocking:開啟偏向鎖功能
XX:-UseBiasedLocking:關(guān)閉偏向鎖功能
在一些老的jdk版本中(具體多老,我也沒(méi)去研究,至少jdk8是),偏向鎖存在4s延遲偏向——在JVM啟動(dòng)4s后創(chuàng)建出來(lái)的對(duì)象才會(huì)開啟偏向,不過(guò)這個(gè)延遲也是可以通過(guò)JVM參數(shù)設(shè)置的:
-XX:BiasedLockingStartupDelay=0 將延遲改為0
偏向鎖
偏向鎖獲取/重入
至此可以確認(rèn),鎖初始狀態(tài)是匿名偏向鎖。
那么當(dāng)?shù)谝粋€(gè)線程進(jìn)來(lái)發(fā)生了啥,我們直接看到JVM處理 monitorenter 指令的源碼
bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorenter)
CASE(_monitorenter): {
//得到棧頂元素,其實(shí)就是鎖記錄的對(duì)象
oop lockee = STACK_OBJECT(-1);
CHECK_NULL(lockee);
//找到一個(gè)該對(duì)象可用的鎖記錄
BasicObjectLock* limit = istate->monitor_base();
BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();
BasicObjectLock* entry = NULL;
while (most_recent != limit ) {
if (most_recent->obj() == NULL) entry = most_recent;
else if (most_recent->obj() == lockee) break;
most_recent++;
}
//一般都可以找到
if (entry != NULL) {
//綁定鎖記錄和對(duì)象
entry->set_obj(lockee);
int success = false;
uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;
markOop mark = lockee->mark();
intptr_t hash = (intptr_t) markOopDesc::no_hash;
// 判斷是否為偏向模式,即 Mark Word 最后三位是否為 101
if (mark->has_bias_pattern()) {
uintptr_t thread_ident;
uintptr_t anticipated_bias_locking_value;
thread_ident = (uintptr_t)istate->thread();
anticipated_bias_locking_value =
(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &
~((uintptr_t) markOopDesc::age_mask_in_place);
// 分支一:如果為0說(shuō)明偏向當(dāng)前線程,且 class 的 epoch 等于 Mark Word 的 epoch,則偏向鎖重入
if (anticipated_bias_locking_value == 0) {
if (PrintBiasedLockingStatistics) {
(* BiasedLocking::biased_lock_entry_count_addr())++;
}
success = true;
}
// 分支二:如果class的最后三位不為101,說(shuō)明class關(guān)閉了偏向模式(批量撤銷導(dǎo)致),則sucess為false,后續(xù)會(huì)嘗試撤銷偏向鎖
else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {
markOop header = lockee->klass()->prototype_header();
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(*BiasedLocking::revoked_lock_entry_count_addr())++;
}
}
// 分支三:如果epoch不相等,說(shuō)明偏向鎖已過(guò)期(批量重偏向?qū)е拢?,則嘗試重偏向
else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {
markOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);
if (hash != markOopDesc::no_hash) {
new_header = new_header->copy_set_hash(hash);
}
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::rebiased_lock_entry_count_addr())++;
}
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
// 分支四:到這個(gè)分支要么是匿名偏向鎖,要么是正在偏向別的線程
else {
markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |
(uintptr_t)markOopDesc::age_mask_in_place |
epoch_mask_in_place));
if (hash != markOopDesc::no_hash) {
header = header->copy_set_hash(hash);
}
markOop new_header = (markOop) ((uintptr_t) header | thread_ident);
DEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)
//如果是匿名偏向就直接偏向當(dāng)前線程
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
//反之就調(diào)用InterpreterRuntime::monitorenter撤銷當(dāng)前偏向鎖并升級(jí)成輕量級(jí)鎖
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
success = true;
}
}
//實(shí)際上只有分支2會(huì)進(jìn)入到里面的代碼,因?yàn)槠渌种ucess都為true
//這段代碼的邏輯其實(shí)主要也是撤銷偏向鎖并升級(jí)成輕量級(jí)鎖,那為什么不和分支4的撤銷偏向鎖寫在一塊?
//我認(rèn)為是因?yàn)榉种?肯定是不同線程不需要考慮輕量級(jí)鎖重入,而這個(gè)需要
if (!success) {
markOop displaced = lockee->mark()->set_unlocked();
entry->lock()->set_displaced_header(displaced);
bool call_vm = UseHeavyMonitors;
if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {
// 如果是輕量級(jí)鎖重入,將 Displaced Mark Word 設(shè)置為 NULL,標(biāo)記這是一次重入,后續(xù)會(huì)對(duì)標(biāo)記做輕量級(jí)鎖的重入邏輯
if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {
entry->lock()->set_displaced_header(NULL);
}
// 反之調(diào)用InterpreterRuntime::monitorenter撤銷偏向鎖并升級(jí)成輕量級(jí)鎖
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
} else {
istate->set_msg(more_monitors);
UPDATE_PC_AND_RETURN(0);
}
}
根據(jù)注釋不難看出,匿名偏向鎖到偏向鎖的過(guò)程就在分支四。
// 分支四:到這個(gè)分支要么是匿名偏向鎖,要么是正在偏向別的線程
else {
//...
//如果是匿名偏向就直接偏向當(dāng)前線程
if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {
if (PrintBiasedLockingStatistics)
(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;
}
//反之就調(diào)用InterpreterRuntime::monitorenter撤銷當(dāng)前偏向鎖并升級(jí)
else {
CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);
}
//...
}
}
其實(shí)就是CAS去替換mark work的thread id,只有原本是匿名偏向的情況下才會(huì)替換成功,如果替換失敗就說(shuō)明已偏向其他線程,就調(diào)用 InterpreterRuntime::monitorenter 撤銷當(dāng)前偏向鎖并升級(jí)
偏向鎖的撤銷/重偏向和升級(jí)
interpreterRuntime.cpp#InterpreterRuntime::monitorenter
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))
//...
//如果開啟了偏向鎖模式,就進(jìn)入fast_enter
if (UseBiasedLocking) {
ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);
}
//反之直接進(jìn)入slow_enter
else {
ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);
}
//...
IRT_END
什么是fast_enter ?什么是slow_enter?
slow_enter就是普通鎖的加鎖,這個(gè)后面再看。先看fast_enter。
這里的普通鎖場(chǎng)景是我自己的術(shù)語(yǔ),特指沒(méi)有偏向鎖情況下的鎖場(chǎng)景
其實(shí)fast_enter最后也調(diào)用了slow_enter,只不過(guò)就是在調(diào)用之前多加了一層偏向鎖的撤銷/重偏向操作(同時(shí)會(huì)統(tǒng)計(jì)撤銷次數(shù),當(dāng)達(dá)到閾值時(shí)觸發(fā)批量重偏向或者批量撤銷的邏輯)。只有成功重偏向了才不進(jìn)入slow_enter,否則都說(shuō)明偏向鎖被撤銷了,鎖狀態(tài)要么是無(wú)鎖要么是輕量級(jí)鎖(根據(jù)偏向線程是否存活來(lái)決定),都需進(jìn)入slow_enter進(jìn)行普通鎖的獲?。ó吘规i的獲取還得繼續(xù)下去)。
synchronizer.cpp#ObjectSynchronizer::fast_enter
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
//再次判斷是否開啟了偏向鎖模式
if (UseBiasedLocking) {
if (!SafepointSynchronize::is_at_safepoint()) {
//撤銷/重偏向
BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
return;
}
} else {
assert(!attempt_rebias, "can not rebias toward VM thread");
BiasedLocking::revoke_at_safepoint(obj);
}
assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
}
slow_enter (obj, lock, THREAD) ;
}
biasedLocking.cpp#BiasedLocking::revoke_and_rebias
BiasedLocking::Condition BiasedLocking::revoke_and_rebias(Handle obj, bool attempt_rebias, TRAPS) {
assert(!SafepointSynchronize::is_at_safepoint(), "must not be called while at safepoint");
markOop mark = obj->mark();
//如果是匿名偏向且attempt_rebias為false,就會(huì)進(jìn)入到這個(gè)分支,撤銷偏向鎖,返回 BIAS_REVOKED
//例如:調(diào)用了hashcode
if (mark->is_biased_anonymously() && !attempt_rebias) {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
// 如果開啟了偏向模式會(huì)進(jìn)入這個(gè)分支
else if (mark->has_bias_pattern()) {
Klass* k = obj->klass();
markOop prototype_header = k->prototype_header();
//如果class關(guān)閉了偏向模式會(huì)進(jìn)入這個(gè)分支,撤銷偏向鎖,返回 BIAS_REVOKED
if (!prototype_header->has_bias_pattern()) {
markOop biased_value = mark;
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(prototype_header, obj->mark_addr(), mark);
assert(!(*(obj->mark_addr()))->has_bias_pattern(), "even if we raced, should still be revoked");
return BIAS_REVOKED;
}
// 如果epoch已過(guò)期就會(huì)進(jìn)入這個(gè)分支
else if (prototype_header->bias_epoch() != mark->bias_epoch()) {
// 如果參數(shù)允許重偏向,就進(jìn)行重偏向,返回 BIAS_REVOKED_AND_REBIASED
if (attempt_rebias) {
assert(THREAD->is_Java_thread(), "");
markOop biased_value = mark;
markOop rebiased_prototype = markOopDesc::encode((JavaThread*) THREAD, mark->age(), prototype_header->bias_epoch());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(rebiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED_AND_REBIASED;
}
}
// 如果參數(shù)不允許重偏向,就還是撤銷偏向鎖,返回 BIAS_REVOKED
else {
markOop biased_value = mark;
markOop unbiased_prototype = markOopDesc::prototype()->set_age(mark->age());
markOop res_mark = (markOop) Atomic::cmpxchg_ptr(unbiased_prototype, obj->mark_addr(), mark);
if (res_mark == biased_value) {
return BIAS_REVOKED;
}
}
}
}
//如果上述的cas失敗了,就會(huì)更新class的撤銷計(jì)數(shù)器并返回對(duì)應(yīng)標(biāo)識(shí),根據(jù)標(biāo)識(shí)判斷是否需要批量重偏向或批量撤銷
HeuristicsResult heuristics = update_heuristics(obj(), attempt_rebias);
if (heuristics == HR_NOT_BIASED) {
return NOT_BIASED;
}
// 分支一:撤銷單個(gè)偏向鎖的標(biāo)識(shí)
else if (heuristics == HR_SINGLE_REVOKE) {
Klass *k = obj->klass();
markOop prototype_header = k->prototype_header();
//如果要撤銷的偏向鎖就是當(dāng)前線程,直接調(diào)用 revoke_bias 方法撤銷偏向鎖,不需要等到 SafePoint
if (mark->biased_locker() == THREAD && prototype_header->bias_epoch() == mark->bias_epoch()) {
ResourceMark rm;
if (TraceBiasedLocking) {
tty->print_cr("Revoking bias by walking my own stack:");
}
EventBiasedLockSelfRevocation event;
BiasedLocking::Condition cond = revoke_bias(obj(), false, false, (JavaThread*) THREAD, NULL);
((JavaThread*) THREAD)->set_cached_monitor_info(NULL);
assert(cond == BIAS_REVOKED, "why not?");
if (event.should_commit()) {
event.set_lockClass(k);
event.commit();
}
return cond;
}
//反之,將撤銷封裝為任務(wù),提交給 VM 線程執(zhí)行,VM 線程達(dá)到 SafePoint 后會(huì)調(diào)用 revoke_bias 方法
//到達(dá)安全點(diǎn)會(huì)檢測(cè)偏向線程是否存活,如果存活就直接升級(jí)成輕量級(jí)鎖,如果不存活就先撤銷成無(wú)鎖,再由競(jìng)爭(zhēng)線程去升級(jí)成輕量級(jí)鎖
else {
EventBiasedLockRevocation event;
VM_RevokeBias revoke(&obj, (JavaThread*) THREAD);
VMThread::execute(&revoke);
if (event.should_commit() && (revoke.status_code() != NOT_BIASED)) {
event.set_lockClass(k);
event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);
event.set_previousOwner(revoke.biased_locker());
event.commit();
}
return revoke.status_code();
}
}
// 分支二:批量重偏向與批量撤銷的標(biāo)識(shí)
assert((heuristics == HR_BULK_REVOKE) ||
(heuristics == HR_BULK_REBIAS), "?");
EventBiasedLockClassRevocation event;
VM_BulkRevokeBias bulk_revoke(&obj, (JavaThread*) THREAD,
(heuristics == HR_BULK_REBIAS),
attempt_rebias);
VMThread::execute(&bulk_revoke);
if (event.should_commit()) {
event.set_revokedClass(obj->klass());
event.set_disableBiasing((heuristics != HR_BULK_REBIAS));
event.set_safepointId(SafepointSynchronize::safepoint_counter() - 1);
event.commit();
}
return bulk_revoke.status_code();
}
static HeuristicsResult update_heuristics(oop o, bool allow_rebias) {
//...
//返回批量撤銷標(biāo)識(shí)
if (revocation_count == BiasedLockingBulkRevokeThreshold) {
return HR_BULK_REVOKE;
}
//返回批量重偏向標(biāo)識(shí)
if (revocation_count == BiasedLockingBulkRebiasThreshold) {
return HR_BULK_REBIAS;
}
//返回普通的單個(gè)撤銷標(biāo)識(shí)
return HR_SINGLE_REVOKE;
}
案例驗(yàn)證:
案例只演示了撤銷和升級(jí)。重偏向的場(chǎng)景比較難實(shí)現(xiàn)…
批量重偏向和批量偏向撤銷
試想這么一個(gè)場(chǎng)景,假如現(xiàn)在有100個(gè)對(duì)象鎖已經(jīng)全都偏向線程A,并且A線程已經(jīng)退出了。后續(xù)B線程進(jìn)來(lái)獲取這100個(gè)鎖的時(shí)發(fā)現(xiàn)全都偏向到了A,假如沒(méi)有批量重偏向和批量撤銷的話,就會(huì)老老實(shí)實(shí)撤銷偏向100次。而偏向撤銷是存在一定性能開銷的(需要等到安全點(diǎn)才能撤銷),這種大量撤銷的情況下偏向鎖的性能甚至還不如輕量級(jí)鎖。所以JVM針對(duì)這種場(chǎng)景做了優(yōu)化。
如果一定時(shí)間內(nèi)(默認(rèn)25s)撤銷次數(shù)達(dá)到20,JVM就會(huì)認(rèn)為自己偏向錯(cuò)了,將該類epoch +1,且當(dāng)前正在生效的偏向鎖epoch也同步+1,表示偏向鎖進(jìn)入下一代。之前舊的 epoch 則說(shuō)明已過(guò)期,過(guò)期epoch的鎖對(duì)象下次獲鎖時(shí)可以重偏向。 撤銷次數(shù)達(dá)到40,JVM就會(huì)認(rèn)為此時(shí)偏向鎖不再適用,將該類是否偏向鎖標(biāo)記為0,標(biāo)記該 class 為不可偏向,并且撤銷當(dāng)前正在持有的偏向鎖,后續(xù)new的對(duì)象鎖也不再是偏向鎖,而是無(wú)鎖狀態(tài),表示對(duì)于該 class 直接執(zhí)行輕/重量級(jí)鎖的邏輯。
biasedLocking.cpp#BiasedLocking::Condition::bulk_revoke_or_rebias_at_safepoint
class VM_BulkRevokeBias : public VM_RevokeBias {
//...
virtual void doit() {
// 等待線程達(dá)到 SafePoint 后會(huì)調(diào)用 bulk_revoke_or_rebias_at_safepoint 方法
// bulk_rebias 為 true 代表執(zhí)行批量重偏向邏輯,為 false 表示執(zhí)行批量撤銷邏輯
// attempt_rebias_of_object 代表是否允許重偏向,這里固定為 true
_status_code = bulk_revoke_or_rebias_at_safepoint((*_obj)(), _bulk_rebias, _attempt_rebias_of_object, _requesting_thread);
clean_up_cached_monitor_info();
}
};
static BiasedLocking::Condition bulk_revoke_or_rebias_at_safepoint(oop o,
bool bulk_rebias,
bool attempt_rebias_of_object,
JavaThread* requesting_thread) {
//...
//批量重偏向
if (bulk_rebias) {
if (klass->prototype_header()->has_bias_pattern()) {
// 更新當(dāng)前 class 的 epoch
int prev_epoch = klass->prototype_header()->bias_epoch();
klass->set_prototype_header(klass->prototype_header()->incr_bias_epoch());
int cur_epoch = klass->prototype_header()->bias_epoch();
// 遍歷所有線程的棧,找出當(dāng)前 class 對(duì)應(yīng)的正處于鎖定狀態(tài)的對(duì)象,更新 epoch 值
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
//更新該類正在使用的偏向鎖對(duì)象的 epoch 與 類的epoch 保持一致
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
assert(mark->bias_epoch() == prev_epoch || mark->bias_epoch() == cur_epoch, "error in bias epoch adjustment");
owner->set_mark(mark->set_bias_epoch(cur_epoch));
}
}
}
}
// 對(duì)當(dāng)前鎖對(duì)象進(jìn)行重偏向,第二個(gè)參數(shù)為 allow_rebias,表示是否允許重偏向,此時(shí)一般是 true
revoke_bias(o, attempt_rebias_of_object && klass->prototype_header()->has_bias_pattern(), true, requesting_thread);
}
//批量撤銷
else {
if (TraceBiasedLocking) {
ResourceMark rm;
tty->print_cr("* Disabling biased locking for type %s", klass->external_name());
}
// 關(guān)閉當(dāng)前 class 的偏向鎖
klass->set_prototype_header(markOopDesc::prototype());
// 遍歷所有線程的棧,找出當(dāng)前 class 對(duì)應(yīng)的正處于鎖定狀態(tài)的對(duì)象,撤銷偏向鎖
for (JavaThread* thr = Threads::first(); thr != NULL; thr = thr->next()) {
GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(thr);
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
oop owner = mon_info->owner();
markOop mark = owner->mark();
if ((owner->klass() == k_o) && mark->has_bias_pattern()) {
revoke_bias(owner, false, true, requesting_thread);
}
}
}
// 對(duì)當(dāng)前鎖對(duì)象進(jìn)行撤銷,第二個(gè)參數(shù)為 allow_rebias,表示是否允許重偏向,此處固定傳 false
revoke_bias(o, false, true, requesting_thread);
}
if (TraceBiasedLocking) {
tty->print_cr("* Ending bulk revocation");
}
//如果滿足偏向條件,則重偏向于當(dāng)前線程
BiasedLocking::Condition status_code = BiasedLocking::BIAS_REVOKED;
if (attempt_rebias_of_object &&
o->mark()->has_bias_pattern() &&
klass->prototype_header()->has_bias_pattern()) {
markOop new_mark = markOopDesc::encode(requesting_thread, o->mark()->age(),
klass->prototype_header()->bias_epoch());
o->set_mark(new_mark);
status_code = BiasedLocking::BIAS_REVOKED_AND_REBIASED;
if (TraceBiasedLocking) {
tty->print_cr(" Rebiased object toward thread " INTPTR_FORMAT, (intptr_t) requesting_thread);
}
}
assert(!o->mark()->has_bias_pattern() ||
(attempt_rebias_of_object && (o->mark()->biased_locker() == requesting_thread)),
"bug in bulk bias revocation");
return status_code;
}
來(lái)案例驗(yàn)證一下是否真的會(huì)批量重偏向和批量撤銷。
批量重偏向:
批量撤銷:
不貼結(jié)果了,太長(zhǎng)了,結(jié)果注釋在代碼上了,有興趣可以自己運(yùn)行一下。
public static void main(String[] args) throws InterruptedException {
List<Object> list = new ArrayList<>();
//100個(gè)鎖對(duì)象偏向線程A(101個(gè)是因?yàn)椴幌胗孟聵?biāo)0)
new Thread(() -> {
for (int i = 1; i <= 101; i++) {
Object o = new Object();
synchronized (o) {
list.add(o);
}
}
//保活線程A,防止JVM底層復(fù)用線程
while (true) { }
}).start();
Thread.sleep(3000);
//原本偏向線程A
System.out.println("原本偏向線程" + ClassLayout.parseInstance(list.get(1)).toPrintable());
//另一個(gè)線程獲鎖30次
new Thread(() -> {
for (int i = 1; i <= 30; i++) {
Object o = list.get(i);
synchronized (o) {
if (i == 18 || i == 19 || i == 20 || i == 21) {
//18-輕量級(jí)鎖 19-偏向此線程 20-偏向此線程 21-偏向此線程
// 不是默認(rèn)20嗎,為什么第19個(gè)就重偏向了? 我不知道,估計(jì)也是性能的優(yōu)化吧...
System.out.println("第" + i + "個(gè)" + ClassLayout.parseInstance(o).toPrintable());
}
}
}
}).start();
Thread.sleep(3000);
//第31個(gè)沒(méi)有被再次獲鎖,也就是說(shuō)雖然epoch已經(jīng)過(guò)期了,但是沒(méi)有被重偏向,所以也就還是之前的偏向(過(guò)期偏向)
System.out.println("第31個(gè)" + ClassLayout.parseInstance(list.get(31)).toPrintable());
//new object 的鎖對(duì)象也還是匿名偏向
System.out.println("new Object" + ClassLayout.parseInstance(new Object()).toPrintable());
}
public static void main(String[] args) throws InterruptedException {
List<Object> list = new ArrayList<>();
//101個(gè)鎖對(duì)象偏向線程A,并一直持有下標(biāo)0的object
new Thread(() -> {
for (int i = 1; i <= 101; i++) {
Object o = new Object();
synchronized (o) {
list.add(o);
}
}
while (true) { synchronized (list.get(0)) { } }
}).start();
Thread.sleep(3000);
//第0個(gè)偏向A
System.out.println("第0個(gè)" + ClassLayout.parseInstance(list.get(0)).toPrintable());
//B線程獲鎖40次(撤銷18次:撤銷1~18,19~40重偏向到此線程)
new Thread(() -> {
for (int i = 1; i <= 40; i++) { synchronized (list.get(i)) { } }
//?;罹€程,防止JVM底層復(fù)用線程
while (true) { }
}).start();
Thread.sleep(3000);
//第0個(gè)還是偏向A
System.out.println("第0個(gè)" + ClassLayout.parseInstance(list.get(0)).toPrintable());
//C線程獲鎖40次(撤銷22次:1~18是輕量鎖,撤銷19~40)
new Thread(() -> {
for (int i = 1; i <= 40; i++) { synchronized (list.get(i)) { } }
}).start();
Thread.sleep(3000);
//第0個(gè)偏向被撤銷,變成輕量級(jí)鎖
System.out.println("第0個(gè)" + ClassLayout.parseInstance(list.get(0)).toPrintable());
//new object 也不再是匿名偏向鎖而是無(wú)鎖
System.out.println("new Object" + ClassLayout.parseInstance(new Object()).toPrintable());
//第41個(gè)沒(méi)有被動(dòng)過(guò),所以還是過(guò)期偏向
System.out.println("第41個(gè)" + ClassLayout.parseInstance(list.get(41)).toPrintable());
}
偏向鎖的釋放
bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorexit)
CASE(_monitorexit): {
//...
// 遍歷棧的鎖記錄
while (most_recent != limit ) {
// 判斷鎖記錄關(guān)聯(lián)的 obj 是否為 lockee
if ((most_recent)->obj() == lockee) {
BasicLock* lock = most_recent->lock();
markOop header = lock->displaced_header();
//設(shè)置鎖記錄的obj為null(沒(méi)有修改到mark word的線程id)
most_recent->set_obj(NULL);
//如果不是偏向模式還需要輕/重量級(jí)鎖的釋放
if (!lockee->mark()->has_bias_pattern()) {
bool call_vm = UseHeavyMonitors;
//如果 header != NULL 說(shuō)明不是重入,需要真正解鎖
if (header != NULL || call_vm) {
// CAS替換對(duì)象頭的 Mark Word(輕量級(jí)鎖才去替換,重量級(jí)鎖直接進(jìn)入分支)
if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {
// 將 obj 還原,然后調(diào)用 monitorexit 方法
most_recent->set_obj(lockee);
CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);
}
}
}
UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);
}
// 如果不是關(guān)聯(lián)的 obj,繼續(xù)判斷下一個(gè)鎖記錄
most_recent++;
}
//...
}
可以發(fā)現(xiàn)偏向鎖釋放并沒(méi)有清空mark word偏向的線程id。
至此,偏向鎖就完了。接下來(lái)就是普通鎖場(chǎng)景了。
再次重申,這里的普通鎖場(chǎng)景是我自己的術(shù)語(yǔ),特指沒(méi)有偏向鎖情況下的鎖場(chǎng)景。
輕量級(jí)鎖
輕量級(jí)鎖獲取/重入
銜接前面偏向鎖的內(nèi)容可以知道,輕量級(jí)鎖的獲取可以從 slow_enter 看起。
synchronizer.cpp#ObjectSynchronizer::slow_enter
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
markOop mark = obj->mark();
assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//mark->is_neutral()為true表示是無(wú)鎖,則cas無(wú)鎖->輕量級(jí)鎖(將對(duì)象頭替換為指向當(dāng)前線程棧中的鎖記錄)
if (mark->is_neutral()) {
lock->set_displaced_header(mark);
//沒(méi)有自旋!沒(méi)有自旋!沒(méi)有自旋!
//一次cas失敗就直接進(jìn)入到最下面的膨脹重量級(jí)鎖的語(yǔ)句了。
if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {
TEVENT (slow_enter: release stacklock) ;
return ;
}
}
//否則判斷是否輕量級(jí)鎖重入
else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {
assert(lock != mark->locker(), "must not re-lock the same lock");
assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");
lock->set_displaced_header(NULL);
return;
}
//到這一步說(shuō)明要膨脹成重量級(jí)鎖了
lock->set_displaced_header(markOopDesc::unused_mark());
ObjectSynchronizer::inflate(THREAD,
obj(),
inflate_cause_monitor_enter)->enter(THREAD);
}
相對(duì)于fast_enter的邏輯簡(jiǎn)單多了,但是看到?jīng)]有,輕量級(jí)獲鎖沒(méi)有自旋!輕量級(jí)獲鎖沒(méi)有自旋!輕量級(jí)獲鎖沒(méi)有自旋!一次cas失敗就直接進(jìn)入到最下面的膨脹重量級(jí)鎖的語(yǔ)句了。
輕量級(jí)鎖膨脹
synchronizer.cpp#ObjectSynchronizer::inflate
這個(gè)方法其實(shí)就是為了得到一個(gè)ObjectMonitor對(duì)象對(duì)應(yīng)一個(gè)重量級(jí)鎖。通過(guò)調(diào)用 ObjectMonitor.enter/exit 實(shí)現(xiàn)重量級(jí)鎖的獲取/釋放。
ObjectMonitor * ATTR ObjectSynchronizer::inflate (Thread * Self,
oop object,
const InflateCause cause) {
//...
//自旋直至成功膨脹為重量級(jí)鎖(這個(gè)是膨脹的自旋并不是獲鎖的自旋)
for (;;) {
const markOop mark = object->mark() ;
assert (!mark->has_bias_pattern(), "invariant") ;
//如果已經(jīng)有一個(gè) objectMonitor 直接返回即可
if (mark->has_monitor()) {
ObjectMonitor * inf = mark->monitor() ;
assert (inf->header()->is_neutral(), "invariant");
assert (inf->object() == object, "invariant") ;
assert (ObjectSynchronizer::verify_objmon_isinpool(inf), "monitor is invalid");
return inf ;
}
//如果正在膨脹,讓出cpu16次來(lái)實(shí)現(xiàn)等待的效果,16次之后還沒(méi)膨脹完就park阻塞
if (mark == markOopDesc::INFLATING()) {
TEVENT (Inflate: spin while INFLATING) ;
ReadStableMark(object) ;
continue ;
}
//mark->has_locker()為true 說(shuō)明是輕量級(jí)鎖狀態(tài),則輕量級(jí)鎖->重量級(jí)鎖
if (mark->has_locker()) {
//構(gòu)建一個(gè) ObjectMonitor 對(duì)象并初始化
ObjectMonitor * m = omAlloc (Self) ;
//...
//cas替換對(duì)象的mark為INFLATING
// 為什么使用一個(gè)INFLATING而不是直接設(shè)置monitor呢?
// 這是防止輕量級(jí)鎖膨脹的同時(shí)又解鎖,這時(shí)設(shè)置一個(gè)INFLATING
// 可以讓它c(diǎn)as失敗,進(jìn)入重量級(jí)鎖的釋放流程,而不是直接還原對(duì)象頭,造成hashcode值莫名其妙的改變
markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;
//...
//設(shè)置 ObjectMonitor 的header,owner,object
markOop dmw = mark->displaced_mark_helper() ;
assert (dmw->is_neutral(), "invariant") ;
m->set_header(dmw) ;
m->set_owner(mark->locker());
m->set_object(object);
// 替換對(duì)象的mark為monitor的地址(設(shè)置為重量級(jí)鎖狀態(tài))
guarantee (object->mark() == markOopDesc::INFLATING(), "invariant") ;
object->release_set_mark(markOopDesc::encode(m));
//...
return m ;
}
//mark->has_locker()為fasle 說(shuō)明是無(wú)鎖狀態(tài),則無(wú)鎖->重量級(jí)鎖
//構(gòu)建一個(gè) ObjectMonitor 對(duì)象并初始化和設(shè)置header,owner,object
assert (mark->is_neutral(), "invariant");
ObjectMonitor * m = omAlloc (Self) ;
//...
// cas替換對(duì)象的mark為monitor地址(設(shè)置為重量級(jí)鎖狀態(tài))
if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {
//...
}
//...
return m ;
}
}
總結(jié):
1. 如果已經(jīng)有ObjectMonitor直接返回
2. 如果正在膨脹則讓出CPU16次實(shí)現(xiàn)等待膨脹完成的效果,16次之后阻塞
3. 如果上面兩種情況都不是,則根據(jù)當(dāng)前鎖狀態(tài)走輕量級(jí)鎖->重量級(jí)鎖還是無(wú)鎖->重量級(jí)鎖來(lái)創(chuàng)建ObjectMonitor
案例:
輕量級(jí)鎖釋放
鎖的釋放入口肯定是 bytecodeInterpreter.cpp#BytecodeInterpreter::run#case(_monitorexit) 。上面偏向鎖釋放已分析過(guò)該方法,得知輕量級(jí)鎖釋放會(huì)來(lái)到 InterpreterRuntime::monitorexit (其實(shí)真正做事情的是 ObjectSynchronizer::fast_exit)。
interpreterRuntime.cpp#InterpreterRuntime::monitorexit
IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorexit(JavaThread* thread, BasicObjectLock* elem))
//...
// 調(diào)用 ObjectSynchronizer::slow_exit 方法進(jìn)行解鎖
ObjectSynchronizer::slow_exit(h_obj(), elem->lock(), thread);
//...
synchronizer.cpp#ObjectSynchronizer::slow_exit
void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
//實(shí)際上調(diào)用fast_exit
fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
//...
// 如果 Displaced Mark Word 為空,說(shuō)明可能是鎖重入或鎖膨脹中,直接return
if (dhw == NULL) {
//...
return;
}
mark = object->mark() ;
// 如果 Mark Word 指向當(dāng)前線程鎖指針,通過(guò) CAS 操作恢復(fù) Mark Word,即解鎖操作
if (mark == (markOop) lock) {
assert (dhw->is_neutral(), "invariant") ;
if ((markOop) Atomic::cmpxchg_ptr (dhw, object->mark_addr(), mark) == mark) {
TEVENT (fast_exit: release stacklock) ;
return;
}
}
// 到這一步說(shuō)明已經(jīng)是重量級(jí)鎖,要進(jìn)行重量級(jí)鎖解鎖
ObjectSynchronizer::inflate(THREAD,
object,
inflate_cause_vm_internal)->exit(true, THREAD);
}
輕量級(jí)鎖釋放最重要的一步就是恢復(fù)對(duì)象頭的 mark word ,即恢復(fù)到無(wú)鎖狀態(tài)。
案例:
重量級(jí)鎖
在看鎖膨脹的時(shí)候有提到,膨脹后會(huì)得到一個(gè)ObjectMonitor對(duì)象,通過(guò)ObjectMonitor.enter/exit 方法來(lái)實(shí)現(xiàn)重量級(jí)鎖的獲取/釋放。
重量級(jí)鎖獲取/重入
objectMonitor.cpp#ObjectMonitor::enter
void ATTR ObjectMonitor::enter(TRAPS) {
//...
//CAS重量級(jí)鎖owner指向當(dāng)前線程
cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;
if (cur == NULL) {
//...
return ;
}
//是否重入
if (cur == Self) {
_recursions ++ ;
return ;
}
//是否由輕量級(jí)鎖膨脹過(guò)來(lái)的,是的話 _recursions 置為1
if (Self->is_lock_owned ((address)cur)) {
//...
return ;
}
//TrySpin 自適應(yīng)自旋獲取
if (Knob_SpinEarly && TrySpin (Self) > 0) {
//...
return ;
}
//...
for (;;) {
//獲鎖失敗 EnterI 阻塞線程(方法內(nèi)實(shí)際阻塞前還是會(huì)多次嘗試(自旋)獲鎖)
EnterI (THREAD) ;
//...
}
//...
}
objectMonitor.cpp#ObjectMonitor::EnterI
void ATTR ObjectMonitor::EnterI (TRAPS) {
//...
//TryLock 嘗試獲鎖一次
if (TryLock (Self) > 0) {
//...
return;
}
//...
//TrySpin 自適應(yīng)自旋獲鎖
if (TrySpin (Self) > 0) {
//...
return;
}
//...
//封裝成ObjectWaiter入隊(duì)cxq 入隊(duì)失敗會(huì)再次嘗試獲鎖
//循環(huán):{
// cas入隊(duì)cxq
// TryLock 嘗試獲鎖
// }
ObjectWaiter node(Self) ;
//...
for (;;) {
node._next = nxt = _cxq ;
if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;
if (TryLock (Self) > 0) {
//...
return;
}
}
//...
//阻塞線程。阻塞前,喚醒后都會(huì)嘗試獲鎖
//循環(huán):{
// TryLock 嘗試獲鎖
// park阻塞線程(使用操作系統(tǒng)自帶的mutex阻塞)
// ...線程被喚醒
// TryLock 嘗試獲鎖
// TrySpin 自適應(yīng)自旋獲鎖
// 內(nèi)存屏障
// }
for (;;) {
//TryLock 嘗試獲鎖
if (TryLock (Self) > 0) break ;
assert (_owner != Self, "invariant") ;
//...
// 還是獲鎖失敗,park 阻塞線程
if (_Responsible == Self || (SyncFlags & 1)) {
TEVENT (Inflated enter - park TIMED) ;
Self->_ParkEvent->park ((jlong) RecheckInterval) ;
RecheckInterval *= 8 ;
if (RecheckInterval > 1000) RecheckInterval = 1000 ;
} else {
TEVENT (Inflated enter - park UNTIMED) ;
Self->_ParkEvent->park() ;
}
//...線程被喚醒,TryLock 嘗試獲鎖一次
if (TryLock(Self) > 0) break ;
//TrySpin 自適應(yīng)自旋獲鎖
if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;
//...
//內(nèi)存屏障
OrderAccess::fence() ;
}
//...
// 跳出循環(huán)說(shuō)明成功獲鎖,將當(dāng)前線程的節(jié)點(diǎn)從 cxq 或 EntryList
UnlinkAfterAcquire (Self, &node) ;
//...
return ;
}
總結(jié):
1、cas重量級(jí)鎖指向當(dāng)前線程,是否重入,是否由輕量級(jí)膨脹
2、TrySpin 自適應(yīng)自旋獲鎖(獲鎖其實(shí)就是將重量級(jí)鎖指向當(dāng)前線程)
3、EnterI {
3.1、TryLock 獲鎖
3.2、TrySpin 自適應(yīng)自旋獲鎖
3.3、封裝成ObjectWait節(jié)點(diǎn)并入cxq隊(duì)列
for(;;) {
CAS入隊(duì)cxq
TryLock 獲鎖
}
3.4、調(diào)用pthread_mutex_lock阻塞線程
for(;;) {
TryLock 獲鎖
park
...喚醒后
TryLock 獲鎖
TrySpin 自適應(yīng)自旋獲鎖
內(nèi)存屏障
}
3.5、UnlinkAfterAcquire 將當(dāng)前線程的節(jié)點(diǎn)從 cxq 或 EntryList
}
重量級(jí)鎖釋放
objectMonitor.cpp#ObjectMonitor::exit
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
Thread * Self = THREAD ;
//如果鎖不指向當(dāng)前線程
if (THREAD != _owner) {
//如果當(dāng)前線程是之前持有輕量級(jí)鎖的線程,此時(shí),owner 是指向 Lock Record 的指針
if (THREAD->is_lock_owned((address) _owner)) {
assert (_recursions == 0, "invariant") ;
_owner = THREAD ;
_recursions = 0 ;
OwnerIsThread = 1 ;
}
//其他線程占用鎖,直接返回
else {
//...
return;
}
}
//判斷是否重入
if (_recursions != 0) {
_recursions--; // this is simple recursive enter
TEVENT (Inflated exit - recursive) ;
return ;
}
//...
for (;;) {
assert (THREAD == _owner, "invariant") ;
// 根據(jù)策略,選擇不同的釋放鎖時(shí)機(jī),默認(rèn)為 0
//優(yōu)先釋放鎖放開自旋線程的策略(非公平鎖)
if (Knob_ExitPolicy == 0) {
//先將 owner 設(shè)置為 NULL。此時(shí)正在CAS的線程就可以很快進(jìn)入同步代碼塊就能獲得鎖
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::storeload() ;
// EntryList 和 cxq 都沒(méi)有等待線程,說(shuō)明沒(méi)有線程需要被喚醒,直接返回
// _succ 不為 NULL,說(shuō)明存在繼承人線程,也不需要喚醒,直接返回
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
TEVENT (Inflated exit - complex egress) ;
//因?yàn)榍懊驷尫沛i了,所以這里需要再次獲鎖(如果獲鎖失敗,則直接返回,由新的owner來(lái)喚醒后續(xù)線程)
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
return ;
}
TEVENT (Exit - Reacquired) ;
}
//優(yōu)先喚醒隊(duì)列中線程的策略
else {
//跟上一個(gè)分支唯一的區(qū)別就是釋放鎖的時(shí)機(jī)不一樣
if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {
OrderAccess::release_store_ptr (&_owner, NULL) ;
OrderAccess::storeload() ;
if (_cxq == NULL || _succ != NULL) {
TEVENT (Inflated exit - simple egress) ;
return ;
}
if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {
TEVENT (Inflated exit - reacquired succeeded) ;
return ;
}
TEVENT (Inflated exit - reacquired failed) ;
} else {
TEVENT (Inflated exit - complex egress) ;
}
}
//...
//根據(jù)QMode選擇不同的喚醒模式,默認(rèn)為0
// QMode == 0: 優(yōu)先喚醒 EntryList頭,如果為空,則將 cxq 中的節(jié)點(diǎn)移動(dòng)到 EntryList 中,再去喚醒 EntryList頭
// QMode == 1: 流程同上,不同的是,移動(dòng)節(jié)點(diǎn)的同時(shí),會(huì)反轉(zhuǎn)cxq鏈表
// QMode == 2: 優(yōu)先喚醒 cxq 的頭部節(jié)點(diǎn),如果為空,則喚醒EntryList頭
// QMode == 3: 優(yōu)先將 cxq 的節(jié)點(diǎn)移動(dòng)到 EntryList 尾部,然后去喚醒 EntryList 頭
// QMode == 4: 優(yōu)先將 cxq 的節(jié)點(diǎn)移動(dòng)到 EntryList 頭部,然后去喚醒 EntryList 頭
if (QMode == 2 && _cxq != NULL) {
w = _cxq ;
assert (w != NULL, "invariant") ;
assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
ExitEpilog (Self, w) ;
return ;
}
if (QMode == 3 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
ObjectWaiter * Tail ;
for (Tail = _EntryList ; Tail != NULL && Tail->_next != NULL ; Tail = Tail->_next) ;
if (Tail == NULL) {
_EntryList = w ;
} else {
Tail->_next = w ;
w->_prev = Tail ;
}
}
if (QMode == 4 && _cxq != NULL) {
w = _cxq ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
assert (w != NULL , "invariant") ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
if (_EntryList != NULL) {
q->_next = _EntryList ;
_EntryList->_prev = q ;
}
_EntryList = w ;
}
w = _EntryList ;
if (w != NULL) {
assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
w = _cxq ;
if (w == NULL) continue ;
for (;;) {
assert (w != NULL, "Invariant") ;
ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
if (u == w) break ;
w = u ;
}
TEVENT (Inflated exit - drain cxq into EntryList) ;
assert (w != NULL , "invariant") ;
assert (_EntryList == NULL , "invariant") ;
if (QMode == 1) {
ObjectWaiter * s = NULL ;
ObjectWaiter * t = w ;
ObjectWaiter * u = NULL ;
while (t != NULL) {
guarantee (t->TState == ObjectWaiter::TS_CXQ, "invariant") ;
t->TState = ObjectWaiter::TS_ENTER ;
u = t->_next ;
t->_prev = u ;
t->_next = s ;
s = t;
t = u ;
}
_EntryList = s ;
assert (s != NULL, "invariant") ;
} else {
// QMode == 0 or QMode == 2
_EntryList = w ;
ObjectWaiter * q = NULL ;
ObjectWaiter * p ;
for (p = w ; p != NULL ; p = p->_next) {
guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
p->TState = ObjectWaiter::TS_ENTER ;
p->_prev = q ;
q = p ;
}
}
if (_succ != NULL) continue;
w = _EntryList ;
if (w != NULL) {
guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
ExitEpilog (Self, w) ;
return ;
}
}
}
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
//...
// 將 owner 設(shè)置為 NULL 釋放鎖
OrderAccess::release_store_ptr (&_owner, NULL) ;
//內(nèi)存屏障
OrderAccess::fence() ;
//...
//unpark喚醒
Trigger->unpark() ;
//...
}
總結(jié):
1、先判斷是否owner指向當(dāng)前線程,是否當(dāng)前線程膨脹的輕量級(jí)鎖,是否重入
2、根據(jù)不同的 Knob_ExitPolicy 釋放鎖時(shí)機(jī)策略,來(lái)決定優(yōu)先放開自旋線程還是優(yōu)先喚醒隊(duì)列線程
3、根據(jù)不同的 QMode 喚醒模型來(lái)決定具體喚醒哪一個(gè)線程(無(wú)論哪種模式喚醒前都會(huì)釋放鎖并加內(nèi)存屏障)
QMode = 0:優(yōu)先喚醒 EntryList頭,如果為空,則將 cxq 中的節(jié)點(diǎn)移動(dòng)到 EntryList 中,再去喚醒 EntryList頭
QMode = 1: 流程同上,不同的是,移動(dòng)節(jié)點(diǎn)的同時(shí),會(huì)反轉(zhuǎn)cxq鏈表
QMode = 2: 優(yōu)先喚醒 cxq 的頭部節(jié)點(diǎn),如果為空,則喚醒EntryList頭
QMode = 3: 優(yōu)先將 cxq 的節(jié)點(diǎn)移動(dòng)到 EntryList 尾部,然后去喚醒 EntryList 頭
QMode = 4: 優(yōu)先將 cxq 的節(jié)點(diǎn)移動(dòng)到 EntryList 頭部,然后去喚醒 EntryList 頭
為什么需要cxq和entryList兩個(gè)隊(duì)列?
我認(rèn)為是因?yàn)槿绻挥靡粋€(gè)隊(duì)列的話出入隊(duì)操作大概率會(huì)發(fā)生沖突。用兩個(gè)隊(duì)列從宏觀上來(lái)看可以粗略的認(rèn)為入隊(duì)在cxq,出隊(duì)在entryList。
重量級(jí)鎖的降級(jí)
先看這么一個(gè)案例
上面的案例驗(yàn)證了重量級(jí)鎖釋放后鎖狀態(tài)還是重量級(jí)鎖(owner指向null),并沒(méi)有降級(jí)到無(wú)鎖。那為什么無(wú)競(jìng)爭(zhēng)后會(huì)變成無(wú)鎖呢?
因?yàn)镴VM在全局安全點(diǎn)執(zhí)行清理任務(wù)時(shí)會(huì)觸發(fā)鎖的降級(jí)來(lái)恢復(fù)閑置 ObjectMonitor 鎖對(duì)象對(duì)應(yīng)的 markword 對(duì)象頭并重置 ObjectMonitor 等待復(fù)用。
safepoint.cpp#SafepointSynchronize::do_cleanup_tasks
//全局安全點(diǎn)的清理任務(wù)
void SafepointSynchronize::do_cleanup_tasks() {
//...
//觸發(fā)重量級(jí)鎖降級(jí)
ObjectSynchronizer::deflate_idle_monitors();
//...
}
synchronizer.cpp#ObjectSynchronizer::deflate_idle_monitors
void ObjectSynchronizer::deflate_idle_monitors() {
//...
// 遍歷所有現(xiàn)存 ObjectMonitor
else for (ObjectMonitor* block = gBlockList; block != NULL; block = next(block)) {
assert(block->object() == CHAINMARKER, "must be a block header");
nInCirculation += _BLOCKSIZE ;
for (int i = 1 ; i < _BLOCKSIZE; i++) {
ObjectMonitor* mid = &block[i];
oop obj = (oop) mid->object();
//obj為null說(shuō)明還未分配,跳過(guò)
if (obj == NULL) {
guarantee (!mid->is_busy(), "invariant") ;
continue ;
}
// 調(diào)用 ObjectSynchronizer::deflate_monitor 方法嘗試降級(jí)
deflated = deflate_monitor(mid, obj, &FreeHead, &FreeTail);
//...
}
}
//...
}
synchronizer.cpp#ObjectSynchronizer::deflate_monitor
bool ObjectSynchronizer::deflate_monitor(ObjectMonitor* mid, oop obj,
ObjectMonitor** FreeHeadp, ObjectMonitor** FreeTailp) {
//...
if (mid->is_busy()) {
//...
} else {
//...
// 將鎖對(duì)象的 Mark Word 設(shè)置為無(wú)鎖狀態(tài)(001)
obj->release_set_mark(mid->header());
//...
// 將 monitor 放到空閑鏈表中,等待釋放
if (*FreeHeadp == NULL) *FreeHeadp = mid;
if (*FreeTailp != NULL) {
ObjectMonitor * prevtail = *FreeTailp;
assert(prevtail->FreeNext == NULL, "cleaned up deflated?");
prevtail->FreeNext = mid;
}
*FreeTailp = mid;
deflated = true;
}
return deflated;
}
輕量級(jí)鎖釋放的時(shí)候也會(huì)變成無(wú)鎖狀態(tài),但我個(gè)人認(rèn)為這個(gè)過(guò)程不叫鎖的降級(jí),只是輕量級(jí)鎖釋放中的一個(gè)步驟而已。鎖降級(jí)是指調(diào)用了 deflate_xxx 方法。畢竟 deflate 是可以是降低下降的意思,與之對(duì)立的是鎖膨脹 inflate。
其他
鎖粗化、鎖消除
//鎖粗化:因?yàn)槭乔昂髎ynchronized是lock對(duì)象,所以會(huì)粗化成一個(gè)synchronized來(lái)括住這兩個(gè)同步塊
public class LockCoarseningExample {
private Object lock = new Object();
public void doSomething() {
synchronized (lock) { // 第一個(gè)同步塊
//...
}
synchronized (lock) { // 第二個(gè)同步塊
//..
}
}
}
//鎖消除:這里的str拼接不會(huì)被其他線程訪問(wèn)(沒(méi)有線程逃逸),可以進(jìn)行鎖消除
public class LockEliminationExample {
public void doSomething() {
StringBuilder str = new StringBuilder();
for (int i = 0; i < 1000; i++) {
str.append("Value " + i);
}
}
}
調(diào)用hashcode、wait/notify對(duì)Synchronized鎖狀態(tài)的影響
同步代碼塊內(nèi)調(diào)用hashcode會(huì)立馬變成重量級(jí)鎖,同步代碼塊外調(diào)用會(huì)把偏向鎖撤銷變成無(wú)鎖。
調(diào)用wait會(huì)變成重量級(jí)鎖,調(diào)用notify會(huì)把偏向鎖變成輕量級(jí)鎖。
從源碼上看,notify也調(diào)用了ObjectSynchronizer::inflate應(yīng)該也會(huì)膨脹成重量級(jí)鎖。
但是實(shí)際測(cè)試打印的結(jié)果是輕量級(jí)鎖,我猜測(cè)可能是因?yàn)檫@種情況的重量級(jí)鎖很快就被降級(jí)了。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-700683.html
注:運(yùn)行JDK版本是11,本文參考的JVM源碼版本是jdk8u-hotspot文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-700683.html
到了這里,關(guān)于Java多線程篇(1)——深入分析synchronized的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!