系列文章目錄和關(guān)于我
零丶長(zhǎng)輪詢的引入
最近在看工作使用到的配置中心原理,發(fā)現(xiàn)大多數(shù)配置中心在推和拉模型上做的選擇出奇的一致選擇了基于長(zhǎng)輪詢的拉模型
-
基于拉模型的客戶端輪詢的方案
客戶端通過輪詢方式發(fā)現(xiàn)服務(wù)端的配置變更事件。輪詢的頻率決定了動(dòng)態(tài)配置獲取的實(shí)時(shí)性。- 優(yōu)點(diǎn):簡(jiǎn)單、可靠。
- 缺點(diǎn):應(yīng)用增多時(shí),較高的輪詢頻率給整個(gè)配置中心服務(wù)帶來巨大的壓力。
另外,從配置中心的應(yīng)用場(chǎng)景上來看,是一種寫少讀多的系統(tǒng),客戶端大多數(shù)輪詢請(qǐng)求都是沒有意義的,因此這種方案不夠高效。
-
基于推模型的客戶端長(zhǎng)輪詢的方案
基于Http長(zhǎng)輪詢模型,實(shí)現(xiàn)了讓客戶端在沒有發(fā)生動(dòng)態(tài)配置變更的時(shí)候減少輪詢。這樣減少了無意義的輪詢請(qǐng)求量,提高了輪詢的效率;也降低了系統(tǒng)負(fù)載,提升了整個(gè)系統(tǒng)的資源利用率。
一丶何為長(zhǎng)輪詢
長(zhǎng)輪詢
本質(zhì)上是原始輪詢技術(shù)的一種更有效的形式。
它的出現(xiàn)是為了解決:向服務(wù)器發(fā)送重復(fù)請(qǐng)求會(huì)浪費(fèi)資源,因?yàn)楸仨殲槊總€(gè)新傳入的請(qǐng)求建立連接,必須解析請(qǐng)求的 HTTP 頭部,必須執(zhí)行對(duì)新數(shù)據(jù)的查詢,并且必須生成和交付響應(yīng)(通常不提供新數(shù)據(jù))然后必須關(guān)閉連接并清除所有資源。
- 從tomcat服務(wù)器的角度就是客戶端不停請(qǐng)求,每次都得解析報(bào)文封裝成Request,Response對(duì)象,并且占用線程池中的一個(gè)線程。
- 并且每次輪詢都要進(jìn)行tcp握手,揮手,網(wǎng)卡發(fā)起中斷,操作系統(tǒng)處理中斷從內(nèi)核空間拷貝數(shù)據(jù)到用戶空間,一通忙活服務(wù)端返回
配置未修改(配置中心沒有修改配置,客戶端緩存的配置和配置中心一致,所以是白忙活)
長(zhǎng)輪詢是一種服務(wù)器選擇盡可能長(zhǎng)的時(shí)間保持和客戶端連接打開的技術(shù)
,僅在數(shù)據(jù)變得可用或達(dá)到超時(shí)闕值后才提供響應(yīng)
,而不是在給到客戶端的新數(shù)據(jù)可用之前,讓每個(gè)客戶端多次發(fā)起重復(fù)的請(qǐng)求
簡(jiǎn)而言之,就是服務(wù)端并不是立馬寫回響應(yīng),而是hold住一段時(shí)間,如果這段時(shí)間有數(shù)據(jù)需要寫回(例如配置的修改,新配置需要寫回)再寫回,然后瀏覽器再發(fā)送一個(gè)新請(qǐng)求,從而實(shí)現(xiàn)及時(shí)性
,節(jié)省網(wǎng)絡(luò)開銷
的作用。
二丶使用等待喚醒機(jī)制寫一個(gè)簡(jiǎn)單的“長(zhǎng)輪詢”(脫褲子放屁)
package com.cuzzz.springbootlearn.longpull;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@RestController
@RequestMapping("long-pull")
public class MyController implements InitializingBean {
/**
* 處理任務(wù)的線程
*/
private ThreadPoolExecutor processExecutor;
/**
* 等待喚醒的鎖
*/
private static final ReentrantLock lock = new ReentrantLock();
/**
* 當(dāng)請(qǐng)求獲取配置的時(shí)候,在此condition上等待一定時(shí)間
* 當(dāng)修改配置的時(shí)候通過這個(gè)condition 通知其他獲取配置的線程
*/
private static final Condition condition = lock.newCondition();
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {
//組轉(zhuǎn)成任務(wù)
Task<String> task = new Task<String>(request, response,
() -> "拿配置" + System.currentTimeMillis());
//提交到線程池
Future<?> submit = processExecutor.submit(task);
//tomcat線程阻塞于此
submit.get();
}
/**
* 模擬修改配置
*
* 喚醒其他獲取配置的線程
*/
@PostMapping
public String post(HttpServletRequest request, HttpServletResponse response) {
lock.lock();
try {
condition.signalAll();
}finally {
lock.unlock();
}
return "OK";
}
static class Task<T> implements Runnable {
private HttpServletResponse response;
/**
* 等待時(shí)長(zhǎng)
*/
private final long timeout;
private Callable<T> task;
public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task) {
this.response = response;
String time = request.getHeader("time-out");
if (time == null){
//默認(rèn)等待10秒
this.timeout = 10;
}else {
this.timeout = Long.parseLong(time);
}
this.task = task;
}
@Override
public void run() {
lock.lock();
try {
//超市等待
boolean await = condition.await(timeout, TimeUnit.SECONDS);
//超時(shí)
if (!await) {
throw new TimeoutException();
}
//獲取配置
T call = task.call();
//寫回
ServletOutputStream outputStream = response.getOutputStream();
outputStream.write(("沒超時(shí)拿當(dāng)前配置:" + call).getBytes(StandardCharsets.UTF_8));
} catch (TimeoutException | InterruptedException exception) {
//超時(shí)或者線程被中斷
try {
ServletOutputStream outputStream = response.getOutputStream();
T call = task.call();
outputStream.write(("超時(shí)or中斷拿配置:" + call).getBytes(StandardCharsets.UTF_8));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
@Override
public void afterPropertiesSet() {
int cpuNums = Runtime.getRuntime().availableProcessors();
processExecutor
= new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
}
}
使用get方法反問的請(qǐng)求回被提交到線程池進(jìn)行await等待,使用post方法的請(qǐng)求回喚醒這些線程。
但是這個(gè)寫法有點(diǎn)脫褲子放屁
為什么會(huì)出現(xiàn)這種情況,直接提交到線程池異步執(zhí)行不可以么,加入我們刪除上面submit.get
方法會(huì)發(fā)現(xiàn)其實(shí)什么結(jié)果都不會(huì),這是因?yàn)楫惒教峤坏骄€程池后,tomcat已經(jīng)結(jié)束了這次請(qǐng)求,并沒有維護(hù)這個(gè)連接,所以沒有辦法寫回結(jié)果。
如果不刪除這一行,tomcat線程阻塞住我們可以寫回結(jié)果,但是其實(shí)沒有達(dá)到配置使用長(zhǎng)輪詢的初衷——"解放tomcat線程,讓配置中心服務(wù)端可以處理更多請(qǐng)求"。
所以我們現(xiàn)在陷入一個(gè)尷尬的境地,怎么解決昵?看下去
三丶Tomcat Servlet 3.0長(zhǎng)輪詢?cè)?/h2>
1.AsyncContext實(shí)現(xiàn)長(zhǎng)輪詢
package com.cuzzz.springbootlearn.longpull;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@RestController
@RequestMapping("long-pull3")
public class MyController2 {
private static final ScheduledExecutorService procesExecutor
= Executors.newSingleThreadScheduledExecutor();
/**
* 記錄配置改變的map
*/
private static final ConcurrentHashMap<String, String> configCache
= new ConcurrentHashMap<>();
/**
* 記錄長(zhǎng)輪詢的任務(wù)
*/
private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
= new ConcurrentLinkedDeque<>();
static {
//每2秒看一下釋放配置變更,或者任務(wù)超時(shí)
procesExecutor.scheduleWithFixedDelay(() -> {
List<AsyncTask>needRemove = new ArrayList<>();
for (AsyncTask asyncTask : interestQueue) {
if (asyncTask.timeout()) {
asyncTask.run();
needRemove.add(asyncTask);
continue;
}
if (configCache.containsKey(asyncTask.configId)) {
needRemove.add(asyncTask);
asyncTask.run();
}
}
interestQueue.removeAll(needRemove);
}, 1, 2, TimeUnit.SECONDS);
}
static class AsyncTask implements Runnable {
private final AsyncContext asyncContext;
private final long timeout;
private static long startTime;
private String configId;
AsyncTask(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
String timeStr = request.getHeader("time-out");
if (timeStr == null) {
timeout = 10;
} else {
timeout = Long.parseLong(timeStr);
}
//關(guān)注的配置key,應(yīng)該getParameter的,無所謂
this.configId = request.getHeader("config-id");
if (this.configId == null) {
this.configId = "default";
}
//開始時(shí)間
startTime = System.currentTimeMillis();
}
//是否超時(shí)
public boolean timeout() {
return (System.currentTimeMillis() - startTime) / 1000 > timeout;
}
@Override
public void run() {
String result = "開始于" + System.currentTimeMillis() + "--";
try {
if (timeout()) {
result = "超時(shí): " + result;
} else {
result += configCache.get(this.configId);
}
result += "--結(jié)束于:" + System.currentTimeMillis();
ServletResponse response = asyncContext.getResponse();
response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
//后續(xù)將交給tomcat線程池處理,將給客戶端響應(yīng)
asyncContext.complete();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response) {
//打印處理的tomcate線程id
System.out.println("線程id" + Thread.currentThread().getId());
//添加一個(gè)獲取配置的異步任務(wù)
interestQueue.add(new AsyncTask(asyncContext));
//開啟異步
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
//監(jiān)聽器打印最后回調(diào)的tomcat線程id
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
System.out.println("線程id" + Thread.currentThread().getId());
}
//...剩余其他方法
});
//立馬就會(huì)釋放tomcat線程池資源
System.out.println("tomcat主線程釋放");
}
@PostMapping
public void post(HttpServletRequest request) {
String c = String.valueOf(request.getParameter("config-id"));
if (c.equals("null")){
c = "default";
}
String v = String.valueOf(request.getParameter("value"));
configCache.put(c, v);
}
}
package com.cuzzz.springbootlearn.longpull;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@RestController
@RequestMapping("long-pull3")
public class MyController2 {
private static final ScheduledExecutorService procesExecutor
= Executors.newSingleThreadScheduledExecutor();
/**
* 記錄配置改變的map
*/
private static final ConcurrentHashMap<String, String> configCache
= new ConcurrentHashMap<>();
/**
* 記錄長(zhǎng)輪詢的任務(wù)
*/
private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
= new ConcurrentLinkedDeque<>();
static {
//每2秒看一下釋放配置變更,或者任務(wù)超時(shí)
procesExecutor.scheduleWithFixedDelay(() -> {
List<AsyncTask>needRemove = new ArrayList<>();
for (AsyncTask asyncTask : interestQueue) {
if (asyncTask.timeout()) {
asyncTask.run();
needRemove.add(asyncTask);
continue;
}
if (configCache.containsKey(asyncTask.configId)) {
needRemove.add(asyncTask);
asyncTask.run();
}
}
interestQueue.removeAll(needRemove);
}, 1, 2, TimeUnit.SECONDS);
}
static class AsyncTask implements Runnable {
private final AsyncContext asyncContext;
private final long timeout;
private static long startTime;
private String configId;
AsyncTask(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
String timeStr = request.getHeader("time-out");
if (timeStr == null) {
timeout = 10;
} else {
timeout = Long.parseLong(timeStr);
}
//關(guān)注的配置key,應(yīng)該getParameter的,無所謂
this.configId = request.getHeader("config-id");
if (this.configId == null) {
this.configId = "default";
}
//開始時(shí)間
startTime = System.currentTimeMillis();
}
//是否超時(shí)
public boolean timeout() {
return (System.currentTimeMillis() - startTime) / 1000 > timeout;
}
@Override
public void run() {
String result = "開始于" + System.currentTimeMillis() + "--";
try {
if (timeout()) {
result = "超時(shí): " + result;
} else {
result += configCache.get(this.configId);
}
result += "--結(jié)束于:" + System.currentTimeMillis();
ServletResponse response = asyncContext.getResponse();
response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
//后續(xù)將交給tomcat線程池處理,將給客戶端響應(yīng)
asyncContext.complete();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response) {
//打印處理的tomcate線程id
System.out.println("線程id" + Thread.currentThread().getId());
//添加一個(gè)獲取配置的異步任務(wù)
interestQueue.add(new AsyncTask(asyncContext));
//開啟異步
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
//監(jiān)聽器打印最后回調(diào)的tomcat線程id
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
System.out.println("線程id" + Thread.currentThread().getId());
}
//...剩余其他方法
});
//立馬就會(huì)釋放tomcat線程池資源
System.out.println("tomcat主線程釋放");
}
@PostMapping
public void post(HttpServletRequest request) {
String c = String.valueOf(request.getParameter("config-id"));
if (c.equals("null")){
c = "default";
}
String v = String.valueOf(request.getParameter("value"));
configCache.put(c, v);
}
}
上面演示利用AsyncContext
tomcat是如何實(shí)現(xiàn)長(zhǎng)輪詢
這種方式的優(yōu)勢(shì)在于:解放了tomcat線程,其實(shí)tomcat的線程只是運(yùn)行了get方法中的代碼,然后立馬可以去其他請(qǐng)求,真正獲取配置更改
的是我們的單線程定時(shí)2秒去輪詢。
2.實(shí)現(xiàn)原理
2.1 tomcat處理一個(gè)請(qǐng)求的流程
-
Connector是客戶端連接到Tomcat容器的服務(wù)點(diǎn),它提供協(xié)議服務(wù)來將引擎與客戶端各種協(xié)議隔離開來
在Connector組件中創(chuàng)建了Http11NioProtocol組件,Http11NioProtocol默認(rèn)持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且啟動(dòng)的時(shí)候會(huì)啟動(dòng)一個(gè)線程運(yùn)行Acceptor
-
Acceptor服務(wù)器端監(jiān)聽客戶端的連接,會(huì)啟動(dòng)線程一直執(zhí)行
每接收一個(gè)客戶端連接就輪詢一個(gè)Poller組件,添加到Poller組件的事件隊(duì)列中。,每接收一個(gè)客戶端連接就輪詢一個(gè)Poller組件,添加到Poller組件的事件隊(duì)列中。
-
Poller組件持有多路復(fù)用器selector,poller組件不停從自身的事件隊(duì)列中將事件取出注冊(cè)到自身的多路復(fù)用器上,同時(shí)多路復(fù)用器會(huì)不停的輪詢檢查是否有通道準(zhǔn)備就緒,準(zhǔn)備就緒的通道就可以扔給tomcat線程池處理了。
-
tomcat線程池處理請(qǐng)求
-
這里會(huì)根據(jù)協(xié)議創(chuàng)建不同的Processor處理,這里創(chuàng)建的是Http11Processor,Http11Processor會(huì)使用CoyoteAdapter去解析報(bào)文隨后交給Container去處理請(qǐng)求
-
CoyoteAdapter解析報(bào)文隨后交給Container去處理請(qǐng)求
-
Container會(huì)將Filter和Servlet組裝成FilterChain依次調(diào)用
-
FilterChain會(huì)依次調(diào)用Filter#doFilter,然后調(diào)用Servlet#service方法
至此會(huì)調(diào)用到Servlete#service方法,SpringMVC中的DispatcherServlet會(huì)反射調(diào)用我們controller的方法
-
2.2 AsyncContext 如何實(shí)現(xiàn)異步
2.2.1 request.startAsync() 修改異步狀態(tài)機(jī)狀態(tài)為Starting
AsycContext內(nèi)部持有一個(gè)AsyncStateMachine來管理異步請(qǐng)求的狀態(tài)(有點(diǎn)狀態(tài)模式的意思)
狀態(tài)機(jī)的初始狀態(tài)是AsyncState.DISPATCHED,通過setStarted將狀態(tài)機(jī)的狀態(tài)更新成STARTING
2.2.2 AbstractProtocol啟動(dòng)定時(shí)任務(wù)處理超時(shí)異步請(qǐng)求
Connector啟動(dòng)的時(shí)候觸發(fā)ProtocolHandler的start方法,如下
其中startAsyncTimeout方法會(huì)遍歷waitingProcessors中每一個(gè)Processor的timeoutAsync方法,這里的Processor就是Http11Processor
那么waitProcessors中的Http11Processor是誰塞進(jìn)去的昵?
tomcat線程在執(zhí)行完我們的Servlet代碼后,Http11NioProtocol會(huì)判斷請(qǐng)求狀態(tài),如果為L(zhǎng)ong那么會(huì)塞到waitProcessors集合中。
如果發(fā)現(xiàn)請(qǐng)求超時(shí),那么會(huì)調(diào)用Http11Processor#doTimeoutAsycn
然后由封裝的socket通道socketWrapper以TIMEOUT的事件類型重新提交到tomcat線程池中。
2.2.3 AsyncContext#complete觸發(fā)OPEN_READ事件
可以看到其實(shí)和超時(shí)一樣,只不過超時(shí)是由定時(shí)任務(wù)線程輪詢來判斷,而AsyncContext#complete則是我們業(yè)務(wù)線程觸發(fā)processSocketEvent將后續(xù)處理提交到tomcat線程池中。
四丶長(zhǎng)輪詢的優(yōu)點(diǎn)和缺點(diǎn)
本文學(xué)習(xí)了長(zhǎng)輪詢和tomcat長(zhǎng)輪詢的原理,可以看到這種方式的優(yōu)點(diǎn)
- 瀏覽器長(zhǎng)輪詢的過程中,請(qǐng)求并沒有立即響應(yīng),而是等到超時(shí)或者有需要返回的數(shù)據(jù)(比如配置中心在這個(gè)超時(shí)事件內(nèi)發(fā)送配置的變更)才返回,解決了短輪詢頻繁進(jìn)行請(qǐng)求網(wǎng)絡(luò)開銷的問題,減少了讀多寫少業(yè)務(wù)情景下無意義請(qǐng)求。
- 真是通過這種方式,減少了無意義的請(qǐng)求,而且釋放了tomcat線程池中的線程,使得我們服務(wù)端可以支持更多的客戶端(因?yàn)闃I(yè)務(wù)邏輯是放在其他的線程池執(zhí)行的,而且對(duì)于配置中心來說,可以讓多個(gè)客戶端的長(zhǎng)輪詢請(qǐng)求由一個(gè)線程去處理,原本是一個(gè)請(qǐng)求一個(gè)tomcat線程處理,從而可以支持更多的請(qǐng)求)
當(dāng)然這種方式也是有缺點(diǎn)的
-
hold住請(qǐng)求也是會(huì)消耗資源的,如果1w個(gè)請(qǐng)求同時(shí)到來,我們都需要hold?。ǚ庋b成任務(wù)塞到隊(duì)列)這寫任務(wù)也是會(huì)占用內(nèi)存的,而短輪詢則會(huì)立馬返回,從而時(shí)間資源的釋放
-
請(qǐng)求先后順序無法保證,比如輪詢第五個(gè)客戶端的請(qǐng)求的時(shí)候,出現(xiàn)了配置的變更,這時(shí)候第五個(gè)請(qǐng)求會(huì)被提交到tomcat線程池中,從而早于前面四個(gè)請(qǐng)求得到響應(yīng),這對(duì)于需要嚴(yán)格有序的業(yè)務(wù)場(chǎng)景是有影響的
-
多臺(tái)實(shí)例監(jiān)聽配置中心實(shí)例,出現(xiàn)不一致的情況文章來源:http://www.zghlxwxcb.cn/news/detail-415529.html
比如配置中心四臺(tái)實(shí)例監(jiān)聽配置變更,前三臺(tái)可能響應(yīng)了得到V1的配置,但是輪詢到第四臺(tái)實(shí)例的請(qǐng)求的時(shí)候又發(fā)生了變更可能就得到了v2的配置,這時(shí)候這四臺(tái)配置不一致了。需要保證這種一致性需要我們采取其他的策略,比如配置中心服務(wù)端主動(dòng)udp推,或者加上版本號(hào)保證這四臺(tái)配置一致。文章來源地址http://www.zghlxwxcb.cn/news/detail-415529.html
到了這里,關(guān)于Tomcat長(zhǎng)輪詢?cè)砼c源碼解析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!