一、負(fù)載均衡概述
支持輪詢、隨機(jī)、一致性hash和最小活躍數(shù)等。
1、輪詢
① sequences:內(nèi)部的序列計(jì)數(shù)器
② 服務(wù)器接口方法權(quán)重一樣:(sequences+1)%服務(wù)器的數(shù)量=(決定調(diào)用)哪個(gè)服務(wù)器的服務(wù)。
③ 服務(wù)器接口方法權(quán)重不一樣:找到最大權(quán)重(權(quán)重?cái)?shù))%(sequences+1),然后找出權(quán)重比該取模后的值大服務(wù)器列表,最后進(jìn)行【①】所述。
2、隨機(jī)
統(tǒng)計(jì)服務(wù)器上該接口的方法權(quán)重總和,然后對這個(gè)總和隨機(jī)nextInt一下,看生成的隨機(jī)數(shù)落到哪個(gè)段內(nèi),就調(diào)用哪個(gè)服務(wù)器上的該服務(wù)。
3、一致性hash
保證了同樣的請求(參數(shù))將會落到同一臺服務(wù)器上。
4、最小活躍數(shù)
每個(gè)接口和接口方法都對應(yīng)一個(gè)RpcStatus,記錄它們的活躍數(shù)、失敗等相關(guān)統(tǒng)計(jì)信息,調(diào)用時(shí)活躍數(shù)+1,調(diào)用結(jié)束時(shí)活躍數(shù)-1,所以活躍值越大,表明該提供者服務(wù)器的該接口方法耗時(shí)越長,而消費(fèi)能力強(qiáng)的提供者接口往往活躍值很低。最少活躍負(fù)載均衡保證了“慢”提供者能接收到更少的服務(wù)器調(diào)用。
二、負(fù)載均衡策略配置
1、多注冊中心集群負(fù)載均衡
2、Cluster Invoker
支持的選址策略如下(dubbo2.7.5+ 版本,具體使用請參見文檔)
2-1、指定優(yōu)先級
<!-- 來自 preferred=“true” 注冊中心的地址將被優(yōu)先選擇,
只有該中心無可用地址時(shí)才 Fallback 到其他注冊中心 -->
<dubbo:registry address="zookeeper://${zookeeper.address1}" preferred="true" />
2-2、同zone優(yōu)先
<!-- 選址時(shí)會和流量中的 zone key 做匹配,流量會優(yōu)先派發(fā)到相同 zone 的地址 -->
<dubbo:registry address="zookeeper://${zookeeper.address1}" zone="beijing" />
2-3、權(quán)重輪詢
<!-- 來自北京和上海集群的地址,將以 10:1 的比例來分配流量 -->
<dubbo:registry id="beijing" address="zookeeper://${zookeeper.address1}" weight=”100“ />
<dubbo:registry id="shanghai" address="zookeeper://${zookeeper.address2}" weight=”10“ />
三、負(fù)載均衡解讀
(1)負(fù)載均衡:AbstractClusterInvoker.invoke(final Invocation invocation)方法
@Override
public Result invoke(final Invocation invocation) throws RpcException {
//...... 省略代碼
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
/** 獲取負(fù)載均衡的實(shí)現(xiàn)方法,未配置時(shí)默認(rèn)random */
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
return ExtensionLoader.getExtensionLoader(LoadBalance.class)
.getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation),
LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class)
.getExtension(DEFAULT_LOADBALANCE);
}
}
(2)實(shí)現(xiàn)入口:AbstractClusterInvoker.doSelect(…)
① 在dubbo中,所有負(fù)載均衡均繼承 AbstractLoadBalance 類,該類實(shí)現(xiàn)了LoadBalance接口,并封裝了一些公共的邏輯。
/** 1. LoadBalance.java接口 */
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
/** 2. AbstractLoadBalance.java 抽象類 */
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
// 如果invokers列表中僅一個(gè)Invoker,直接返回即可,無需進(jìn)行負(fù)載均衡
if (invokers.size() == 1) {
return invokers.get(0);
}
// 調(diào)用doSelect方法進(jìn)行負(fù)載均衡,該方法為抽象方法,由子類實(shí)現(xiàn)
return doSelect(invokers, url, invocation);
}
protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
/** 2-1. 公共方法,權(quán)重計(jì)算邏輯 */
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight;
URL url = invoker.getUrl();
// Multiple registry scenario, load balance among multiple registries.
if (REGISTRY_SERVICE_REFERENCE_PATH.equals(url.getServiceInterface())) {
weight = url.getParameter(REGISTRY_KEY + "." + WEIGHT_KEY, DEFAULT_WEIGHT);
} else {
// 從url中獲取權(quán)重 weight配置值
weight = url.getMethodParameter(invocation.getMethodName(), WEIGHT_KEY, DEFAULT_WEIGHT);
if (weight > 0) {
// 獲取服務(wù)提供者啟動時(shí)間戳
long timestamp = invoker.getUrl().getParameter(TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 計(jì)算服務(wù)提供者運(yùn)行時(shí)長
long uptime = System.currentTimeMillis() - timestamp;
if (uptime < 0) {
return 1; // 未啟動直接返回權(quán)重為1
}
// 獲取服務(wù)預(yù)熱時(shí)間,默認(rèn)為10分鐘
int warmup = invoker.getUrl().getParameter(WARMUP_KEY, DEFAULT_WARMUP);
// 如果服務(wù)運(yùn)行時(shí)間小于預(yù)熱時(shí)間,則重新計(jì)算服務(wù)權(quán)重,即降級
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight((int)uptime, warmup, weight);
}
}
}
}
return Math.max(weight, 0);
}
// 2-2. 重新計(jì)算服務(wù)權(quán)重
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 計(jì)算權(quán)重,下面代碼邏輯上形似于 (uptime / warmup) * weight
// 隨著服務(wù)運(yùn)行時(shí)間 uptime 增大,權(quán)重計(jì)算值 ww 會慢慢接近配置值 weight
int ww = (int) ( uptime / ((float) warmup / weight));
return ww < 1 ? 1 : (Math.min(ww, weight));
}
注:
select 方法的邏輯比較簡單,首先會檢測 invokers 集合的合法性,然后再檢測 invokers 集合元素?cái)?shù)量。如果只包含一個(gè) Invoker,直接返回該 Inovker 即可。如果包含多個(gè) Invoker,此時(shí)需要通過負(fù)載均衡算法選擇一個(gè) Invoker。具體的負(fù)載均衡算法由子類實(shí)現(xiàn)。
權(quán)重的計(jì)算主要用于保證當(dāng)服務(wù)運(yùn)行時(shí)長小于服務(wù)預(yù)熱時(shí)間時(shí),對服務(wù)進(jìn)行降權(quán),避免讓服務(wù)在啟動之初就處于高負(fù)載狀態(tài)。服務(wù)預(yù)熱是一個(gè)優(yōu)化手段,與此類似的還有 JVM 預(yù)熱。主要目的是讓服務(wù)啟動后“低功率”運(yùn)行一段時(shí)間,使其效率慢慢提升至最佳狀態(tài)。
配置方式(默認(rèn)100):dubbo.provider.weight=300dubbo.provider.weight=300
② RandomLoadBalance 加權(quán)隨機(jī)負(fù)載均衡
算法思路:根據(jù)權(quán)重比,隨機(jī)選擇哪臺服務(wù)器,如:servers=[A,B,C],weights = [5, 3, 2],權(quán)重和為10,調(diào)用A的次數(shù)約有50%,B有30%,C有20%。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// 判斷是否需要權(quán)重負(fù)載均衡
if (!needWeightLoadBalance(invokers,invocation)){
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
// Every invoker has the same weight?
boolean sameWeight = true;
// the maxWeight of every invokers, the minWeight = 0 or the maxWeight of the last invoker
int[] weights = new int[length];
// The sum of weights
int totalWeight = 0;
// ① 計(jì)算總權(quán)重,totalWeight;② 檢測每個(gè)服務(wù)提供者的權(quán)重是否相同
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// Sum
totalWeight += weight;
// save for later use
// 如果權(quán)重分別為5,3,2,則weights[0]=5,weights[1]=8,weights[2]=10
weights[i] = totalWeight;
// 判斷每個(gè)服務(wù)權(quán)重是否相同,如果不相同則sameWeight置為false
if (sameWeight && totalWeight != weight * (i + 1)) {
sameWeight = false;
}
}
// 各提供者服務(wù)權(quán)重不一樣時(shí),計(jì)算隨機(jī)數(shù)落在哪個(gè)區(qū)間上
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
// 隨機(jī)獲取一個(gè)[0, totalWeight]區(qū)間內(nèi)的隨機(jī)的數(shù)字
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
// weights[0]=5,offset=[0, 5)
// weights[1]=8,offset=[5, 8)
// weights[2]=10,offset=[8, 10)
if (offset < weights[i]) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果所有服務(wù)提供者權(quán)重值相同,此時(shí)直接隨機(jī)返回一個(gè)即可
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
注:RandomLoadBalance的算法比較簡單,多次請求后,能夠按照權(quán)重進(jìn)行“均勻“分配。調(diào)用次數(shù)少時(shí),可能產(chǎn)生的隨機(jī)數(shù)比較集中,此缺點(diǎn)并不嚴(yán)重,可以忽略。它是一個(gè)高效的負(fù)載均衡實(shí)現(xiàn),因此Dubbo選擇它作為缺省實(shí)現(xiàn)。
③ LeastActiveLoadBalance 加權(quán)最小活躍數(shù)負(fù)載均衡
活躍調(diào)用數(shù)越小,表明該服務(wù)提供者效率越高,單位時(shí)間內(nèi)可處理更多的請求。此時(shí)應(yīng)優(yōu)先將請求分配給該服務(wù)提供者。
在具體實(shí)現(xiàn)中,每個(gè)服務(wù)提供者對應(yīng)一個(gè)活躍數(shù) active。初始情況下,所有服務(wù)提供者活躍數(shù)均為0。每收到一個(gè)請求,活躍數(shù)加1,完成請求后則將活躍數(shù)減1。
除了最小活躍數(shù),LeastActiveLoadBalance 在實(shí)現(xiàn)上還引入了權(quán)重值。所以準(zhǔn)確的來說,LeastActiveLoadBalance 是基于加權(quán)最小活躍數(shù)算法實(shí)現(xiàn)的。文章來源:http://www.zghlxwxcb.cn/news/detail-650748.html
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// The least active value of all invokers
// 最小活躍數(shù)
int leastActive = -1;
// The number of invokers having the same least active value (leastActive)
// 具有相同“最小活躍數(shù)”的服務(wù)提供者
int leastCount = 0;
// The index of invokers having the same least active value (leastActive)
// leastIndexes 用于記錄具有相同“最小活躍數(shù)”的 Invoker 在 invokers 列表中的下標(biāo)信息
int[] leastIndexes = new int[length];
// the weight of every invokers
int[] weights = new int[length];
// The sum of the warmup weights of all the least active invokers
int totalWeight = 0;
// The weight of the first least active invoker
// 第一個(gè)最小活躍數(shù)的 Invoker 權(quán)重值,用于與其他具有相同最小活躍數(shù)的 Invoker 的權(quán)重進(jìn)行對比,
// 以檢測是否“所有具有相同最小活躍數(shù)的 Invoker 的權(quán)重”均相等
int firstWeight = 0;
// Every least active invoker has the same weight value?
// 表示各服務(wù)的權(quán)重是否相同
boolean sameWeight = true;
// Filter out all the least active invokers
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
// Get the active number of the invoker
// 獲取invoker對應(yīng)的活躍數(shù)
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
// Get the weight of the invoker's configuration. The default value is 100.
// 獲取權(quán)重
int afterWarmup = getWeight(invoker, invocation);
// save for later use
weights[i] = afterWarmup;
// If it is the first invoker or the active number of the invoker is less than the current least active number
// 發(fā)現(xiàn)更小的活躍數(shù),重新開始
if (leastActive == -1 || active < leastActive) {
// Reset the active number of the current invoker to the least active number
// 使用當(dāng)前活躍數(shù) active 更新最小活躍數(shù) leastActive
leastActive = active;
// Reset the number of least active invokers
leastCount = 1;
// Put the first least active invoker first in leastIndexes
// 記錄當(dāng)前下標(biāo)值到 leastIndexes 中
leastIndexes[0] = i;
// Reset totalWeight
totalWeight = afterWarmup;
// Record the weight the first least active invoker
firstWeight = afterWarmup;
// Each invoke has the same weight (only one invoker here)
sameWeight = true;
// If current invoker's active value equals with leaseActive, then accumulating.
// 當(dāng)前 Invoker 的活躍數(shù) active 與最小活躍數(shù) leastActive 相同
} else if (active == leastActive) {
// Record the index of the least active invoker in leastIndexes order
leastIndexes[leastCount++] = i;
// Accumulate the total weight of the least active invoker
// 累加權(quán)重
totalWeight += afterWarmup;
// If every invoker has the same weight?
// 檢測當(dāng)前 Invoker 的權(quán)重與 firstWeight 是否相等,不相等則將 sameWeight 置為 false
if (sameWeight && afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// Choose an invoker from all the least active invokers
// 1. 當(dāng)只有一個(gè) Invoker 具有最小活躍數(shù),此時(shí)直接返回該 Invoker 即可
if (leastCount == 1) {
// If we got exactly one invoker having the least active value, return this invoker directly.
return invokers.get(leastIndexes[0]);
}
// 2. 有多個(gè) Invoker 具有相同的最小活躍數(shù),但它們之間的權(quán)重不同
if (!sameWeight && totalWeight > 0) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on
// totalWeight.
// 隨機(jī)生成一個(gè) [0, totalWeight) 之間的數(shù)字
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < leastCount; i++) {
// 獲取權(quán)重?cái)?shù)組的下標(biāo)
int leastIndex = leastIndexes[i];
// 隨機(jī)權(quán)重 - 具有最小活躍數(shù)的 Invoker 的權(quán)重值
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) { // 與RandomLoadBalance一致,權(quán)重越大調(diào)用的次數(shù)越多
return invokers.get(leastIndex);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
// 如果權(quán)重相同或權(quán)重為0時(shí),隨機(jī)返回一個(gè) Invoker
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
④ ConsistentHashLoadBalance
cache-1、cache-2、cache-3、cache-4分別為不同的節(jié)點(diǎn)
根據(jù)IP或者其他信息為緩存節(jié)點(diǎn)生成一個(gè)hash,并將這個(gè)hash投射到[0,2^32 - 1] 的圓環(huán)上。當(dāng)有查詢或?qū)懭胝埱髸r(shí),則為緩存項(xiàng)的key生成一個(gè)hash值。然后查找第一個(gè)大于或等于該hash值的緩存節(jié)點(diǎn),并到這個(gè)節(jié)點(diǎn)中查詢或?qū)懭刖彺骓?xiàng)。
如果當(dāng)前節(jié)點(diǎn)掛了,則在下一次查詢或?qū)懭刖彺鏁r(shí),為緩存項(xiàng)查找另一個(gè)大于或其hash值的緩存節(jié)點(diǎn)即可。
如下圖,每個(gè)節(jié)點(diǎn)在圓環(huán)上占據(jù)一個(gè)位置,如果緩存項(xiàng)的key的hash值小于緩存節(jié)點(diǎn)hash值,則到該緩存節(jié)點(diǎn)中存儲或讀取緩存項(xiàng)。
比如下面綠色點(diǎn)對應(yīng)的緩存項(xiàng)將會被存儲到 cache-2 節(jié)點(diǎn)中。由于 cache-3 掛了,原本應(yīng)該存到該節(jié)點(diǎn)中的緩存項(xiàng)最終會存儲到 cache-4 節(jié)點(diǎn)中。
⑤ RoundRobinLoadBalance
輪詢:是指將請求輪流分配給每臺服務(wù)器。
例如:有三臺服務(wù)器A、B、C,我們將第一個(gè)請求分配給A服務(wù)器,第二個(gè)請求分配給B服務(wù)器,第三個(gè)請求分配給C服務(wù)器,第四個(gè)請求再次分配給A服務(wù)器,如此循環(huán),這個(gè)過程叫做輪詢(輪詢是一種無狀態(tài)負(fù)載均衡算法)。適用于每臺服務(wù)器性能相近的場景下。
輪詢加權(quán):對每臺性能不一樣的服務(wù)器進(jìn)行加權(quán)處理,如服務(wù)器A、B、C的權(quán)重分別為5:2:1時(shí),在8次請求中,服務(wù)器A將收到5次請求、B收到2次請求、C收到一次請求(請求次數(shù)越多,每臺服務(wù)器得到的請求數(shù),接近服務(wù)器的權(quán)重比)。文章來源地址http://www.zghlxwxcb.cn/news/detail-650748.html
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// key = [group/]path[:version].methodName;注:path = com.jyt.*.service.接口名
String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
// 獲取key對應(yīng)值,如果key的值不存在,則將第二個(gè)參數(shù)的返回值存入并返回
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
Invoker<T> selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
// 遍歷服務(wù)提供者
for (Invoker<T> invoker : invokers) {
// dubbo://11.1.1.109:21001/com.jyt.*.service.類名
String identifyString = invoker.getUrl().toIdentityString();
// 獲取當(dāng)前服務(wù)提供者的權(quán)重
int weight = getWeight(invoker, invocation);
// 根據(jù)identifyString獲取當(dāng)前提供者對應(yīng)的權(quán)重,如果不存在則使用第二個(gè)參數(shù)返回值,并返回
WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {
WeightedRoundRobin wrr = new WeightedRoundRobin();
wrr.setWeight(weight);
return wrr;
});
// 如果提供者的權(quán)重被修改了,則更新weightedRoundRobin的權(quán)重值
if (weight != weightedRoundRobin.getWeight()) {
// weight changed
weightedRoundRobin.setWeight(weight);
}
// current加上weight并獲取結(jié)果(初始的current為0)
long cur = weightedRoundRobin.increaseCurrent();
/**
* 如果A服務(wù)權(quán)重weight=500,B權(quán)重weight=100時(shí),totalWeight = 600,初始cur=0;服務(wù)調(diào)用場景
* 第一次:A服務(wù)器:cur=weight + curA = 500,cur > maxCurrent,所以maxCurrent = curA = 500
* B服務(wù)器:cur=weight + curB = 100 <= maxCurrent(500為true);故走A服務(wù)器,即curA = curA - 600 = -100
*
* 第二次:A服務(wù)器:cur=weight + curA = 400,cur > maxCurrent,所以maxCurrent = curA = 400
* B服務(wù)器:cur=weight + curB = 200 <= maxCurrent(400為true);故走A服務(wù)器,即curA = curA - 600 = -200
*
* 第三次:A服務(wù)器:cur=weight + curA = 300,cur > maxCurrent,所以maxCurrent = curA = 300
* B服務(wù)器:cur=weight + curB = 300 <= maxCurrent(300為true);故走A服務(wù)器,即curA = curA - 600 = -300
*
* 第四次:A服務(wù)器:cur=weight + curA = 200,cur > maxCurrent,所以maxCurrent = curA = 200
* B服務(wù)器:cur=weight + curB = 400 <= maxCurrent(200為false);故走B服務(wù)器,即curB = curB - 600 = -200
*
* 第五次:A服務(wù)器:cur=weight + curA = 700,cur > maxCurrent,所以maxCurrent = curA = 700
* B服務(wù)器:cur=weight + curB = -100 <= maxCurrent(700為true);故走A服務(wù)器,即curA = curA - 600 = 100
*
* 第六次:A服務(wù)器:cur=weight + curA = 600,cur > maxCurrent,所以maxCurrent = curA = 600
* B服務(wù)器:cur=weight + curB = 0 <= maxCurrent(600為true);故走A服務(wù)器,即curA = curA - 600 = 0
*
* ... ... 如此循環(huán):A、A、A、B、A、A
*/
weightedRoundRobin.setLastUpdate(now);
// 判斷是否比最大的值大
if (cur > maxCurrent) {
// 如果大,則將當(dāng)前服務(wù)提供者置為本次服務(wù)提供者
maxCurrent = cur;
selectedInvoker = invoker;
selectedWRR = weightedRoundRobin;
}
// 權(quán)重累計(jì)
totalWeight += weight;
}
// 當(dāng)兩者大小不一致時(shí),map中可能會存在一些已經(jīng)下線的服務(wù),本次剔除一些很久節(jié)點(diǎn)信息
if (invokers.size() != map.size()) {
map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
}
// 如果存在選擇好的提供者,則改變他的current值 - totalWeight;
if (selectedInvoker != null) {
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
// should not happen here
return invokers.get(0);
}
到了這里,關(guān)于中間件(二)dubbo負(fù)載均衡介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!