?? Java學(xué)習(xí):社區(qū)快速通道
?? 深入淺出RocketMQ設(shè)計(jì)思想:深入淺出RocketMQ設(shè)計(jì)思想
?? 絕對(duì)不一樣的職場(chǎng)干貨:大廠最佳實(shí)踐經(jīng)驗(yàn)指南
?? 最近更新:2023年6月4日?? 點(diǎn)贊 ?? 收藏 ?留言 ?? 都是我最大的動(dòng)力!
通過(guò)本文你可以學(xué)習(xí)到:
- 常見的7種負(fù)載均衡策略思想
- 自旋鎖的使用方式
- 防御性編程
負(fù)載均衡策略
RandomRule
該策略會(huì)從當(dāng)前可用的服務(wù)節(jié)點(diǎn)中,隨機(jī)挑選一個(gè)節(jié)點(diǎn)訪問(wèn),使用了yield+自旋的方式做重試,還采用了嚴(yán)格的防御性編程。
RoundRobinRule
該策略會(huì)從一個(gè)節(jié)點(diǎn)一步一步地向后選取節(jié)點(diǎn),如下圖所示:
在多線程環(huán)境下,兩個(gè)請(qǐng)求同時(shí)訪問(wèn)這個(gè)Rule也不會(huì)讀取到相同節(jié)點(diǎn):這靠的是RandomRobinRule底層的自旋鎖+CAS的同步操作。
CAS+自旋鎖這套組合技是高并發(fā)下最廉價(jià)的線程安全手段,因?yàn)檫@套操作不需要鎖定系統(tǒng)資源。但缺點(diǎn)是,自旋鎖如果遲遲不能釋放,將會(huì)帶來(lái)CPU資源的浪費(fèi),因?yàn)樽孕旧聿⒉粫?huì)執(zhí)行任何業(yè)務(wù)邏輯,而是單純的使CPU空轉(zhuǎn)。所以通常情況下會(huì)對(duì)自旋鎖的旋轉(zhuǎn)次數(shù)做一個(gè)限制,比如JDK中synchronize
底層的鎖升級(jí)策略,就對(duì)自旋次數(shù)做了動(dòng)態(tài)調(diào)整。
while (true) {
// cas操作
if (cas(expected, update)) {
// 業(yè)務(wù)邏輯代碼
// break或退出return
}
}
Eureka為了防止服務(wù)下線被重復(fù)調(diào)用,就使用AtomicBoolean的CAS方法做同步控制;
奈飛提供的SpringCloud組件有特別多用到CAS的地方,感興趣的小伙伴們可以發(fā)現(xiàn)一下
RetryRule
RetryRule是一個(gè)類似裝飾器模式的規(guī)則,裝飾器相當(dāng)于一層套一層的套娃,每一層都會(huì)加上一層獨(dú)特的功能。
經(jīng)典的裝飾器模式示意圖:
借助上面的思路,RetryRule
就是給其他負(fù)載均衡策略加上重試功能。在RetryRule
里還藏著一個(gè)subRule
,這才是真正被執(zhí)行的負(fù)載均衡策略,RetryRule
正是要為它添加重試功能(如果初始化時(shí)沒(méi)指定subRule
,將默認(rèn)使用RoundRibinRule
)。
WeightedResponseTimeRule
這個(gè)規(guī)則繼承自RoundRibbonRule
,他會(huì)根據(jù)服務(wù)節(jié)點(diǎn)的響應(yīng)時(shí)間計(jì)算權(quán)重,響應(yīng)時(shí)間越長(zhǎng)權(quán)重就越低,響應(yīng)越快則權(quán)重越高,權(quán)重的高低決定了機(jī)器被選中概率的高低。也就是說(shuō),響應(yīng)時(shí)間越小的機(jī)器,被選中的概率越大。
服務(wù)器剛啟動(dòng)的時(shí)候,對(duì)各個(gè)服務(wù)節(jié)點(diǎn)采樣不足,因此會(huì)采用輪詢策略,當(dāng)積累到一定的樣本時(shí)候,才會(huì)切換到
WeightedResponseTimeRule
模式。
BestAvailableRule
在過(guò)濾掉故障服務(wù)以后,它會(huì)基于過(guò)去30分鐘的統(tǒng)計(jì)結(jié)果選取當(dāng)前并發(fā)量最小的服務(wù)節(jié)點(diǎn)作為目標(biāo)地址。如果統(tǒng)計(jì)結(jié)果尚未生成,則采用輪詢的方式選定節(jié)點(diǎn)。
AvailabilityFilteringRule
這個(gè)規(guī)則底層依賴RandomRobinRule
來(lái)選取節(jié)點(diǎn),但必須要滿足它的最低要求的節(jié)點(diǎn)才會(huì)被選中。如果節(jié)點(diǎn)滿足了要求,無(wú)論其響應(yīng)時(shí)間或者當(dāng)前并發(fā)量是什么,都會(huì)被選中。
每次AvailabilityFilteringRule
都會(huì)請(qǐng)求RobinRule
挑選一個(gè)節(jié)點(diǎn),然后對(duì)這個(gè)節(jié)點(diǎn)做以下兩步檢查:
- 是否處于熔斷狀態(tài)
- 節(jié)點(diǎn)當(dāng)前的請(qǐng)求連接數(shù)超過(guò)閾值,超過(guò)了則表示節(jié)點(diǎn)目前太忙
如果被選中的server
掛了,那么AFR會(huì)自動(dòng)重試(最多10次),讓RobinRule
重新選擇一個(gè)服務(wù)節(jié)點(diǎn)
ZoneAvoidanceRule
這個(gè)過(guò)濾器包含了組合過(guò)濾條件,分別是Zone級(jí)別和可用性級(jí)別。
-
Zone Filter: Zone可以理解為機(jī)房所屬的大區(qū)域,這里會(huì)對(duì)這個(gè)Zone下面所有的服務(wù)節(jié)點(diǎn)進(jìn)行健康情況過(guò)濾。
-
可用性過(guò)濾: 這里和
AvailabilityFilteringRule
的驗(yàn)證過(guò)程很像,會(huì)過(guò)濾掉當(dāng)前并發(fā)量較大,或者處于熔斷狀態(tài)的服務(wù)節(jié)點(diǎn)。
Ribbon 負(fù)載均衡策略源碼
RandomRule源碼
先從RandomRule
看起,核心的方法是:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;
while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}
int index = chooseRandomInt(serverCount);
server = upList.get(index);
if (server == null) {
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
server = null;
Thread.yield();
}
return server;
}
在RandomRule
里方法的入?yún)?code>key沒(méi)有用到,所以可以先暫時(shí)忽略
while
循環(huán)邏輯是如果server
為空,則找到一個(gè)可用的server
if (Thread.interrupted()) {
return null;
}
如果線程暫停了,則直接返回空(防御性編程)
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();
allList
存儲(chǔ)的是所有的服務(wù),upList
存儲(chǔ)的是可運(yùn)行狀態(tài)的服務(wù)
int serverCount = allList.size();
if (serverCount == 0) {
return null;
}
服務(wù)中心上沒(méi)有server
注冊(cè),則返回空
int index = chooseRandomInt(serverCount);
server = upList.get(index);
隨機(jī)選擇一個(gè)server
其中,chooseRandomInt的邏輯如下:
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
返回0到serverCount
中間的任意一個(gè)值
java中的隨機(jī)是可以預(yù)測(cè)到結(jié)果的,真隨機(jī)數(shù)一般會(huì)摻雜一些不可預(yù)測(cè)的數(shù)據(jù),比如當(dāng)前cpu的溫度
回到RandomRule
的choose
方法:
如果發(fā)現(xiàn)隨機(jī)選擇的server
為空表示此時(shí)serverList
正在被修正,此時(shí)讓出線程資源,進(jìn)行下一次循環(huán),對(duì)應(yīng)最開始的防御性編程
if (server == null) {
Thread.yield();
continue;
}
if (server.isAlive()) {
return (server);
}
如果server
可用直接return
server = null;
Thread.yield();
如果不可用則server
置為空,下一次循環(huán)會(huì)選一個(gè)新的,最后讓出資源。
所以該方法每次進(jìn)入下一次循環(huán)時(shí)都會(huì)讓出線程。
RoundRobinRule源碼
接下來(lái)看RoundRobinRule
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
while
循環(huán)里面有一個(gè)計(jì)數(shù)器,如果重試10次依然沒(méi)有結(jié)果返回就不重試了。
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
reachableServers
就是up
狀態(tài)的server
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
沒(méi)有可用服務(wù)器則返回空
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
選擇哪個(gè)下標(biāo)的server
,進(jìn)入incrementAndGetModulo
方法
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
使用了自旋鎖,nextServerCyclicCounter
是一個(gè)線程安全的數(shù)字。
if (server == null) {
Thread.yield();
continue;
}
如果獲取到的server
為空則讓出資源,繼續(xù)下一次循環(huán)
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
server
是正常的則返回
server = null;
最后沒(méi)有讓出線程資源,因?yàn)橹卦?0次后就退出循環(huán)了
BestAvailableRule源碼
接下來(lái)看BestAvailableRule
@Override
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}
if (loadBalancerStats == null) {
return super.choose(key);
}
如果loadBalancerStats
為空則調(diào)用父類的choose
方法,父類方法直接委托給RoundRobinRule
來(lái)完成choose
。
for
循環(huán)里先從loadBalancerStats
中獲取到當(dāng)前服務(wù)的狀態(tài)
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
public ServerStats getSingleServerStat(Server server) {
return getServerStats(server);
}
protected ServerStats getServerStats(Server server) {
try {
return serverStatsCache.get(server);
} catch (ExecutionException e) {
ServerStats stats = createServerStats(server);
serverStatsCache.asMap().putIfAbsent(server, stats);
return serverStatsCache.asMap().get(server);
}
}
這里是從緩存中獲取server的stats
,如果獲取失敗則默認(rèn)創(chuàng)建一個(gè)stats
并添加到緩存中,然后從cache
中再獲取一次。
隨后判斷是否處于熔斷狀態(tài)
if (!serverStats.isCircuitBreakerTripped(currentTime)) {...}
public boolean isCircuitBreakerTripped(long currentTime) {
long circuitBreakerTimeout = getCircuitBreakerTimeout();
if (circuitBreakerTimeout <= 0) {
return false;
}
return circuitBreakerTimeout > currentTime;
}
首先獲得熔斷的TimeOut
(表示截止到未來(lái)某個(gè)時(shí)間熔斷終止),如果大于當(dāng)前時(shí)間說(shuō)明處于熔斷狀態(tài)。
熔斷的TimeOut
由下面方法計(jì)算得到:
private long getCircuitBreakerTimeout() {
long blackOutPeriod = getCircuitBreakerBlackoutPeriod();
if (blackOutPeriod <= 0) {
return 0;
}
return lastConnectionFailedTimestamp + blackOutPeriod;
}
返回上一次連接失敗的時(shí)間戳 + blackOutPeriod
其中又調(diào)用了
private long getCircuitBreakerBlackoutPeriod() {
int failureCount = successiveConnectionFailureCount.get();
int threshold = connectionFailureThreshold.get();
if (failureCount < threshold) {
return 0;
}
int diff = (failureCount - threshold) > 16 ? 16 : (failureCount - threshold);
int blackOutSeconds = (1 << diff) * circuitTrippedTimeoutFactor.get();
if (blackOutSeconds > maxCircuitTrippedTimeout.get()) {
blackOutSeconds = maxCircuitTrippedTimeout.get();
}
return blackOutSeconds * 1000L;
}
failureCount
是失敗的個(gè)數(shù),從一個(gè)計(jì)數(shù)器里獲得,閾值從一個(gè)緩存的屬性中獲得,之后計(jì)算兩個(gè)的差值,再根據(jù)緩存中的一些屬性計(jì)算最終的秒數(shù),最后乘以1000返回。
回到BestAvailableRule
的choose
方法,只有不處于熔斷狀態(tài)才能繼續(xù)走后面的流程
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
選出連接數(shù)最小的服務(wù)器
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
最后返回
核心是找到一個(gè)最輕松的服務(wù)器。
RetryRule源碼
查看RetryRule
源碼:
public Server choose(ILoadBalancer lb, Object key) {
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
Server answer = null;
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
InterruptTask task = new InterruptTask(deadline
- System.currentTimeMillis());
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
task.cancel();
}
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
}
long requestTime = System.currentTimeMillis();
long deadline = requestTime + maxRetryMillis;
先記錄當(dāng)前時(shí)間和deadline
,在截止時(shí)間之前可以一直重試。
answer = subRule.choose(key);
方法里面是由subRule
來(lái)實(shí)現(xiàn)具體的負(fù)載均衡邏輯,這里默認(rèn)類型是RoundRobinRule
如果選到的是空或者選到的不是up的,且時(shí)間在ddl之前則進(jìn)入重試邏輯:
while (!Thread.interrupted()) {
answer = subRule.choose(key);
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}
}
如果線程中斷了就中斷重試。之后重新選擇服務(wù)器,如果又沒(méi)選到則把資源讓出去,下一次while
循環(huán)再選,在while
循環(huán)之前會(huì)起一個(gè)任務(wù)
InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis());
到了截止時(shí)間之后,程序會(huì)中斷重試的流程文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-470285.html
task.cancel();
最后返回文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-470285.html
if ((answer == null) || (!answer.isAlive())) {
return null;
} else {
return answer;
}
到了這里,關(guān)于Ribbon 負(fù)載均衡策略 —— 圖解、源碼級(jí)解析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!