響應(yīng)式編程(Reactive Programming)是一種編程范式,旨在處理異步數(shù)據(jù)流和事件流。它通過(guò)使用觀察者模式和函數(shù)式編程的概念,將數(shù)據(jù)流和事件流抽象為可觀察的序列,然后通過(guò)操作這些序列來(lái)實(shí)現(xiàn)各種功能。
在響應(yīng)式編程中,數(shù)據(jù)流和事件流被視為連續(xù)的時(shí)間序列,可以通過(guò)操作符來(lái)轉(zhuǎn)換、過(guò)濾和組合它們。這種編程范式的主要優(yōu)勢(shì)是它可以簡(jiǎn)化異步編程,并提供一種聲明式的方式來(lái)處理數(shù)據(jù)流和事件流。它還可以提高代碼的可讀性和可維護(hù)性,因?yàn)樗鼘?fù)雜的異步邏輯封裝在操作符中,使得代碼更易于理解和修改。
響應(yīng)式編程可以應(yīng)用于各種領(lǐng)域,包括前端開發(fā)、后端開發(fā)、移動(dòng)開發(fā)等。在前端開發(fā)中,響應(yīng)式編程可以用于處理用戶界面的事件流和數(shù)據(jù)流,使得界面能夠動(dòng)態(tài)地響應(yīng)用戶的操作。在后端開發(fā)中,響應(yīng)式編程可以用于處理大量的異步請(qǐng)求和數(shù)據(jù)流,提高系統(tǒng)的吞吐量和響應(yīng)速度。
常見的響應(yīng)式編程框架包括RxJava、RxJS、ReactiveX等。這些框架提供了一系列的操作符和工具,用于處理數(shù)據(jù)流和事件流,并提供了一種簡(jiǎn)潔而強(qiáng)大的方式來(lái)處理異步編程。
當(dāng)前響應(yīng)式編程的典型例子莫過(guò)于最近炙手可熱的ChatGPT的流式輸出了。因?yàn)镃hatGPT請(qǐng)求響應(yīng)時(shí)間較長(zhǎng),如果采用傳統(tǒng)的一直等待全部數(shù)據(jù)就緒,用戶恐怕早就跑光了,而響應(yīng)式方式則不需要等待所有數(shù)據(jù)就緒,而只需要有部分?jǐn)?shù)據(jù)就緒即可輸出,從而極大地提升了用戶體驗(yàn)。下面以此為例,來(lái)說(shuō)明實(shí)現(xiàn)這種效果的原理(開發(fā)語(yǔ)言Java)。
先來(lái)看看上文中提到的的三個(gè)響應(yīng)式編程框架:RxJava、RxJS和ReactiveX。它們是三個(gè)相關(guān)的概念,同時(shí)也是不同平臺(tái)上的實(shí)現(xiàn)。
- RxJava:RxJava是ReactiveX在Java平臺(tái)上的實(shí)現(xiàn),它提供了一套豐富的API和操作符,用于處理異步和事件驅(qū)動(dòng)的編程。RxJava是基于觀察者模式和迭代器模式的,可以用于處理數(shù)據(jù)流、事件流和異步任務(wù)等。
- RxJS:RxJS是ReactiveX在JavaScript平臺(tái)上的實(shí)現(xiàn),它提供了類似于RxJava的API和操作符,用于處理異步和事件驅(qū)動(dòng)的編程。RxJS可以在瀏覽器端和Node.js環(huán)境中使用,可以處理DOM事件、AJAX請(qǐng)求、定時(shí)器等。
- ReactiveX:ReactiveX是一個(gè)跨平臺(tái)的響應(yīng)式編程庫(kù),它提供了一套統(tǒng)一的API和操作符,用于處理異步和事件驅(qū)動(dòng)的編程。ReactiveX的目標(biāo)是提供一種通用的編程模型,使得開發(fā)者可以在不同的平臺(tái)和語(yǔ)言中共享代碼和思想。
在Springboot中,另有WebFlux模塊可供使用,同時(shí)它也可以跟上面的模塊一起使用。說(shuō)起Flux,這里也會(huì)涉及到另一個(gè)概念:Flowable。其實(shí)Flowable和Flux都是響應(yīng)式流的實(shí)現(xiàn),它們有以下關(guān)系:
- Flowable是RxJava的一部分,而Flux是Reactor的一部分。RxJava是一個(gè)用于Java的響應(yīng)式編程庫(kù),而Reactor是一個(gè)用于Java的響應(yīng)式編程框架。
- Flowable是RxJava中的一個(gè)類,它實(shí)現(xiàn)了Reactive-Streams規(guī)范,提供了對(duì)背壓(backpressure)的支持。Flowable可以處理異步和并發(fā)的數(shù)據(jù)流,并且可以控制數(shù)據(jù)流的速率,以避免生產(chǎn)者和消費(fèi)者之間的不匹配。
- Flux是Reactor中的一個(gè)類,它也實(shí)現(xiàn)了Reactive-Streams規(guī)范,提供了類似的功能。Flux可以處理異步和并發(fā)的數(shù)據(jù)流,并且可以控制數(shù)據(jù)流的速率。
- Flowable和Flux都提供了一系列的操作符,可以對(duì)數(shù)據(jù)流進(jìn)行轉(zhuǎn)換、過(guò)濾、映射等操作。這些操作符可以幫助開發(fā)者處理和操作數(shù)據(jù)流,使代碼更加簡(jiǎn)潔和可讀。
跟tRxJava和Reactor密切相關(guān)的開發(fā)庫(kù)之一是WebClien。WebClient是一個(gè)用于發(fā)送HTTP請(qǐng)求的非阻塞的響應(yīng)式客戶端,它是Reactor項(xiàng)目的一部分。
WebClient提供了一種簡(jiǎn)潔、靈活和可組合的方式來(lái)發(fā)送HTTP請(qǐng)求,并處理響應(yīng)。它可以與RxJava和Reactor的異步和響應(yīng)式編程模型無(wú)縫集成,使得在響應(yīng)式應(yīng)用程序中處理HTTP請(qǐng)求變得更加方便和高效。
WebClient可以與RxJava的Flowable一起使用,通過(guò)toFlowable()方法將響應(yīng)轉(zhuǎn)換為Flowable流,從而實(shí)現(xiàn)對(duì)響應(yīng)的處理和操作。
WebClient webClient = WebClient.create();
Flowable<String> response = webClient.get()
.uri("https://example.com")
.retrieve()
.bodyToFlux(String.class)
.toFlowable();
同樣,WebClient也可以與Reactor的Flux一起使用,通過(guò)bodyToFlux()方法將響應(yīng)轉(zhuǎn)換為Flux流,從而實(shí)現(xiàn)對(duì)響應(yīng)的處理和操作。
WebClient webClient = WebClient.create();
Flux<String> response = webClient.get()
.uri("https://example.com")
.retrieve()
.bodyToFlux(String.class);
下面我們將關(guān)注點(diǎn)放在Reactor框架中,在Reactor中,不得不提另一個(gè)跟Flux相對(duì)的概念:Mono。Flux和Mono是Reactor框架中的兩個(gè)關(guān)鍵類,它們都是用于處理響應(yīng)式流的。
- Flux是一個(gè)表示0到N個(gè)元素的響應(yīng)式流。它可以發(fā)出多個(gè)元素,并以異步的方式產(chǎn)生這些元素。Flux可以用于處理多個(gè)值的數(shù)據(jù)流,例如從數(shù)據(jù)庫(kù)查詢結(jié)果、文件讀取等。
- Mono是一個(gè)表示0或1個(gè)元素的響應(yīng)式流。它要么發(fā)出一個(gè)元素,要么不發(fā)出任何元素。Mono可以用于處理單個(gè)值的數(shù)據(jù)流,例如從緩存中獲取數(shù)據(jù)、獲取單個(gè)實(shí)體等。
- Flux和Mono之間有以下關(guān)系:
- Flux可以被轉(zhuǎn)換成Mono。
Flux<Integer> flux = Flux.just(1, 2, 3);
Mono<Integer> mono = flux.single();
-
- Mono可以被轉(zhuǎn)換成Flux。
Mono<Integer> mono = Mono.just(1);
Flux<Integer> flux = mono.flux();
Flux和Mono可以通過(guò)一系列的操作符進(jìn)行轉(zhuǎn)換、過(guò)濾、映射等操作,使得對(duì)響應(yīng)式流的處理變得更加靈活和方便。它們是Reactor框架中的核心類,用于構(gòu)建響應(yīng)式應(yīng)用程序。
webClient可以實(shí)現(xiàn)復(fù)雜的處理邏輯,比如異常處理:
webClient.get()
.uri(url)
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new CustomException("客戶端錯(cuò)誤")))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new CustomException("服務(wù)器錯(cuò)誤")))
.bodyToMono(String.class)
.onErrorResume(throwable -> {
if (throwable instanceof WebClientResponseException) {
WebClientResponseException ex = (WebClientResponseException) throwable;
// 處理響應(yīng)異常
} else {
// 處理其他異常
}
});
在使用 Spring Boot 的 WebClient 時(shí),bodyToMono 和 bodyToFlux 方法都可以用于將響應(yīng)體轉(zhuǎn)換為 Mono 或 Flux 對(duì)象。
bodyToMono 方法用于將響應(yīng)體轉(zhuǎn)換為 Mono 對(duì)象,適用于響應(yīng)體只有一個(gè)元素的情況,例如返回一個(gè) JSON 對(duì)象或者一個(gè)字符串。
bodyToFlux 方法用于將響應(yīng)體轉(zhuǎn)換為 Flux 對(duì)象,適用于響應(yīng)體有多個(gè)元素的情況,例如返回一個(gè) JSON 數(shù)組或者一個(gè)流式數(shù)據(jù)。
因此,當(dāng)我們需要處理的響應(yīng)體只有一個(gè)元素時(shí),應(yīng)該使用 bodyToMono 方法;當(dāng)我們需要處理的響應(yīng)體有多個(gè)元素時(shí),應(yīng)該使用 bodyToFlux 方法。
在 Reactor 中,F(xiàn)lux 流結(jié)束的實(shí)現(xiàn)原理是通過(guò)發(fā)送一個(gè) onComplete 信號(hào)來(lái)通知訂閱者流已經(jīng)結(jié)束。當(dāng) Flux 流中的所有元素都被消費(fèi)完畢時(shí),會(huì)自動(dòng)發(fā)送一個(gè) onComplete 信號(hào)。
例如,當(dāng)我們使用 Flux.range(1, 10) 創(chuàng)建一個(gè)包含 1 到 10 的整數(shù)序列的 Flux 流時(shí),當(dāng)訂閱者訂閱該流并消費(fèi)完所有元素后,會(huì)自動(dòng)發(fā)送一個(gè) onComplete 信號(hào)來(lái)通知訂閱者流已經(jīng)結(jié)束。
在使用 Spring Boot 的 WebClient 時(shí),當(dāng)我們使用 bodyToFlux 方法將響應(yīng)體轉(zhuǎn)換為 Flux 對(duì)象時(shí),如果響應(yīng)體是一個(gè)流式數(shù)據(jù),那么當(dāng)流式數(shù)據(jù)傳輸完畢后,會(huì)自動(dòng)發(fā)送一個(gè) onComplete 信號(hào)來(lái)通知訂閱者流已經(jīng)結(jié)束。
webClient.get()
.uri(url)
.retrieve()
.bodyToFlux(String.class)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
System.out.println("流已結(jié)束");
}
})
.subscribe();
有了這些基礎(chǔ)知識(shí)的準(zhǔn)備,我們?cè)賮?lái)看看ChatGPT的響應(yīng)結(jié)果樣例。OpenAI的聊天接口是:
http://api.openai.com/v1/chat/completitions。
該接口接受這樣的一個(gè)請(qǐng)求數(shù)據(jù)結(jié)構(gòu):ChatCompletionRequest。其中有個(gè)屬性stream 可以設(shè)定是否采用流輸出。默認(rèn)false。
這個(gè)例子是非stream輸出,輸出格式為:ChatCompletionResponse
$ curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"false","messages": [{"role": "user", "content": "Hello!"}]}'
{"id":"chatcmpl-7tywVQ4vSPzs8yuZy5FqvL0CX07W0","object":"chat.completion","created":1693576659,"model":"gpt-3.5-turbo-0613","choices":[{"index":0,"message":{"role":"assistant","content":"Hello
這個(gè)例子是stream輸出,輸出結(jié)構(gòu)體為:字符串格式的ChatCompletionResponse:
curl https://api.openai.com/v1/chat/completions -H 'Content-Type: application/json' -H "Authorization: Bearer sk-zDxkX0Na0e63B18c9c6bT3BlBkFJf3De387b398749c5bD1d" -d '{"model": "gpt-3.5-turbo","stream":"true","messages": [{"role": "user", "content": "Hello!"}]}'
比較stream和非stream的輸出區(qū)別,有一下幾點(diǎn):
1.非stream 輸出只有一條記錄;stream 有若干條,取決于響應(yīng)內(nèi)容大??;
2. 非stream 輸出包含消耗的tokens數(shù)量,stream 沒(méi)有;
3. 非stream 輸出結(jié)果是json格式的ChatCompletionResponse結(jié)構(gòu),stream 輸出j格式類似:data:str(ChatCompletionResponse),同時(shí)以data:[NONE]結(jié)尾;
結(jié)合上面的知識(shí),我們就能實(shí)現(xiàn)上述功能:
public Publisher<String> generateChatCompletion(ChatCompletionRequest chatCompletionRequest) {
WebClient.ResponseSpec responseSpec = webClient.post()
.uri(this.apiUrl + "/chat/completions").header("Authorization", "Bearer " + this.apiKey)// .accept(MediaType.TEXT_EVENT_STREAM)
.bodyValue(chatCompletionRequest)
.retrieve();
if (chatCompletionRequest.getStream())
return
responseSpec.bodyToFlux(ChatCompletionResponse.class)
.onErrorResume(error -> {
// 異常處理邏輯
logger.error("bodyToFlux error: {}", error);
return Flux.empty();
})
.filter(response -> {
ChatMessage message = response.getChoices().get(0).getMessage();
if (message != null) {
String content = message.getContent();
return StringUtils.isNotEmpty(StringUtils.trim(content));
}
return false;
})
.mapNotNull(response -> {
try {
return objectMapper.writeValueAsString(response);
} catch (JsonProcessingException e) {
logger.error(e);
return null;
}
}).concatWithValues("[DONE]");
else
return
responseSpec.bodyToMono(ChatCompletionResponse.class)
.onErrorResume(error -> {
// 異常處理邏輯
logger.error("bodyToMono error: {}", error);
return Mono.empty();
}).mapNotNull(response -> {
try {
return objectMapper.writeValueAsString(response);
} catch (JsonProcessingException e) {
logger.error(e);
return null;
}
});
}
Publisher是一個(gè)通用的概念,它代表一個(gè)發(fā)布者,可以發(fā)布數(shù)據(jù)或事件。在Spring WebFlux中,F(xiàn)lux和Mono都是Publisher的實(shí)現(xiàn)類。文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-693509.html
試用地址:https://chatgpt-discount.zeabur.app文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-693509.html
到了這里,關(guān)于從零開始搭建AI網(wǎng)站(6):如何使用響應(yīng)式編程的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!