這篇文章當(dāng)時(shí)寫得比較匆忙,這里進(jìn)行一下更深入的補(bǔ)充
SSE 技術(shù)不是什么新鮮東西,就是一個(gè) HTTP 請(qǐng)求和響應(yīng),關(guān)鍵就是響應(yīng)這個(gè)環(huán)節(jié),原始的響應(yīng)都是一次性的,普通的響應(yīng)是這樣的:
- Nginx 是一個(gè)靜態(tài)服務(wù)器,所謂靜態(tài)服務(wù)器,就是將一個(gè)靜態(tài)文件按照大小不同情況選擇不同的方式傳輸給瀏覽器,小的文件,一次性發(fā)過去,大的文件則會(huì)采用數(shù)據(jù)塊的方式傳給瀏覽器。
- 再次強(qiáng)調(diào)下沒有什么 http 長鏈接和短鏈接的概念,對(duì)該問題認(rèn)識(shí)不清楚的可以再看我這篇足夠詳細(xì)地文章Http/Websocket協(xié)議的長連接和短連接的錯(cuò)誤認(rèn)識(shí)詳細(xì)解讀_森葉的博客-CSDN博客
- 建立鏈接之后,你所感知的 HTTP 請(qǐng)求完畢就關(guān)掉了,這完全是瀏覽器或者服務(wù)端的一種處理方式,在內(nèi)存極其稀缺的互聯(lián)網(wǎng)早期,及時(shí)地銷毀一個(gè) Socket 對(duì)象都是非常必要的。
- 到達(dá)現(xiàn)在內(nèi)存不值錢的時(shí)代,我們才敢大膽的將一個(gè) Socket 對(duì)象長時(shí)間保持,也就是目前 Websocket 大行其道的時(shí)代,理論上誰不喜歡長時(shí)間雙向通信的 Websocket呢,全雙工是網(wǎng)絡(luò)通信一開始就有的概念。
- HTTP 協(xié)議下的鏈接絕不是只互相通信了一次就結(jié)束了,是會(huì)有多次通信的,例如服務(wù)端向?yàn)g覽器索要賬號(hào)密碼的過程,這就是二次溝通了,瀏覽器將賬號(hào)密碼發(fā)回服務(wù)端,服務(wù)端再繼續(xù)為瀏覽器進(jìn)行服務(wù)。
- 理論上如果沒有任何關(guān)閉機(jī)制存在,只要返回的內(nèi)容沒有結(jié)束符\r\n\r\n存在,瀏覽器都不會(huì)主動(dòng)關(guān)閉這次請(qǐng)求事件。所以這樣來看,HTTP 協(xié)議下的連接具有了長鏈接的基礎(chǔ)了。
- Nginx 在收到后端服務(wù)器發(fā)過來的響應(yīng)報(bào)文時(shí),只要沒有收到結(jié)束符,就不會(huì)主動(dòng)關(guān)閉這次鏈接,會(huì)一直將后端服務(wù)器發(fā)送的數(shù)據(jù)推給瀏覽器,瀏覽器端就能連續(xù)地接到數(shù)據(jù)過去,這就是網(wǎng)頁被稱為流的概念。
Nginx 與后端服務(wù)器之間的通信
- Nginx 通過 socket 連接到后端服務(wù)器,會(huì)有一個(gè) send 函數(shù),將前端請(qǐng)求信息發(fā)送給后端服務(wù)器,同時(shí)用 `onmessage`?函數(shù)接收后端服務(wù)器的返回報(bào)文,平時(shí)一旦
- 后端服務(wù)器收到 nginx 發(fā)送過來的請(qǐng)求,則會(huì)開一個(gè)線程/協(xié)程來處理請(qǐng)求,像 Django 這類同步框架,則是由并發(fā)服務(wù)器uWSGI拉起 Django 執(zhí)行邏輯,獲取數(shù)據(jù),然后將結(jié)果交付給 Nginx,Nginx 收到后將結(jié)果發(fā)送回瀏覽器。
- 針對(duì)異步框架,nginx 的處理機(jī)制是相同的,例如 FastAPI,區(qū)別在于并發(fā)服務(wù)器使用的是 uvicorn,uvicorn 接入 FastAPI,在線程中調(diào)用 fastapi 的入口文件,兵器使用 await 進(jìn)行等待,數(shù)據(jù)處理完成則返回給 Nginx,所以同步/異步框?qū)?nginx 來說是一樣的東西。
- 在處理 websocket協(xié)議時(shí),Nginx 不會(huì)立即關(guān)閉和后端服務(wù)器的連接,后端服務(wù)器的獨(dú)立線程也會(huì)一直開著,所以開多了,性能上就吃不消,例如同時(shí)在線 1000個(gè)人,那么就有 3000?個(gè)協(xié)程在開著,一個(gè)是來自 nginx 和瀏覽器之間的,一個(gè)是 nginx 和后端服務(wù)器的,另外還有后端服務(wù)器啟動(dòng)的1000 個(gè)常駐協(xié)程用來輪詢消息隊(duì)列,發(fā)送給 nginx,所以,用 FastAPI來做 websocket 很耗費(fèi)性能
- 針對(duì) Hyperf 來說要好些,一次處理完后,并不需要建立 while 進(jìn)行循環(huán)等消息,而Hyperf 本身等價(jià)于 uvicorn,等有隊(duì)列消息出現(xiàn)時(shí),是由某一進(jìn)程 trigger 的,同時(shí)從 Hyperf 層面拿 socket給 nginx 發(fā)消息,自始至終只有一個(gè)while 來輪 redis,統(tǒng)一分發(fā)給各個(gè)消費(fèi)者進(jìn)行消費(fèi),消費(fèi)者又能直接獲取 Hyperf從而拿到 socket 給 nginx 發(fā)送消息,因此性能上得到極大的提升。
評(píng)論中有人問是否直接用 StreamResponse 就行了,如果你自己打算封裝一個(gè)StreamResponse,并且可以反復(fù)發(fā)送多次,差不多就實(shí)現(xiàn)了與EventSourceResponse同樣的效果吧!
nginx如果要支持SSE,要調(diào)整一些參數(shù),比較重要的就是要避免緩存,之前 nginx 處理的都是一次性的響應(yīng),例如網(wǎng)頁,所以會(huì)盡量把內(nèi)容緩存起來一次性發(fā)送到客戶端,所以這里要關(guān)掉緩存
conf 配置文件,這里做了修正
http {
...
server {
...
location /sse {
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# SSE 連接時(shí)的超時(shí)時(shí)間
proxy_read_timeout 86400s;
# 取消緩沖
proxy_buffering off;
# 關(guān)閉代理緩存
proxy_cache off;
# 禁用分塊傳輸編碼
#chunked_transfer_encoding off
# 反向代理到 SSE 應(yīng)用的地址和端口
proxy_pass http://backend-server;
}
...
}
...
}
-
proxy_http_version 1.1 才支持 keep-alive,保持連接不中斷
-
在
location /sse
塊內(nèi),設(shè)置代理頭部以確保不會(huì)對(duì)SSE響應(yīng)進(jìn)行緩存。Cache-Control
和Content-Type
頭部用于告訴瀏覽器這是一個(gè)SSE連接。 -
使用
proxy_buffering off;
來禁用Nginx的緩沖,以確保SSE響應(yīng)立即被傳遞給客戶端。 -
使用
proxy_pass
指令將SSE請(qǐng)求代理到后端服務(wù)器的SSE端點(diǎn)。請(qǐng)將http://backend_server;
替換為你的后端服務(wù)器地址和SSE端點(diǎn)路徑。 -
設(shè)置連接超時(shí)時(shí)間,以避免不活動(dòng)連接被Nginx關(guān)閉。在這里,我設(shè)置了一個(gè)較長的超時(shí)時(shí)間(3600秒),以便連接可以保持較長時(shí)間。你可以根據(jù)你的需求進(jìn)行調(diào)整。
SSE 和 Websocket 的區(qū)別?
Server-Sent Events(SSE)和WebSocket是兩種用于實(shí)現(xiàn)實(shí)時(shí)通信的不同技術(shù),它們?cè)谀承┓矫嬗邢嗨浦帲泊嬖谝恍╆P(guān)鍵的區(qū)別:
Server-Sent Events (SSE):
-
單向通信:SSE是一種單向通信機(jī)制,其中服務(wù)器向客戶端發(fā)送數(shù)據(jù)??蛻舳私邮辗?wù)器推送的數(shù)據(jù),但不能向服務(wù)器發(fā)送數(shù)據(jù)。這使得它適合用于服務(wù)器向客戶端的實(shí)時(shí)通知或事件推送。
-
基于HTTP:SSE建立在標(biāo)準(zhǔn)的HTTP/HTTPS協(xié)議之上,使用普通的HTTP請(qǐng)求和響應(yīng)來實(shí)現(xiàn)。它不需要特殊的協(xié)議升級(jí)。
-
文本數(shù)據(jù):SSE主要用于發(fā)送文本數(shù)據(jù)。服務(wù)器可以將文本事件推送到客戶端,客戶端通過監(jiān)聽事件來接收數(shù)據(jù)。每個(gè)事件通常包含一個(gè)標(biāo)識(shí)符、數(shù)據(jù)字段和可選的注釋字段。
-
瀏覽器支持:SSE在現(xiàn)代瀏覽器中得到廣泛支持,不需要額外的JavaScript庫或框架??蛻舳耸褂?code>EventSourceAPI來與SSE服務(wù)端通信。
WebSocket:
-
雙向通信:WebSocket是一種雙向通信協(xié)議,允許客戶端和服務(wù)器之間進(jìn)行雙向數(shù)據(jù)交換??蛻舳丝梢韵蚍?wù)器發(fā)送數(shù)據(jù),服務(wù)器也可以主動(dòng)向客戶端推送數(shù)據(jù)。
-
獨(dú)立協(xié)議:WebSocket是一種獨(dú)立的協(xié)議,與HTTP不同。建立WebSocket連接需要進(jìn)行協(xié)議升級(jí),然后在一個(gè)持久連接上進(jìn)行數(shù)據(jù)交換。
-
二進(jìn)制和文本數(shù)據(jù):WebSocket支持二進(jìn)制和文本數(shù)據(jù)的傳輸,因此可以用于多種類型的數(shù)據(jù)交換,包括游戲、實(shí)時(shí)聊天和多媒體流等。
-
瀏覽器支持:WebSocket在現(xiàn)代瀏覽器中得到廣泛支持,同時(shí)也有許多服務(wù)器端和客戶端庫可用于各種編程語言和環(huán)境中。
服務(wù)端推送事件消息
服務(wù)端判斷前端要建立 SSE 連接的識(shí)別標(biāo)識(shí)是 Accept: text/event-stream,該標(biāo)識(shí)代表前端會(huì)接收后端發(fā)過來的事件流
GET /sse-endpoint HTTP/1.1
Host: example.com
Accept: text/event-stream
連接后因?yàn)橄⒅辽賲^(qū)分兩種,message 和 close,所以就把推送消息做成了一種結(jié)構(gòu):
event: message
data: Hello, this is a message!
?如果是 close 事件
event: close
data:
前端接收關(guān)閉事件
const eventSource = new EventSource('/sse-endpoint');
eventSource.onmessage = function(event) {
// 處理事件消息
};
eventSource.onerror = function(event) {
if (event.readyState === EventSource.CLOSED) {
// 服務(wù)器關(guān)閉了連接
console.log('SSE連接已關(guān)閉');
// 可以嘗試重新建立連接
}
};
正式開始
OpenAI 官方給我了一個(gè)超簡單的文檔,還直接用curl的方式搞得,真是能多省就多省,大家可以使用apifox 或者 postman 將curl 轉(zhuǎn)成 fetch 或者 request 等自己能看懂的代碼,當(dāng)然也可以自己自學(xué)一下curl的命令,如果你能訪問OpenAI,可以點(diǎn)下面的鏈接,自己看看
https://platform.openai.com/docs/api-reference/chat/createhttps://platform.openai.com/docs/api-reference/chat/create
?大家如果對(duì)上面的雙語翻譯感興趣,我推薦一個(gè)技術(shù)大佬的免費(fèi)插件,沉浸式翻譯
https://chrome.google.com/webstore/detail/immersive-translate/bpoadfkcbjbfhfodiogcnhhhpibjhbnhhttps://chrome.google.com/webstore/detail/immersive-translate/bpoadfkcbjbfhfodiogcnhhhpibjhbnh
?其中有個(gè) stream 使用講解,stream這個(gè)東西,我之前也沒用過,經(jīng)過學(xué)習(xí)后,發(fā)現(xiàn)這東西一直都存在就是一個(gè)content-type格式,只是我們?cè)瓉頉]有注意過,我們都是用urlencode或者json格式來處理數(shù)據(jù)的,其實(shí)可以以二進(jìn)制的方式,發(fā)過來,然后你再自行處理。
我發(fā)現(xiàn)了一個(gè)大佬,開源了一個(gè)插件,從中窺見了SSE的使用案例,大家有興趣,可以看另外一篇SSE的學(xué)習(xí)案例,這里不對(duì)前端再做深入的討論了
ChatGPT API SSE(服務(wù)器推送技術(shù))和 Fetch 請(qǐng)求 Accept: text/event-stream 標(biāo)頭案例_森葉的博客-CSDN博客在需要接收服務(wù)器實(shí)時(shí)推送的數(shù)據(jù)時(shí),我們可以使用 `fetch()` 方法和 `EventSource` API 進(jìn)行處理。使用 `fetch()` 方法并在請(qǐng)求頭中添加 `Accept: text/event-stream` 可以告訴服務(wù)器我們想要接收 Server-Sent Events (SSE) 格式的數(shù)據(jù)流。`fetch()` 對(duì)流處理有良好的支持,我們可以使用 `body` 屬性來讀取 SSE 消息,同時(shí)也可以利用 `fetch()` 的其他功能如超時(shí)控制、請(qǐng)求重試等。缺點(diǎn)是需要手動(dòng)解析數(shù)據(jù)、https://blog.csdn.net/wangsenling/article/details/130490769Python 端官方提供了openai 庫,這個(gè)也是開源的,大家可以找到看看
GitHub - openai/openai-python: The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language.The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language. - GitHub - openai/openai-python: The OpenAI Python library provides convenient access to the OpenAI API from applications written in the Python language.https://github.com/openai/openai-python/我沒怎么看,但看起來沒有給stream案例,只是給了request的案例,如果只是request的那其實(shí)就挺簡單了,就沒啥講的了
SSE 的用途
- ?通過 SSE,建立鏈接后,可以將推送消息放進(jìn) redis hash 中,每個(gè)連接對(duì)象定時(shí)掃描自己的 hash 看是否有信息需要推送給前端;
- 利用異步請(qǐng)求 aiohttp 與 chatgpt 建立連接,通過while+生成器+Future的方式實(shí)現(xiàn)異步數(shù)據(jù)的處理。
不用官網(wǎng)的openai庫,根據(jù)開發(fā)文檔,直接發(fā)送request請(qǐng)求也可以,這里給的是一位大佬的請(qǐng)求方式,用的是httpx,大家可以自行學(xué)習(xí)下,知乎有一篇比較文
淺度測評(píng):requests、aiohttp、httpx 我應(yīng)該用哪一個(gè)? - 知乎在 Python 眾多的 HTTP 客戶端中,最有名的莫過于 requests、aiohttp和httpx。在不借助其他第三方庫的情況下,requests只能發(fā)送同步請(qǐng)求;aiohttp只能發(fā)送異步請(qǐng)求;httpx既能發(fā)送同步請(qǐng)求,又能發(fā)送異步請(qǐng)求。所…https://zhuanlan.zhihu.com/p/103711201
核心參數(shù)截圖
?請(qǐng)求主體代碼截圖
?AI Claude 給的講解
?
?這是一個(gè)生成器函數(shù),通過yield函數(shù),yield 很多地方都講得很晦澀難懂,《你不知道的javascript》中非常簡潔地說,這就是一個(gè)return,只是對(duì)于生成器來說,return次數(shù)要進(jìn)行多次,所以搞了一個(gè)yield用來區(qū)分同步函數(shù)的return,而且return的意義還有停止下面的代碼,而返回?cái)?shù)據(jù)的意思,兩者還是有點(diǎn)差異,但是yield就是return,多次返回的return
核心庫EventSourceResponse
from sse_starlette import EventSourceResponse
?
AI給出的EventSourceResponse解讀,自己也可以把EventSourceResponse源碼丟給Claude,讓其看過,給你解讀,都是好方法
下面給下EventSourceResponse的FastAPI簡單案例代碼,讓大家玩起來
import uvicorn
import asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
times = 0
app = FastAPI()
origins = [
"*"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/sse/data")
async def root(request: Request):
event_generator = status_event_generator(request)
return EventSourceResponse(event_generator)
status_stream_delay = 1 # second
status_stream_retry_timeout = 30000 # milisecond
# 其實(shí)就是綁定函數(shù)事件 一直在跑循環(huán)
async def status_event_generator(request):
global times
while True:
if not await request.is_disconnected() == True:
yield {
"event": "message",
"retry": status_stream_retry_timeout,
"data": "data:" + "times" + str(times) + "\n\n"
}
print("alive")
times += 1
await asyncio.sleep(status_stream_delay)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level='info')
大家對(duì)照著上面的講解,就能把代碼搞出來文章來源:http://www.zghlxwxcb.cn/news/detail-480044.html
fetch('http://localhost:8000/sse/data', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
text: "hello"
})
}).then(async response=>{
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()
while (true) {
let {value, done} = await reader.read();
if (done)
break;
if (value.indexOf("data:") < 0 || value.indexOf('event: ping') >= 0)
continue;
// console.log('Received~~:', value);
let values = value.split("\r\n")
for (let i = 0; i < values.length; i++) {
let _v = values[i].replace("data:", "")
// console.log(_v)
if (_v.trim() === '')
continue
console.log(_v)
}
}
}
).catch(error=>{
console.error(error);
}
);
因?yàn)樽约旱臉I(yè)務(wù)代碼有很多鑒權(quán)和數(shù)據(jù)庫操作,就不便放出來了,大家根據(jù)自己的所需,可以在這個(gè)簡單的代碼基礎(chǔ)上,只要自己寫生成器函數(shù)即可文章來源地址http://www.zghlxwxcb.cn/news/detail-480044.html
到了這里,關(guān)于OpenAI ChatGPT API + FaskAPI SSE Stream 流式周轉(zhuǎn)技術(shù) 以及前端Fetch 流式請(qǐng)求獲取案例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!