I. How is cluster node information updated?
When an Eureka Server node starts, DefaultEurekaServerContext.init() calls PeerEurekaNodes.start(). Inside start(), resolvePeerUrls() reads the serviceUrl property from the configuration file to obtain the current list of cluster nodes, and updatePeerEurekaNodes() stores that list in the peerEurekaNodes and peerEurekaNodeUrls fields of the PeerEurekaNodes class.
PeerEurekaNodes.start() also schedules a task that refreshes the node list every peerEurekaNodesUpdateIntervalMs (10 min by default).
One point deserves attention here, because many articles online get it wrong: peerEurekaNodesUpdateIntervalMs is not the interval at which registry information is synchronized between cluster nodes. Why? Look at what the scheduled task peersUpdateTask does, which comes down to what updatePeerEurekaNodes(resolvePeerUrls()) does.
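To make the distinction concrete, here is a minimal sketch of a timer that only refreshes peer membership. PeerListRefresher and its methods are hypothetical names invented for illustration, not Eureka classes; the supplier stands in for resolvePeerUrls(). Nothing in the periodic task touches registry data.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for PeerEurekaNodes: it tracks peer URLs and nothing else.
class PeerListRefresher {
    private final Supplier<List<String>> urlResolver; // plays the role of resolvePeerUrls()
    private volatile List<String> peerUrls = List.of();

    PeerListRefresher(Supplier<List<String>> urlResolver) {
        this.urlResolver = urlResolver;
    }

    // Plays the role of updatePeerEurekaNodes(resolvePeerUrls()): membership only.
    void refresh() {
        peerUrls = List.copyOf(urlResolver.get());
    }

    List<String> peerUrls() {
        return peerUrls;
    }

    // Resolve once at startup, then re-resolve at a fixed delay on a daemon thread.
    ScheduledExecutorService start(long intervalMs) {
        refresh();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "peer-list-refresher");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(this::refresh, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

A change to the configured URL list becomes visible only after the next refresh() run, which is all the interval controls in this sketch.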
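Tracing the source: by default resolvePeerUrls() simply calls EndpointUtils.getServiceUrlsFromConfig(), i.e. it reads the peer URLs from the configuration file. updatePeerEurekaNodes() then compares the newly resolved URLs with the previous ones, putting URLs that need to be added into an add list and URLs that need to be removed into a delete list. If there is nothing to add and nothing to remove, it returns without doing anything.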
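The comparison step can be sketched as two set differences. This is a simplified illustration under the description above, not the Eureka source; PeerUrlDiff and its members are hypothetical names.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: computes which peer URLs disappeared and which are new.
class PeerUrlDiff {
    final Set<String> toShutdown; // known before, absent from the new configuration
    final Set<String> toAdd;      // present in the new configuration, unknown before

    private PeerUrlDiff(Set<String> toShutdown, Set<String> toAdd) {
        this.toShutdown = toShutdown;
        this.toAdd = toAdd;
    }

    static PeerUrlDiff between(Set<String> currentUrls, List<String> newUrls) {
        Set<String> toShutdown = new HashSet<>(currentUrls);
        toShutdown.removeAll(newUrls);
        Set<String> toAdd = new HashSet<>(newUrls);
        toAdd.removeAll(currentUrls);
        return new PeerUrlDiff(toShutdown, toAdd);
    }

    // When both sets are empty there is nothing to do and the update can return early.
    boolean isEmpty() {
        return toShutdown.isEmpty() && toAdd.isEmpty();
    }
}
```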
For a node to be removed, the node is taken out of the node list and its shutDown method is called. For a node to be added, a new node is created through createPeerEurekaNode(String peerEurekaNodeUrl) and added to the node list. Node creation ends up in this constructor:
new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig);
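This constructor does not copy any registry data by itself. It sets up the replication channel (the batchingDispatcher used by register() in section II) through which the local server pushes registry changes to the new peer.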
That leads to the second question.
II. How is registry information synchronized across cluster nodes?
There are two scenarios: how a newly added server node pulls registry information from the other nodes, and how the nodes of a stable cluster keep their registry information in sync with each other.
Let's take the first scenario first: how a new node pulls registry information from the other nodes.
(1) Registry synchronization when an Eureka Server node starts
When Eureka Server starts, EurekaServerBootstrap calls PeerAwareInstanceRegistryImpl.syncUp() to synchronize the registry information.
// Copy registry from neighboring eureka node
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
syncUp() performs the registry synchronization:
/**
 * Populates the registry information from a peer eureka node. This
 * operation fails over to other nodes until the list is exhausted if the
 * communication fails.
 */
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;

    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) {
                logger.warn("Interrupted during registry transfer..");
                break;
            }
        }
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {
                    logger.error("During DS init copy", t);
                }
            }
        }
    }
    return count;
}
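syncUp() first fetches all registered instances through eurekaClient.getApplications() and then calls register() for each of them.
If nothing was copied (count is still 0), it sleeps for serverConfig.getRegistrySyncRetryWaitMs() and tries again, up to serverConfig.getRegistrySyncRetries() attempts in total.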
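Stripped of the Eureka types, the retry loop in syncUp() looks roughly like the sketch below. SyncRetry and the copyFromPeer supplier are hypothetical names; the supplier stands for "fetch the applications and register them locally" and returns the number of instances copied. Note that, as in the original loop, a peer that legitimately has zero instances is indistinguishable from a failed attempt.

```java
import java.util.function.IntSupplier;

// Hypothetical helper mirroring the control flow of syncUp().
class SyncRetry {
    // Tries up to maxRetries times and stops at the first attempt that copies at least one instance.
    static int syncUp(IntSupplier copyFromPeer, int maxRetries, long retryWaitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(retryWaitMs); // wait only between attempts, never before the first
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
                    break;
                }
            }
            count = copyFromPeer.getAsInt();
        }
        return count;
    }
}
```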
Once synchronization finishes, PeerAwareInstanceRegistryImpl.openForTraffic() is called to compute the self-preservation threshold:
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    this.expectedNumberOfClientsSendingRenews = count;
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    super.postInit();
}
At this point the replication performed at Eureka Server startup is complete.
Notes
Maximum number of sync attempts = serverConfig.getRegistrySyncRetries(), 5 by default.
Wait between sync attempts after a failure = serverConfig.getRegistrySyncRetryWaitMs(), 30 s by default.
If the startup sync fails, the server does not offer its registry service to the outside for a while: waitTimeInMsWhenSyncEmpty, 5 min by default.
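The article does not show updateRenewsPerMinThreshold(), which openForTraffic() calls. As an assumption based on recent Eureka versions, the threshold is the expected client count times the renewals per minute times a percentage (30 s renewal interval and 0.85 are the assumed defaults). RenewThreshold is a hypothetical name used only for this sketch.

```java
// Hypothetical helper; the formula is assumed from recent Eureka versions, not taken from the article.
class RenewThreshold {
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        // 60 / interval = renewals each client is expected to send per minute (2 for 30 s).
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // 10 instances copied at startup, default-like settings.
        System.out.println(renewsPerMinThreshold(10, 30, 0.85)); // prints 17
    }
}
```

With count = 0 after a failed sync the threshold is 0, which is one reason the sync retries and the waitTimeInMsWhenSyncEmpty grace period matter at startup.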
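The second scenario:
(2) Registry synchronization when Eureka Server receives a register, renew, or cancel
Replicating register, renew, and cancel is fairly involved: it includes hand-offs between several queues as well as error handling. The walkthrough below is a simplified view that leaves out the queue hand-offs and error handling.
After Eureka Server receives a register, renew, or cancel event, it replicates the event to the other nodes: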
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 */
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}

/**
 * Replicates all instance changes to peer eureka nodes except for
 * replication traffic to this node.
 *
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        InstanceInfo infoFromRegistry = null;
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat:
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                break;
            case Register:
                node.register(info);
                break;
            case StatusUpdate:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                break;
            case DeleteStatusOverride:
                infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, infoFromRegistry);
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
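The two guards in replicateToPeers() are what keep replication from looping: a request that is itself a replication is never forwarded again, and a node never replicates to its own URL. A minimal sketch of just that decision, with hypothetical names (ReplicationFanOut, targets) and plain URL strings instead of PeerEurekaNode objects:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: decides which peers an incoming action should be forwarded to.
class ReplicationFanOut {
    private final String selfUrl;
    private final List<String> peerUrls;

    ReplicationFanOut(String selfUrl, List<String> peerUrls) {
        this.selfUrl = selfUrl;
        this.peerUrls = peerUrls;
    }

    List<String> targets(boolean isReplication) {
        List<String> targets = new ArrayList<>();
        // A replicated request must not be replicated again, otherwise peers would echo it forever.
        if (peerUrls.isEmpty() || isReplication) {
            return targets;
        }
        for (String url : peerUrls) {
            if (url.equals(selfUrl)) {
                continue; // do not replicate to yourself
            }
            targets.add(url);
        }
        return targets;
    }
}
```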
Taking Register as an example (the other actions follow the same logic), Eureka Server loops over the other nodes and calls node.register(info) on each:
/**
 * Sends the registration information of {@link InstanceInfo} receiving by
 * this node to the peer node represented by this class.
 *
 * @param info
 *            the instance information {@link InstanceInfo} of any instance
 *            that is send to this instance.
 * @throws Exception
 */
public void register(final InstanceInfo info) throws Exception {
    long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
    batchingDispatcher.process(
            taskId("register", info),
            new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                public EurekaHttpResponse<Void> execute() {
                    return replicationClient.register(info);
                }
            },
            expiryTime
    );
}
This calls process() on the TaskDispatcher, which underneath delegates to AcceptorExecutor.process():
acceptorExecutor.process(id, task, expiryTime);
The acceptorExecutor then runs an internal thread, AcceptorRunner, to process the tasks. That logic is somewhat complex (queue hand-offs and error handling), and the processed results finally end up in BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue.
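As a rough picture of what the acceptor stage achieves, the sketch below collects tasks by id and hands them over as batches. It is a heavily simplified assumption, not the AcceptorExecutor implementation: MiniAcceptor is a hypothetical class, ids are plain strings, and expiry times, retries, and the background thread are left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified acceptor: dedupes pending tasks by id and emits fixed-size batches.
class MiniAcceptor<T> {
    private final int maxBatchSize;
    private final Map<String, T> pending = new LinkedHashMap<>(); // keeps first-seen order
    private final BlockingQueue<List<T>> batchWorkQueue = new LinkedBlockingQueue<>();

    MiniAcceptor(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // A newer task with the same id overrides the pending one, so only the latest state is sent.
    synchronized void process(String id, T task) {
        pending.put(id, task);
    }

    // Moves all pending tasks into batches of at most maxBatchSize.
    synchronized void drainToBatches() {
        List<T> batch = new ArrayList<>();
        for (T task : pending.values()) {
            batch.add(task);
            if (batch.size() == maxBatchSize) {
                batchWorkQueue.add(batch);
                batch = new ArrayList<>();
            }
        }
        if (!batch.isEmpty()) {
            batchWorkQueue.add(batch);
        }
        pending.clear();
    }

    BlockingQueue<List<T>> batchWorkQueue() {
        return batchWorkQueue;
    }
}
```

Batching like this trades a little latency for far fewer HTTP calls between peers, which is presumably why register() goes through a dispatcher instead of calling replicationClient directly.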
TaskExecutors starts a group of WorkerRunnable threads (in a ThreadGroup) internally. They poll tasks from batchWorkQueue in a loop and call execute() on the InstanceReplicationTask, which pushes the register event to the other Eureka Server nodes.
public void run() {
    try {
        while (!isShutdown.get()) {
            List<TaskHolder<ID, T>> holders = getWork();
            metrics.registerExpiryTimes(holders);

            List<T> tasks = getTasksOf(holders);
            ProcessingResult result = processor.process(tasks);
            switch (result) {
                case Success:
                    break;
                case Congestion:
                case TransientError:
                    taskDispatcher.reprocess(holders, result);
                    break;
                case PermanentError:
                    logger.warn("Discarding {} tasks of {} due to permanent error", holders.size(), workerName);
            }
            metrics.registerTaskResult(result, tasks.size());
        }
    } catch (InterruptedException e) {
        // Ignore
    } catch (Throwable e) {
        // Safe-guard, so we never exit this loop in an uncontrolled way.
        logger.warn("Discovery WorkerThread error", e);
    }
}
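The switch in run() encodes a simple policy: retry on Congestion or TransientError, drop on PermanentError. The sketch below isolates that policy. MiniWorker and processNext are hypothetical names, and putting the batch straight back on the same queue is a simplification; the real reprocess() path is not shown in this article.

```java
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical worker step: process one batch and decide whether it must be retried.
class MiniWorker {
    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    static <T> ProcessingResult processNext(Deque<List<T>> workQueue,
                                            Function<List<T>, ProcessingResult> processor) {
        List<T> batch = workQueue.remove(); // throws NoSuchElementException if the queue is empty
        ProcessingResult result = processor.apply(batch);
        switch (result) {
            case Congestion:
            case TransientError:
                workQueue.addLast(batch); // hand the batch back for another attempt
                break;
            case PermanentError:
                break; // retrying cannot help, so the batch is discarded
            case Success:
                break;
        }
        return result;
    }
}
```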
register, renew, and cancel are handled by the same logic; only the replication interface that gets called differs.