一、背景
接到一個需求,實現(xiàn)方案時需要提供一個HTTP接口,接口需要hold住5-8秒,輪詢查詢數(shù)據(jù)庫,一旦數(shù)據(jù)庫中值有變化,取出變化的值進行處理,處理完成后返回響應(yīng)。這不就是長輪詢嗎,如何優(yōu)雅的實現(xiàn)呢?
在這之前先簡單介紹下長連接和短連接
HTTP長鏈接(Keep-Alive)
-
概念: HTTP長鏈接是指客戶端與服務(wù)器在一次TCP連接上可以傳輸多個HTTP請求和響應(yīng)。在請求完成后,連接不會立即關(guān)閉,而是保持開放狀態(tài),等待可能的后續(xù)請求。
-
優(yōu)勢:
- 減少延遲: 長鏈接避免了每次請求都需要重新建立TCP連接的開銷,降低了通信延遲。
- 減少資源占用: 不需要頻繁地打開和關(guān)閉連接,減少了服務(wù)器資源的占用。
-
應(yīng)用場景:
- 實時性要求高的應(yīng)用: 長鏈接適用于需要實時性響應(yīng)的應(yīng)用,例如即時通訊、實時更新等。
- 資源加載優(yōu)化: 在Web開發(fā)中,適用于多個資源(如圖片、樣式表、腳本)的同時加載。
HTTP短連接
-
概念: HTTP短連接是指每個HTTP請求都需要建立一個新的TCP連接,請求完成后立即關(guān)閉連接。
-
優(yōu)勢:
- 簡單: 短連接模式相對簡單,易于理解和實現(xiàn)。
- 更好的控制: 對于某些資源密集型的應(yīng)用,短連接可以更好地控制資源的釋放。
-
應(yīng)用場景:
- 低并發(fā)場景: 當(dāng)并發(fā)請求數(shù)較低時,短連接可能更適用,因為它避免了長鏈接的開銷。
- 資源密集型應(yīng)用: 對于服務(wù)器資源消耗較大的應(yīng)用,短連接可能更容易控制資源的釋放。
何為輪詢
所謂輪詢,即是在一個循環(huán)周期內(nèi)不斷發(fā)起請求來得到數(shù)據(jù)的機制。只要有請求的的地方,都可以實現(xiàn)輪詢,譬如各種事件驅(qū)動模型。它的長短是在于某次請求的返回周期。
1. 短輪詢
短輪詢指的是在循環(huán)周期內(nèi),不斷發(fā)起請求,每一次請求都立即返回結(jié)果,根據(jù)新1日數(shù)據(jù)對比決定是否使用這個結(jié)果。
2. 長輪詢
而長輪詢及是在請求的過程中,若是服務(wù)器端數(shù)據(jù)并沒有更新,那么則將這個連接掛起,直到服務(wù)器推送新的數(shù)據(jù),再返回,然后再進入循環(huán)周期。
長短輪詢和長短連接的區(qū)別
1. 第一個區(qū)別是決定的方式,
一個TCP連接是否為長連接,是通過設(shè)置HTTP的Connection Header來決定的,而且是需要兩邊都設(shè)置才有效。而一種輪詢方式是否為長輪詢,是根據(jù)服務(wù)端的處理方式來決定的,與客戶端沒有關(guān)系。
2. 第二個區(qū)別就是實現(xiàn)的方式
連接的長短是通過協(xié)議來規(guī)定和實現(xiàn)的。而輪詢的長短,是服務(wù)器通過編程的方式手動掛起請求來實現(xiàn)的。
二、方案設(shè)計
在 Spring 中,AsyncContext 是用于支持異步處理的一個重要的特性。它允許我們在 servlet 請求處理過程中,將長時間運行的操作放在一個單獨的線程中執(zhí)行,而不會阻塞其他請求的處理。
AsyncContext 在以下兩種情況下特別有用:
-
長時間運行的操作:當(dāng)我們需要執(zhí)行一些耗時的操作,例如網(wǎng)絡(luò)請求、數(shù)據(jù)庫查詢或其他 I/O 操作時,通過將這些操作放在一個新的線程中,可以避免阻塞 servlet 容器中的線程,提高應(yīng)用的并發(fā)性能。
-
推送異步響應(yīng):有時候,我們可能需要推送異步產(chǎn)生的響應(yīng),而不是等到所有操作都完成后再下發(fā)響應(yīng)。通過 AsyncContext,我們可以在任何時間點上觸發(fā)異步響應(yīng),將結(jié)果返回給客戶端。
使用 AsyncContext 的步驟如下:
- 在 servlet 中啟用異步模式:在 servlet 中,通過調(diào)用?
startAsync()
?方法,可以獲取到當(dāng)前請求的 AsyncContext 對象,從而啟用異步處理模式。
HttpServletRequest request = ...;
AsyncContext asyncContext = request.startAsync();
- 指定異步任務(wù):通過調(diào)用 AsyncContext 對象的?
start()
?方法,在新的線程中執(zhí)行需要異步處理的任務(wù)。
asyncContext.start(() -> {
// 異步任務(wù)邏輯
});
- 提交響應(yīng):在異步任務(wù)完成后,可以調(diào)用 AsyncContext 對象的?
complete()
?方法,以表示異步操作完成。
asyncContext.complete();
需要注意的是,我們在使用 AsyncContext 時需要特別注意線程安全。由于異步任務(wù)在單獨的線程中執(zhí)行,所以可能存在并發(fā)問題。因此,在編寫異步任務(wù)邏輯時,需要注意線程安全性,使用合適的同步措施。
另外,AsyncContext 也支持超時設(shè)置、錯誤處理、事件監(jiān)聽等功能,這些可以通過相應(yīng)的方法和回調(diào)進行配置??梢愿鶕?jù)具體的需求使用這些功能來優(yōu)化異步處理的邏輯。文章來源:http://www.zghlxwxcb.cn/news/detail-702762.html
總結(jié)來說,Spring 的 AsyncContext 提供了方便的異步處理機制,可以提高應(yīng)用的并發(fā)性能,并支持推送異步響應(yīng),使得應(yīng)用更具有響應(yīng)性和可伸縮性。文章來源地址http://www.zghlxwxcb.cn/news/detail-702762.html
三、方案1
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.concurrent.*;
@RestController
@RequestMapping("/api/test")
@Slf4j
public class AsyncTestController {
@Resource
private RedisTemplate<String, String> redisTemplate;
private final ExecutorService timeoutChecker = new ThreadPoolExecutor(1,1,1000,TimeUnit.SECONDS,new ArrayBlockingQueue<>(1000));
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
// private static boolean result = false;
@PostMapping("/async")
public void async(HttpServletRequest request, HttpServletResponse response) {
// 創(chuàng)建AsyncContext
AsyncContext asyncContext = request.startAsync(request, response);
// 設(shè)置處理超時時間8s
asyncContext.setTimeout(8000L);
// asyncContext監(jiān)聽
AsyncTestListener asyncListener = new AsyncTestListener(redisTemplate,asyncContext);
asyncContext.addListener(asyncListener);
// 定時處理業(yè)務(wù),處理成功后asyncContext.complete();完成異步請求
asyncContext.start(asyncListener);
}
// 模擬業(yè)務(wù)處理完成
@PostMapping("/set")
public ResultModel notify(String key, String value) {
redisTemplate.opsForValue().set(key, value);
return ResultModel.success();
}
@PostMapping("/get")
public ResultModel get(String key) {
String s = redisTemplate.opsForValue().get(key);
return ResultModel.success(s);
}
@PostMapping("/del")
public ResultModel del(String key) {
redisTemplate.delete(key);
return ResultModel.success();
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import java.io.IOException;
@Slf4j
public class AsyncTestListener implements AsyncListener,Runnable {
boolean isComplete;
private RedisTemplate<String, String> redisTemplate;
private AsyncContext asyncContext;
public JdAsyncTestListener(RedisTemplate<String, String> redisTemplate, AsyncContext asyncContext) {
this.redisTemplate = redisTemplate;
this.asyncContext = asyncContext;
}
@Override
public void run() {
try {
while(true){
if(isComplete){
log.info("已經(jīng)退出");
break;
}
boolean b = redisTemplate.opsForValue().get(1) != null;
log.info("獲取標(biāo)志位:"+b);
Thread.sleep(300);
if (b) {
asyncContext.getResponse().getWriter().print(1);
asyncContext.complete();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("結(jié)束了");
isComplete = true;
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("超時了");
isComplete = true;
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
}
}
四、方案2
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.dubbo.config.annotation.Reference;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@Validated
@RestController
@RequestMapping("/api/test")
@Slf4j
public class TestController {
@Resource
private RedisTemplate<String, String> redisTemplate;
private final ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(10, threadFactory);
private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d").build();
private static boolean result = false;
private final boolean isTimeout = false;
/**
* 消息
*
* @return
*/
@PostMapping("/test")
public void callback(@RequestBody TestLongPollRequest testLongPollRequest, HttpServletRequest request, HttpServletResponse response) {
// 創(chuàng)建AsyncContext
AsyncContext asyncContext = request.startAsync(request, response);
String test = LongPollRequest.getctomerId();
// 設(shè)置處理超時時間8s
asyncContext.setTimeout(8000L);
// asyncContext監(jiān)聽
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent asyncEvent) throws IOException {
log.info("onComplete={}", asyncEvent);
}
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException {
log.info("onTimeout={}", asyncEvent);
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("code", "500"); asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
asyncContext.complete();
}
@Override
public void onError(AsyncEvent asyncEvent) throws IOException {
log.info("onError={}", asyncEvent);
}
@Override
public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
log.info("onStartAsync={}", asyncEvent);
}
});
// 定時處理業(yè)務(wù),處理成功后asyncContext.complete();完成異步請求
timeoutChecker.scheduleAtFixedRate(() -> {
try {
String redisKey = getRedisKey(customerId);
String redisValue = redisTemplate.opsForValue().get(redisKey);
result = StringUtils.isNotBlank(redisValue);
if (result) {
//todo 長輪詢查詢數(shù)據(jù)庫。通過customerId查詢
send(test, redisValue);
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
map.put("code", "200");
map.put("msg", redisValue);
asyncContext.getResponse().getWriter().print(JSON.toJSONString(map));
asyncContext.complete();
}
} catch (IOException e) {
e.printStackTrace();
}
}, 0, 100L, TimeUnit.MILLISECONDS);
}
/**
* 發(fā)送消息
*/
private void send(String test, String content) {
}
}
到了這里,關(guān)于AsyncContext優(yōu)雅實現(xiàn)HTTP長輪詢接口的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!