一、SSE概念
SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,顧名思義,也就是客戶端可以獲取到服務(wù)器發(fā)送的事件。我們常見的 http 交互方式是客戶端發(fā)起請(qǐng)求,服務(wù)端響應(yīng),然后一次請(qǐng)求完畢;但是在 sse 的場(chǎng)景下,客戶端發(fā)起請(qǐng)求,連接一直保持,服務(wù)端有數(shù)據(jù)就可以返回?cái)?shù)據(jù)給客戶端,這個(gè)返回可以是多次間隔的方式
二、SSE應(yīng)用場(chǎng)景
在web端消息推送功能中,由于傳統(tǒng)的HTTP協(xié)議是由客戶端主動(dòng)發(fā)起請(qǐng)求,服務(wù)端才會(huì)響應(yīng)?;镜腶jax輪詢技術(shù)便是如此。而在SSE中,瀏覽發(fā)送一個(gè)請(qǐng)求給服務(wù)端,通過(guò)響應(yīng)頭中的Content-Type:text/event-stream等向客戶端聲名這是一個(gè)長(zhǎng)連接,發(fā)送的是流數(shù)據(jù),這樣客戶端就不會(huì)關(guān)閉連接,一直等待服務(wù)端發(fā)送數(shù)據(jù)。
如果服務(wù)器返回的數(shù)據(jù)中包含了事件標(biāo)識(shí)符,瀏覽器會(huì)記錄最后一次接收的事件的標(biāo)識(shí)符。如果與服務(wù)器的連接中斷,當(dāng)瀏覽器再次進(jìn)行連接時(shí),會(huì)通過(guò)頭來(lái)聲明最后一次接收的事件的標(biāo)識(shí)符。服務(wù)器端可以通過(guò)瀏覽器發(fā)送的事件標(biāo)識(shí)符來(lái)確定從哪個(gè)事件來(lái)繼續(xù)連接
三、前端使用方法、問(wèn)題
1、get方式
使用eventsource完成get請(qǐng)求
缺點(diǎn):客戶端無(wú)法通過(guò)一個(gè)get請(qǐng)求完成數(shù)據(jù)傳遞
參考文檔:
EventSource - Web API 接口參考 | MDN
實(shí)現(xiàn)流程:
- 后端提供了兩個(gè)接口,一個(gè)是:post,用以完成前端信息的傳遞,我這邊是做大語(yǔ)言模型的,所以包括了模型必要參數(shù)、問(wèn)題等;二、get接口,完成流式輸出的接口,配置相應(yīng)的具名事件、請(qǐng)求頭等
- 前端通過(guò)調(diào)用post接口拿到本次會(huì)話id,將id攜帶在get請(qǐng)求里,完成信息傳遞
- 前端處理SSE流式返回
代碼實(shí)現(xiàn):
const eventSourceRef = useRef<any>(null)
const contact = async (messageData: any) => {
eventSourceRef.current = new EventSource(
`${API_BASE}/v1/model/stream?id=${id}`,
)
if (!eventSourceRef.current) return
// 監(jiān)聽 SSE 事件,因?yàn)楹蠖硕x了具名事件,所以這兒要用addEventListener監(jiān)聽,而不是onmessage
eventSourceRef.current.addEventListener('add', function (e: any) {
// 處理數(shù)據(jù)展示
})
eventSourceRef.current.addEventListener('finish', function (e: any) {
// 結(jié)束標(biāo)識(shí)finish
eventSourceRef.current.close() // 關(guān)閉連接
})
eventSourceRef.current.addEventListener('error', function (e: any) {
if (e.status === 401) {
// 用戶登錄狀態(tài)失效處理
}
// error報(bào)錯(cuò)處理
console.log('Error occurred:', e)
// 關(guān)閉連接
eventSourceRef.current.close()
})
}
2、post方式
使用fetch-event-source完成連接,僅需一個(gè)接口,支持添加請(qǐng)求頭
缺點(diǎn):在瀏覽器返回的text/eventstream里看不到具體返回,無(wú)法進(jìn)行預(yù)覽
參考文檔:
@microsoft/fetch-event-source
實(shí)現(xiàn)流程:
- 后端提供一個(gè)接口,支持前端傳參、流式返回
- 前端通過(guò)fetch-event-source,完成傳參、請(qǐng)求頭添加等
- 處理返回?cái)?shù)據(jù)
具體實(shí)現(xiàn):
const eventSourceRef = useRef<any>(null)
const [abortController, setAbortController] = useState(new AbortController())
// 通信事件
const contact = async (messageData: any) => {
messageData = { ...messageData, do_stream: modelArg.do_stream } // 請(qǐng)求參數(shù)
receivedDataRef.current = ''
const token: string = getLocal('AUTHCODE') || ''
fetchEventSource(`${MAAS_API_BASE}/v1/model_api/invoke`, {
method: 'POST',
// 添加請(qǐng)求頭
headers: {
Authorization: token,
'Content-Type': 'application/json',
},
// 傳參必須保證是json
body: JSON.stringify(messageData),
// abortController.signal 提供了一個(gè)信號(hào)對(duì)象給 fetchEventSource 函數(shù)。
// 如果在任何時(shí)候你想取消正在進(jìn)行的 fetch 操作,你可以調(diào)用
// abortController.abort()。這會(huì)發(fā)出關(guān)聯(lián)任務(wù)的信號(hào),你可以使用
// AbortController 的信號(hào)來(lái)檢查異步操作是否已被取消。
signal: abortController.signal,
openWhenHidden: true, // 切換標(biāo)簽頁(yè)時(shí)連接不關(guān)閉
async onopen(resp) {
// 處理登錄失效
if (resp.status === 401) {
message.warning('登錄過(guò)期')
return
}
},
onmessage(msg: any) {
const eventType = msg.event // 監(jiān)聽event的具名事件
switch (eventType) {
case 'add':
// 流式輸出事件,add每次會(huì)返回具體字符,前端負(fù)責(zé)拼接展示
break
case 'finish':
setStatu('finish') // 結(jié)束標(biāo)識(shí)
break
case 'error':
if (msg.status === 401) {
message.warning('登錄過(guò)期')
}
console.log('Error occurred:', e)
break
}
},
onerror(err) {
throw err // 連接遇到http錯(cuò)誤時(shí),如跨域等,必須要throw才能停止,不然會(huì)一直重連
},
onclose() {},
})
}
// 終止連接方法,比如在切換模型時(shí),你可能有必要終止上一次連接來(lái)避免問(wèn)答串聯(lián)
const closeSSE = () => {
abortController.abort()
setAbortController(new AbortController())
}
3、一種接口同時(shí)兼容流式/非流式
同上post方法
fetchEventSource(sseUrl, {
method: 'POST',
headers,
signal: abortController.signal,
body: JSON.stringify(customInferData),
openWhenHidden: true,
/**
*在onopen階段處理
第一步:判斷resp.headers.get('content-type'),如果不包含text/event-stream,
則代表非流式
第二步:需要在onopen階段處理非流式返回,即json返回,讀取json返回并渲染,注意異常也要處理
第三步:
*/
async onopen(resp) {
const contentype = resp.headers.get('content-type') || ''
console.log('contentype =>', contentype)
console.log('resp.ok =>', resp.ok)
if (resp.ok && !contentype.includes('text/event-stream')) {
// 讀取json數(shù)據(jù)
const responseData = await resp.json()
if (responseData.code !== 0) {
// 報(bào)錯(cuò)處理+關(guān)閉連接
} else {
//處理數(shù)據(jù)渲染+關(guān)閉連接
stopSession()
}
} else if (resp.status === 401) {
message.warning('登錄過(guò)期')
// 報(bào)錯(cuò)處理+關(guān)閉連接
stopSession()
}
},
onmessage(msg: any) {
const eventType = msg.event
const messages: any = cloneDeep(chatState.sessionMessages)
let lastMessage: any = messages[messages.length - 1] || {}
switch (eventType) {
case 'add':
lastMessage = {
...lastMessage,
text: `${lastMessage.text}${msg.data || ' '}`,
loading: false,
}
messages.splice(messages.length - 1, 1, lastMessage)
chatAction.updateSessionMessages(messages)
break
case 'finish':
console.log('finish lastMessage =>', lastMessage)
chatAction.updateSessionStatu(SessionStatuTypes.ready)
chatAction.updateContext(msg.data)
break
case 'info':
{
const messages: any = cloneDeep(chatState.sessionMessages)
let lastMessage: any = messages[messages.length - 1] || {}
lastMessage = {
referenceDocs: JSON.parse(msg.data).reference_by_docs,
...lastMessage,
}
messages.splice(messages.length - 1, 1, lastMessage)
chatAction.updateSessionMessages(messages)
}
break
case 'error':
if (msg.status === 401) {
chatAction.updateSessionStatu(SessionStatuTypes.ready)
} else {
errorItemFn(msg?.msg || msg?.data || '抱歉,暫無(wú)法回答問(wèn)題')
}
break
}
},
onerror(err: any) {
errorItemFn(err?.msg || '抱歉,暫無(wú)法回答該問(wèn)題')
console.log('eventSource error: ', `${err}`)
throw err // 連接遇到http錯(cuò)誤時(shí),如跨域等,必須要throw才能停止,不然會(huì)一直重連
},
onclose() {
console.log('eventSource close')
},
})
// 終止會(huì)話
const stopSession = () => {
abortController.abort()
setAbortController(new AbortController())
}
四、常見問(wèn)題匯總
1、無(wú)法添加請(qǐng)求頭
應(yīng)用fetch-event-source解決
2、一個(gè)方法需要同時(shí)兼容流式和非流式
應(yīng)用fetch-event-source在onopen階段處理非流式輸出,如報(bào)錯(cuò)、接口json返回等文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-832115.html
3、遇到跨域時(shí)候,請(qǐng)求一直連接
應(yīng)用fetch-event-source在監(jiān)聽具名事件時(shí),如error,將錯(cuò)誤throw err,否則無(wú)法中斷連接文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-832115.html
4、fetch方法如何終止
const stopSession = () => {
abortController.abort()
setAbortController(new AbortController())
}
到了這里,關(guān)于SSE實(shí)現(xiàn)消息實(shí)時(shí)推送,前端漸進(jìn)式學(xué)習(xí)、實(shí)踐,真香的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!