WebSocket協(xié)議定義了兩種類型的消息(文本和二進(jìn)制),但其內(nèi)容未作定義。該協(xié)議定義了一種機(jī)制,供客戶端和服務(wù)器協(xié)商在WebSocket之上使用的子協(xié)議(即更高級別的消息傳遞協(xié)議),以定義各自可以發(fā)送何種消息、格式是什么、每個消息的內(nèi)容等等。子協(xié)議的使用是可選的,但無論如何,客戶端和服務(wù)器都需要就定義消息內(nèi)容的一些協(xié)議達(dá)成一致。
一、概覽
STOMP(Simple Text Oriented Messaging Protocol)最初是為腳本語言(如Ruby、Python和Perl)創(chuàng)建的,用于連接到企業(yè) message broker。它被設(shè)計用來解決常用信息傳遞模式的一個最小子集。STOMP可以通過任何可靠的雙向流媒體網(wǎng)絡(luò)協(xié)議使用,如TCP和WebSocket。盡管STOMP是一個面向文本的協(xié)議,但消息的 payload 可以是文本或二進(jìn)制。
STOMP是一個基于框架的協(xié)議,其框架是以HTTP為模型的。下面列出了STOMP框架的結(jié)構(gòu):
COMMAND
header1:value1
header2:value2
Body
客戶端可以使用?SEND
?或?SUBSCRIBE
?命令來發(fā)送或訂閱消息,以及描述消息內(nèi)容和誰應(yīng)該收到它的?destination
?header。這就實現(xiàn)了一個簡單的發(fā)布-訂閱機(jī)制,你可以用它來通過 broker 向其他連接的客戶端發(fā)送消息,或者向服務(wù)器發(fā)送消息以請求執(zhí)行某些工作。
當(dāng)你使用Spring的STOMP支持時,Spring WebSocket 應(yīng)用程序充當(dāng)客戶的STOMP broker。消息被路由到?@Controller
?消息處理方法或簡單的內(nèi)存中 broker,該 broker 跟蹤訂閱并將消息廣播給訂閱用戶。你也可以將Spring配置為與專門的STOMP broker(如RabbitMQ、ActiveMQ等)合作,進(jìn)行消息的實際廣播。在這種情況下,Spring維護(hù)與 broker 的TCP連接,向其轉(zhuǎn)發(fā)消息,并將消息從它那里傳遞給連接的WebSocket客戶端。因此,Spring web 應(yīng)用可以依靠統(tǒng)一的基于HTTP的 security、通用驗證和熟悉的編程模型來處理消息。
下面的例子顯示了一個客戶端訂閱接收股票報價,服務(wù)器可能會定期發(fā)布這些報價(例如,通過一個預(yù)定任務(wù),通過?SimpMessagingTemplate
?向 broker 發(fā)送消息):
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
下面的例子顯示了一個客戶端發(fā)送了一個交易請求,服務(wù)器可以通過?@MessageMapping
?方法來處理:
SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}
執(zhí)行后,服務(wù)器可以向客戶廣播交易確認(rèn)信息和細(xì)節(jié)。
在STOMP規(guī)范中,destination 的含義是故意不透明的。它可以是任何字符串,而且完全由STOMP服務(wù)器來定義它們所支持的 destination 的語義和語法。然而,destination 是非常常見的,它是類似路徑的字符串,其中?/topic/..
?意味著發(fā)布-訂閱(一對多),/queue/
?意味著點對點(一對一)的消息交換。
STOMP服務(wù)器可以使用?MESSAGE
?命令向所有訂閱者廣播信息。下面的例子顯示了一個服務(wù)器向一個訂閱的客戶發(fā)送一個股票報價:
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}
一個服務(wù)器不能發(fā)送未經(jīng)請求的消息。所有來自服務(wù)器的消息必須是對特定客戶訂閱的回應(yīng),而且服務(wù)器消息的?subscription
?header 必須與客戶端?subscription
?的?id
?header 相匹配。
前面的概述是為了提供對STOMP協(xié)議最基本的理解。我們建議閱讀協(xié)議的全部?規(guī)范。
二、?好處
使用STOMP作為子協(xié)議可以讓Spring框架和Spring Security提供更豐富的編程模型,而不是使用原始WebSockets。關(guān)于HTTP與原始TCP的對比,以及它如何讓Spring MVC和其他Web框架提供豐富的功能,也可以提出同樣的觀點。以下是一個好處清單:
-
不需要發(fā)明一個自定義的消息傳輸協(xié)議和消息格式。
-
STOMP客戶端,包括Spring框架中的一個?Java客戶端,都是可用的。
-
你可以(選擇性地)使用消息代理(如RabbitMQ、ActiveMQ和其他)來管理訂閱和廣播消息。
-
應(yīng)用邏輯可以組織在任何數(shù)量的?
@Controller
?實例中,消息可以根據(jù)STOMP destination header 被路由到它們,而不是用一個給定連接的單一?WebSocketHandler
?來處理原始WebSocket消息。 -
你可以使用 Spring Security 來保護(hù)基于 STOMP destination 和消息類型的消息。
三、 啟用 STOMP
spring-messaging
?和?spring-websocket
?模塊中提供了基于WebSocket的STOMP的支持。一旦你有了這些依賴,你就可以通過?SockJS Fallback?在WebSocket上暴露一個STOMP端點,如下面的例子所示:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic", "/queue");
}
}
/portfolio ?是WebSocket(或SockJS)客戶端需要連接以進(jìn)行WebSocket握手的端點的HTTP URL。 |
destination header 以?/app ?開頭的STOMP消息被路由到?@Controller ?類中的?@MessageMapping ?方法。 |
使用內(nèi)置的消息 broker 進(jìn)行訂閱和廣播,將 destination header 以?/topic ?或?/queue ?開頭的消息路由到 broker。 |
下面的例子顯示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
對于內(nèi)置的簡單 broker,/topic ?和?/queue ?前綴沒有任何特殊含義。它們只是一個慣例,用來區(qū)分發(fā)布/訂閱和點對點的消息傳遞(即許多訂閱者和一個消費者)。當(dāng)你使用外部 broker 時,請檢查該 broker 的STOMP頁面,以了解它支持什么樣的STOMP destination 和前綴。 |
要從瀏覽器連接,對于SockJS,你可以使用?sockjs-client。對于STOMP,許多應(yīng)用程序都使用了?jmesnil/stomp-websocket?庫(也稱為stomp.js),該庫功能完整,已在生產(chǎn)中使用多年,但已不再維護(hù)。目前,?JSteunou/webstomp-client?是該庫的最積極的維護(hù)和發(fā)展的繼承者。下面的示例代碼是基于它的:
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}
另外,如果你通過WebSocket(沒有SockJS)連接,你可以使用以下代碼:
var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}
請注意,前面的例子中的?stompClient
?不需要指定?login
?和?passcode
?header。即使它這樣做了,它們在服務(wù)器端也會被忽略(或者說,被覆蓋)。關(guān)于認(rèn)證的更多信息,請參見?連接到 Broker?和?認(rèn)證。
更多示例代碼見:
-
使用WebSocket構(gòu)建交互式wen應(yīng)用程序?—?入門指南。
-
股票證券交易?—?示例應(yīng)用。
四、 WebSocket 服務(wù)器
要配置底層的WebSocket服務(wù)器,Server 配置?中的信息適用。然而,對于Jetty,你需要通過?StompEndpointRegistry
?設(shè)置?HandshakeHandler
?和?WebSocketPolicy
:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
五、 消息流
一旦STOMP端點被暴露,Spring應(yīng)用程序就成為連接客戶端的STOMP broker。本節(jié)描述了服務(wù)器端的消息流。
spring-messaging
?模塊包含了對消息傳遞應(yīng)用的基礎(chǔ)支持,這些支持源于?Spring Integration,后來被提取并整合到Spring框架中,以便在許多?Spring projects?和應(yīng)用場景中更廣泛地使用。下面的列表簡要地描述了一些可用的消息傳遞(messaging)抽象:
-
Message: 一個消息的簡單表示,包括 header 和 payload。
-
MessageHandler: 處理消息。
-
MessageChannel: 發(fā)送消息,使生產(chǎn)者和消費者之間實現(xiàn)松散耦合。
-
SubscribableChannel: 帶有?
MessageHandler
?訂閱者的?MessageChannel
。 -
ExecutorSubscribableChannel:?
SubscribableChannel
,使用一個?Executor
?來傳遞消息。
Java 配置(即?@EnableWebSocketMessageBroker
)和 XML 命名空間配置(即?<websocket:message-broker>
)都使用前面的組件來組裝一個消息工作流。下圖顯示了啟用簡單內(nèi)置消息 broker 時使用的組件:
前面的圖顯示了三個信息通道(message channel):
-
clientInboundChannel
: 用于傳遞從WebSocket客戶端收到的消息。 -
clientOutboundChannel
: 用于向WebSocket客戶端發(fā)送服務(wù)器信息。 -
brokerChannel
: 用于從服務(wù)器端的應(yīng)用程序代碼中向消息 broker 發(fā)送消息。
下圖顯示了當(dāng)外部 broker(如 RabbitMQ)被配置為管理訂閱和廣播消息時使用的組件:
前面兩張圖的主要區(qū)別是使用 “broker relay (中繼)”,通過TCP將消息向上傳遞到外部STOMP broker,并從 broker 向下傳遞消息到訂閱的客戶。
當(dāng)從WebSocket連接中接收到消息時,它們會被解碼為STOMP幀,變成Spring?Message
?表示,并被發(fā)送到?clientInboundChannel
?進(jìn)行進(jìn)一步處理。例如,destination header 以?/app
?開頭的 STOMP 消息可以被路由到注解 controller 中的?@MessageMapping
?方法,而?/topic
?和?/queue
?消息可以直接被路由到消息 broker。
一個處理來自客戶端的STOMP消息的注解的?@Controller
?可以通過?brokerChannel
?向消息代理發(fā)送消息,而代理則通過?clientOutboundChannel
?將消息廣播給匹配的訂閱者。同樣的 controller 也可以在響應(yīng)HTTP請求時做同樣的事情,所以客戶端可以執(zhí)行一個HTTP POST,然后一個?@PostMapping
?方法可以發(fā)送一個消息給消息 broker,以廣播給訂閱的客戶端。
我們可以通過一個簡單的例子來追蹤這個流程??紤]下面這個例子,它設(shè)置了一個服務(wù)器:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting")
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}
前面的例子支持以下流程:
-
客戶端連接到?
http://localhost:8080/portfolio
,一旦建立WebSocket連接,STOMP幀就開始在上面流動。 -
客戶端發(fā)送一個?
SUBSCRIBE
?幀,destination header 為?/topic/greeting
。一旦收到并解碼,該消息被發(fā)送到?clientInboundChannel
,然后被路由到消息 broker,該 broker 存儲客戶端的訂閱。 -
客戶端發(fā)送一個?
SEND
?幀到?/app/greeting
。/app
?前綴有助于將其路由到有注解的 controller 。在?/app
?前綴被剝離后,剩余的?/greeting
?部分的 destination 被映射到?GreetingController
?的?@MessageMapping
?方法。 -
從?
GreetingController
?返回的值被轉(zhuǎn)化為Spring?Message
,其 payload 基于返回值和默認(rèn)的 destination header?/topic/greeting
(從輸入 destination 派生,用?/topic
?代替?/app
)。產(chǎn)生的消息被發(fā)送到?brokerChannel
?并由消息 broker 處理。 -
消息 broker 找到所有匹配的訂閱者,并通過客戶端?
OutboundChannel
?向每個訂閱者發(fā)送一個?MESSAGE
?幀,從那里消息被編碼為STOMP幀并在WebSocket連接上發(fā)送。
下一節(jié)將提供關(guān)于注解方法的更多細(xì)節(jié),包括支持的參數(shù)和返回值的種類。
六、 注解式Controller
應(yīng)用程序可以使用注解的?@Controller
?類來處理來自客戶端的消息。這樣的類可以聲明?@MessageMapping
、@SubscribeMapping
?和?@ExceptionHandler
?方法,如以下主題所述:
-
@MessageMapping
-
@SubscribeMapping
-
@MessageExceptionHandler
@MessageMapping
你可以使用?@MessageMapping
?來注解那些根據(jù) destination 路由消息的方法。它在方法層面和類型層面都被支持。在類型層面上,@MessageMapping
?被用來表達(dá) controller 中所有方法的共享映射。
默認(rèn)情況下,映射值是Ant風(fēng)格的路徑模式(例如?/thing*
,/thing/**
),包括對模板變量的支持(例如,/thing/{id}
)。這些值可以通過?@DestinationVariable
?方法參數(shù)進(jìn)行引用。應(yīng)用程序也可以切換到點狀分隔的目標(biāo)約定來進(jìn)行映射,正如在?使用點作為分隔符?中解釋的那樣。
支持的方法參數(shù)
下表描述了方法的參數(shù):
方法參數(shù) | 說明 |
---|---|
|
為了獲得完整的信息。 |
|
用于訪問? |
|
用于通過類型化的訪問器方法訪問 header。 |
|
用于訪問消息的 payload,由配置的? 這個注解的存在不是必須的,因為默認(rèn)情況下,如果沒有其他參數(shù)被匹配,它就會被假定。 你可以用? |
|
用于訪問一個特定的 header 值—?如果有必要,同時使用? |
|
用于訪問消息中的所有 header。這個參數(shù)必須是可以分配給? |
|
用于訪問從消息 destination 提取的模板變量。必要時,值被轉(zhuǎn)換為聲明的方法參數(shù)類型。 |
|
反映在WebSocket HTTP握手時登錄的用戶。 |
返回值
默認(rèn)情況下,@MessageMapping
?方法的返回值通過匹配的?MessageConverter
?被序列化為一個 payload,并作為一個?Message
?發(fā)送到?brokerChannel
,從那里被廣播給訂閱者。出站消息的 destination 與入站消息的 destination 相同,但前綴為?/topic
。
你可以使用?@SendTo
?和?@SendToUser
?注解來定制輸出消息的 destination。@SendTo
?是用來定制目標(biāo) destination 或指定多個 destination 的。?@SendToUser
?用來指導(dǎo)輸出消息只給與輸入消息相關(guān)的用戶。參見?User Destination。
你可以在同一個方法上同時使用?@SendTo
?和?@SendToUser
,而且在類的層面上也支持這兩種注解,在這種情況下,它們作為類中方法的默認(rèn)值。然而,請記住,任何方法級的?@SendTo
?或?@SendToUser
?注解都會覆蓋類級的任何此類注解。
消息可以被異步處理,@MessageMapping
?方法可以返回?ListenableFuture
、?CompletableFuture
?或?CompletionStage
。
請注意,@SendTo
?和?@SendToUser
?只是一種便利,相當(dāng)于使用?SimpMessagingTemplate
?來發(fā)送消息。如果有必要,對于更高級的場景,?@MessageMapping
?方法可以直接使用?SimpMessagingTemplate
。這可以代替返回值,也可以在返回值之外進(jìn)行。參見?發(fā)送消息。
@SubscribeMapping
@SubscribeMapping
?與?@MessageMapping
?類似,但只將映射范圍縮小到訂閱信息。它支持與?@MessageMapping
?相同的?方法參數(shù)。然而對于返回值,默認(rèn)情況下,消息被直接發(fā)送到客戶端(通過?clientOutboundChannel
,對訂閱的響應(yīng)),而不是發(fā)送到 broker(通過?brokerChannel
,作為廣播給匹配的訂閱)。添加?@SendTo
?或?@SendToUser
?會重寫這一行為,并代替發(fā)送至 broker。
這在什么時候是有用的?假設(shè) broker 被映射到?/topic
?和?/queue
,而應(yīng)用 controller 被映射到?/app
。在這種設(shè)置下,broker 存儲了所有對?/topic
?和?/queue
?的訂閱,這些訂閱是為了重復(fù)廣播,而應(yīng)用程序不需要參與。客戶端也可以訂閱一些?/app
?的 destination,controller 可以在不涉及 broker 的情況下返回一個值,而不需要再次存儲或使用該訂閱(實際上是一個一次性的請求-回復(fù) exchange)。這方面的一個用例是在啟動時用初始數(shù)據(jù)填充一個用戶界面。
這在什么時候沒有用?不要試圖將 broker 和 controller 映射到同一個目標(biāo)前綴,除非你想讓兩者獨立處理消息,包括訂閱,因為某些原因。入站消息是平行處理的。不能保證一個 broker 或一個 controller 先處理一個給定的消息。如果目標(biāo)是在訂閱被存儲并準(zhǔn)備好進(jìn)行廣播時得到通知,如果服務(wù)器支持的話,客戶端應(yīng)該要求得到一個 receipt (簡單的 broker 不支持)。例如,使用Java?STOMP client,你可以做以下事情來添加一個 receipt:
@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// During initialization..
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// When subscribing..
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(receiptHeaders -> {
// Subscription ready...
});
一個服務(wù)器端的選擇是在?brokerChannel
?上?注冊?一個?ExecutorChannelInterceptor
,并實現(xiàn)?afterMessageHandled
?方法,該方法在消息(包括訂閱)被處理后被調(diào)用。
@MessageExceptionHandler
一個應(yīng)用程序可以使用?@MessageExceptionHandler
?方法來處理來自?@MessageMapping
?方法的異常。你可以在注解本身中聲明異常,如果你想獲得對異常實例的訪問,也可以通過方法參數(shù)來聲明。下面的例子通過一個方法參數(shù)聲明了一個異常:
@Controller
public class MyController {
// ...
@MessageExceptionHandler
public ApplicationError handleException(MyException exception) {
// ...
return appError;
}
}
@MessageExceptionHandler
?方法支持靈活的方法簽名,支持與?@MessageMapping?方法相同的方法參數(shù)類型和返回值。
通常情況下,@MessageExceptionHandler
?方法適用于它們所聲明的?@Controller
?類(或類層次結(jié)構(gòu))。如果你想讓這些方法在全局范圍內(nèi)應(yīng)用(跨控制器),你可以在一個標(biāo)有?@ControllerAdvice
?的類中聲明它們。這與Spring MVC中的類似?支持相當(dāng)。
七、 發(fā)送消息
如果你想從應(yīng)用程序的任何部分向連接的客戶端發(fā)送消息,怎么辦?任何應(yīng)用程序組件都可以向?brokerChannel
?發(fā)送消息。最簡單的方法是注入一個?SimpMessagingTemplate
?并使用它來發(fā)送消息。通常情況下,你會按類型注入它,如下例所示:
@Controller
public class GreetingController {
private SimpMessagingTemplate template;
@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}
}
然而,你也可以通過它的名字(brokerMessagingTemplate
)來限定它,如果存在另一個相同類型的bean。
八、Simple Broker
內(nèi)置的簡單 message broker 處理來自客戶端的訂閱請求,將其存儲在內(nèi)存中,并將消息廣播給有匹配目的地的連接客戶端。broker 支持類似路徑的 destination,包括對 Ant 風(fēng)格的 destination 模式的訂閱。
應(yīng)用程序也可以使用點分割的(而不是斜線分割的)destination。參見?使用點作為分隔符。 |
如果配置了一個任務(wù)調(diào)度器(task scheduler),simple broker 就支持?STOMP 心跳。要配置一個調(diào)度器,你可以聲明你自己的?TaskScheduler
?Bean,并通過?MessageBrokerRegistry
?來設(shè)置它?;蛘撸憧梢允褂迷趦?nèi)置 WebSocket 配置中自動聲明的那個,但是,你將’需要?@Lazy
?以避免內(nèi)置 WebSocket 配置和你的?WebSocketMessageBrokerConfigurer
?之間的循環(huán)。例如:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(@Lazy TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000})
.setTaskScheduler(this.messageBrokerTaskScheduler);
// ...
}
}
九、 外部 Broker
簡單的 broker 很適合入門,但它只支持STOMP命令的一個子集(它不支持acks、receipts和其他一些功能),依賴于一個簡單的消息發(fā)送循環(huán),并且不適合集群化。作為一種選擇,你可以升級你的應(yīng)用程序,以使用全功能的消息代理。
請參閱你所選擇的消息代理(如?RabbitMQ、?ActiveMQ?等)的 STOMP 文檔,安裝該代理,并在啟用 STOMP 支持的情況下運行它。然后你可以在Spring配置中啟用STOMP broker 中繼(而不是簡單的 broker)。
下面的配置示例啟用了一個全功能的 broker:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
}
下面的例子顯示了前述例子的XML配置等效:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio" />
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
</beans>
前面配置中的STOMP broker 中繼器是一個Spring?MessageHandler,它通過將消息轉(zhuǎn)發(fā)給外部 message broker 來處理消息。為此,它建立了與 broker 的TCP連接,將所有消息轉(zhuǎn)發(fā)給 broker ,然后將從 broker 那里收到的所有消息通過WebSocket會話轉(zhuǎn)發(fā)給客戶。從本質(zhì)上講,它充當(dāng)了一個 "中轉(zhuǎn)站",在兩個方向上轉(zhuǎn)發(fā)消息。
將?io.projectreactor.netty:reactor-netty ?和?io.netty:netty-all ?依賴項添加到你的項目中,用于 TCP 連接管理。 |
此外,應(yīng)用組件(如HTTP請求處理方法、業(yè)務(wù)服務(wù)等)也可以向 broker 中繼發(fā)送消息,如?發(fā)送消息?中所述,以向訂閱的WebSocket客戶端廣播消息。
實際上,broker 中繼實現(xiàn)了穩(wěn)健和可擴(kuò)展的消息廣播。
十、連接到 Broker
STOMP broker 中轉(zhuǎn)站保持著一個與 broker 的單一 "系統(tǒng)" TCP連接。這個連接只用于從服務(wù)器端應(yīng)用程序發(fā)出的消息,不用于接收消息。你可以為這個連接配置STOMP憑證(即STOMP框架?login
?和?passcode
?header)。這在XML命名空間和Java配置中都被暴露為?systemLogin
?和?systemPasscode
?屬性,默認(rèn)值為?guest
?和?guest
。
STOMP broker 中繼器還為每個連接的 WebSocket 客戶端創(chuàng)建一個單獨的TCP連接。你可以配置 STOMP 憑證,這些憑證用于代表客戶創(chuàng)建的所有TCP連接。這在XML命名空間和Java配置中都被暴露為?clientLogin
?和?clientPasscode
?屬性,默認(rèn)值為?guest
?和?guest
。
STOMP broker 中繼總是在它代表客戶轉(zhuǎn)發(fā)給 broker 的每個?CONNECT ?幀上設(shè)置?login ?和?passcode ?header。因此,WebSocket客戶端不需要設(shè)置這些 header 信息。它們會被忽略。正如?認(rèn)證?部分所解釋的,WebSocket客戶端應(yīng)依靠HTTP認(rèn)證來保護(hù)WebSocket端點并建立客戶端身份。 |
STOMP broker 中繼也通過 "系統(tǒng)" TCP連接向 message broker 發(fā)送和接收心跳。你可以配置發(fā)送和接收心跳的時間間隔(默認(rèn)為10秒)。如果與 broker 的連接丟失,broker 中繼會繼續(xù)嘗試重新連接,每5秒一次,直到成功。
任何Spring Bean都可以實現(xiàn)?ApplicationListener<BrokerAvailabilityEvent>
?來接收與 broker 的 "系統(tǒng)" 連接丟失和重新建立時的通知。例如,當(dāng)沒有活躍的 "系統(tǒng)" 連接時,一個廣播股票報價的股票報價服務(wù)可以停止嘗試發(fā)送消息。
默認(rèn)情況下,STOMP broker 中繼器總是連接到同一主機(jī)和端口,并在失去連接時根據(jù)需要重新連接。如果你希望提供多個地址,在每次嘗試連接時,你可以配置一個地址 supplier,而不是一個固定的主機(jī)和端口。下面的例子顯示了如何做到這一點:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
你也可以用?virtualHost
?屬性來配置STOMP broker 中繼。這個屬性的值被設(shè)置為每個?CONNECT
?幀的?host
?header,可能很有用(例如,在云環(huán)境中,建立TCP連接的實際主機(jī)與提供基于云的STOMP服務(wù)的主機(jī)不同)。
十一、 使用點作為分隔符
當(dāng)消息被路由到?@MessageMapping
?方法時,它們會與?AntPathMatcher
?匹配。默認(rèn)情況下,pattern 被期望使用斜線(/
)作為分隔符。這在web應(yīng)用中是一個很好的慣例,與HTTP URLs類似。然而,如果你更習(xí)慣于信息傳遞的慣例,你可以切換到使用點(.
)作為分隔符。
下面的例子顯示了如何在Java配置中這樣做:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
下面的例子顯示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
<websocket:stomp-endpoint path="/stomp"/>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
<bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
<constructor-arg index="0" value="."/>
</bean>
</beans>
之后,controller 可以在?@MessageMapping
?方法中使用點(.
)作為分隔符,如下圖所示:
@Controller
@MessageMapping("red")
public class RedController {
@MessageMapping("blue.{green}")
public void handleGreen(@DestinationVariable String green) {
// ...
}
}
客戶端現(xiàn)在可以向?/app/red.blue.green123
?發(fā)送消息。
在前面的例子中,我們沒有改變 “broker relay” 上的前綴,因為這些完全取決于外部 message broker。請看你使用的 broker 的STOMP文檔頁面,看看它對 destination header 支持什么約定。
另一方面,“simple broker” 確實依賴于配置的?PathMatcher
,所以,如果你切換分隔符,這種變化也適用于 broker 和 broker 將消息中的 destination 與訂閱中的 pattern 相匹配的方式。
十二、認(rèn)證
每個基于WebSocket的STOMP 消息會話都從一個HTTP請求開始。這可以是一個升級到WebSocket的請求(即WebSocket握手),或者在SockJS回退的情況下,是一系列的SockJS HTTP傳輸請求。
許多Web應(yīng)用程序已經(jīng)有了認(rèn)證和授權(quán),以確保HTTP請求的安全。通常情況下,用戶通過Spring Security使用某種機(jī)制進(jìn)行認(rèn)證,如登錄頁面、HTTP基本認(rèn)證或其他方式。認(rèn)證用戶的 security context 被保存在HTTP會話中,并與同一基于cookie的會話中的后續(xù)請求相關(guān)。
因此,對于WebSocket握手或SockJS的HTTP傳輸請求,通常已經(jīng)有一個可通過?HttpServletRequest#getUserPrincipal()
?訪問的認(rèn)證用戶。Spring會自動將該用戶與為其創(chuàng)建的WebSocket或SockJS會話聯(lián)系起來,隨后通過 user header 通過該會話傳輸?shù)乃蠸TOMP消息聯(lián)系起來。
簡而言之,一個典型的web應(yīng)用程序除了已經(jīng)做的安全工作外,不需要做任何其他事情。用戶在HTTP請求層面通過 security context 進(jìn)行認(rèn)證,該 security context 通過基于cookie的HTTP會話(然后與為該用戶創(chuàng)建的WebSocket或SockJS會話相關(guān)聯(lián))進(jìn)行維護(hù),并導(dǎo)致在流經(jīng)應(yīng)用程序的每個?Message
?上印上一個 user header。
STOMP協(xié)議在?CONNECT
?幀上確實有?login
?和?passcode
?header。這些頭信息最初是為通過TCP的STOMP而設(shè)計的,并且是需要的。然而,對于通過WebSocket的STOMP,默認(rèn)情況下,Spring忽略了STOMP協(xié)議級別的 authentication header,并假定用戶已經(jīng)在HTTP傳輸級別進(jìn)行了認(rèn)證。我們期望WebSocket或SockJS會話包含認(rèn)證的用戶。
十三、 Token 認(rèn)證
Spring Security OAuth?提供了對基于令牌的安全支持,包括 JSON Web Token(JWT)。你可以在Web應(yīng)用程序中使用它作為認(rèn)證機(jī)制,包括上一節(jié)所述的基于WebSocket的STOMP交互(也就是通過基于cookie的會話來維護(hù)身份)。
同時,基于cookie的會話并不總是最合適的(例如,在不維護(hù)服務(wù)器端會話的應(yīng)用中,或者在移動應(yīng)用中,通常使用 header 信息進(jìn)行認(rèn)證)。
WebSocket協(xié)議,RFC 6455?"沒有規(guī)定服務(wù)器在WebSocket握手過程中對客戶端進(jìn)行認(rèn)證的任何特定方式"。然而,在實踐中,瀏覽器客戶端只能使用標(biāo)準(zhǔn) authentication header(即 basic HTTP authentication)或cookies,不能(例如)提供自定義 header。同樣地,SockJS的JavaScript客戶端也沒有提供與SockJS傳輸請求一起發(fā)送HTTP header 的方法。見?sockjs-client issue 196。相反,它確實允許發(fā)送查詢參數(shù),你可以用它來發(fā)送 token,但這也有自己的缺點(例如,token 可能會無意中與服務(wù)器日志中的URL一起被記錄)。
前面的限制是針對基于瀏覽器的客戶端,不適用于基于Spring Java 的 STOMP client,該客戶端確實支持通過WebSocket和SockJS請求發(fā)送 header 信息。 |
因此,希望避免使用cookies的應(yīng)用程序在HTTP協(xié)議層面上可能沒有任何好的替代認(rèn)證。與其使用cookies,他們可能更喜歡在STOMP消息協(xié)議層面用 header 進(jìn)行認(rèn)證。這樣做需要兩個簡單的步驟:
-
在連接時使用STOMP客戶端來傳遞 authentication header 信息。
-
用一個?
ChannelInterceptor
?來處理 authentication header。
下一個例子使用服務(wù)器端配置來注冊一個自定義認(rèn)證攔截器。請注意,攔截器只需要在 CONNECT?Message
?上進(jìn)行認(rèn)證和設(shè)置 user header。Spring 會記錄并保存認(rèn)證的用戶,并將其與同一會話中的后續(xù)STOMP消息相關(guān)聯(lián)。下面的例子展示了如何注冊一個自定義認(rèn)證攔截器:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Authentication user = ... ; // access authentication header(s)
accessor.setUser(user);
}
return message;
}
});
}
}
此外,請注意,當(dāng)你使用Spring Security的消息授權(quán)時,目前你需要確保認(rèn)證?ChannelInterceptor
?配置的順序在Spring Security的前面。要做到這一點,最好是在自己的?WebSocketMessageBrokerConfigurer
?實現(xiàn)中聲明自定義攔截器,并標(biāo)明?@Order(Ordered.HIGHEST_PRECEDENCE + 99)
。
十四、 授權(quán)
Spring Security提供了?WebSocket 子協(xié)議授權(quán),它使用?ChannelInterceptor
?來根據(jù)消息中的 user header 授權(quán)。此外,Spring Session還提供了?WebSocket integration,確保用戶的HTTP會話不會過期,而WebSocket會話仍在活動。
十五、 User Destination
應(yīng)用程序可以發(fā)送針對特定用戶的消息,Spring的STOMP支持為此 destination 識別以?/user/
?為前綴的 destination。例如,一個客戶端可能會訂閱?/user/queue/position-updates
?destination。UserDestinationMessageHandler
?處理這個 destination,并將其轉(zhuǎn)換為用戶會話的唯一 destination(如?/queue/position-updates-user123
)。這為訂閱一個通用命名的 destination 提供了便利,同時,確保不會與訂閱同一 destination 的其他用戶發(fā)生沖突,以便每個用戶都能收到獨特的股票位置更新。
當(dāng)使用用戶 destination 時,重要的是要配置 broker 和應(yīng)用程序的 destination 前綴,如?啟用 STOMP, 所示,否則 broker 將處理 "/user" 前綴的消息,而這些消息只應(yīng)該由?UserDestinationMessageHandler ?處理。 |
在發(fā)送方面,消息可以被發(fā)送到一個 destination,如?/user/{username}/queue/position-updates
,這又被?UserDestinationMessageHandler
?翻譯成一個或多個destination,每個與用戶相關(guān)的會話都有一個。這讓應(yīng)用程序中的任何組件都可以發(fā)送針對特定用戶的消息,而不一定要知道他們的名字和通用destination。這也是通過一個注解和一個消息模板來支持的。
一個消息處理方法可以通過?@SendToUser
?注解將消息發(fā)送給與被處理的消息相關(guān)的用戶(在類級上也支持共享一個共同的 destination),如下例所示:
@Controller
public class PortfolioController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
如果用戶有一個以上的會話,默認(rèn)情況下,所有訂閱給定 destination 的會話都是目標(biāo)。然而,有時可能需要只針對發(fā)送被處理消息的會話。你可以通過將?broadcast
?屬性設(shè)置為?false
?來做到這一點,正如下面的例子所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// raise MyBusinessException here
}
@MessageExceptionHandler
@SendToUser(destinations="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
雖然用戶 destination 通常意味著有一個經(jīng)過認(rèn)證的用戶,但這并不是嚴(yán)格的要求。不與認(rèn)證用戶相關(guān)聯(lián)的WebSocket會話可以訂閱用戶 destination。在這種情況下,?@SendToUser ?注解的行為與?broadcast=false ?時完全相同(即只針對發(fā)送被處理消息的會話)。 |
你可以從任何應(yīng)用組件向用戶 destination 發(fā)送消息,例如,通過注入由Java配置或XML命名空間創(chuàng)建的?SimpMessagingTemplate
。(如果需要用?@Qualifier
?來限定,bean的名字是?brokerMessagingTemplate
)。下面的例子顯示了如何做到這一點:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
}
當(dāng)你將用戶 destination 與外部消息 broker 一起使用時,你應(yīng)該查看關(guān)于如何管理非活動隊列的 broker 文檔,以便在用戶會話結(jié)束時,所有獨特的用戶隊列都被刪除。例如,當(dāng)你使用諸如?/exchange/amq.direct/position-updates ?等 destination 時,RabbitMQ 會創(chuàng)建自動刪除隊列。因此,在這種情況下,客戶端可以訂閱到?/user/exchange/amq.direct/position-updates 。同樣地,ActiveMQ也有清除非活動 destination 的?配置選項。 |
在一個多應(yīng)用服務(wù)器的情況下,一個用戶 destination 可能會因為用戶連接到一個不同的服務(wù)器而保持未解析。在這種情況下,你可以配置一個 destination 來廣播未解析的消息,以便其他服務(wù)器有機(jī)會嘗試。這可以通過Java配置中?MessageBrokerRegistry
?的?userDestinationBroadcast
?屬性和XML中?message-broker
?元素的?user-destination-broadcast
?屬性完成。
十六、 消息的順序
來自 broker 的消息被發(fā)布到客戶端?OutboundChannel
,從那里被寫入WebSocket會話。由于該通道由?ThreadPoolExecutor
?支持,消息在不同的線程中被處理,而客戶端收到的結(jié)果序列可能與發(fā)布的確切順序不一致。
如果這是一個問題,請啟用?setPreservePublishOrder
?標(biāo)志,如下例所示:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
}
下面的例子顯示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker preserve-publish-order="true">
<!-- ... -->
</websocket:message-broker>
</beans>
當(dāng)該標(biāo)志被設(shè)置時,同一客戶端會話中的消息會被一個一個地發(fā)布到?clientOutboundChannel
?上,這樣就保證了發(fā)布的順序。請注意,這將產(chǎn)生一個小的性能開銷,所以你應(yīng)該只在需要時才啟用它。
十七、事件
幾個?ApplicationContext
?事件(event)被發(fā)布,并可以通過實現(xiàn)Spring的?ApplicationListener
?接口來接收:
-
BrokerAvailabilityEvent
: 指示 broker 何時可用或不可用。雖然 “simple” broker 在啟動時立即可用,并在應(yīng)用程序運行時保持可用,但STOMP “broker relay” 可能會失去與全功能 broker 的連接(例如,如果 broker 被重新啟動)。broker 中繼有重新連接的邏輯,當(dāng)它回來時重新建立與 broker 的 "系統(tǒng)" 連接。因此,只要狀態(tài)從連接變?yōu)閿嚅_,反之亦然,就會發(fā)布這個事件。使用?SimpMessagingTemplate
?的組件應(yīng)該訂閱這個事件,并避免在 broker 不可用時發(fā)送消息。在任何情況下,他們應(yīng)該準(zhǔn)備好在發(fā)送消息時處理?MessageDeliveryException
。 -
SessionConnectEvent
: 當(dāng)收到一個新的 STOMP CONNECT 時發(fā)布,表示一個新的客戶端會話的開始。該事件包含代表連接的消息,包括會話ID、用戶信息(如果有的話),以及客戶端發(fā)送的任何自定義頭信息。這對于跟蹤客戶端會話是很有用的。訂閱此事件的組件可以用?SimpMessageHeaderAccessor
?或?StompMessageHeaderAccessor
?來包裝所包含的消息。 -
SessionConnectedEvent
: 在?SessionConnectEvent
?發(fā)生后不久,當(dāng) broker 發(fā)送一個 STOMP CONNECTED 幀作為對 CONNECT 的響應(yīng)時發(fā)布。在這一點上,STOMP會話可被視為完全建立。 -
SessionSubscribeEvent
: 當(dāng)收到一個新的 STOMP SUBSCRIBE 時發(fā)布。 -
SessionUnsubscribeEvent
: 當(dāng)收到一個新的 STOMP UNSUBSCRIBE 時發(fā)布。 -
SessionDisconnectEvent
: 在STOMP會話結(jié)束時發(fā)布。DISCONNECT 可能是由客戶端發(fā)送的,也可能是在WebSocket會話關(guān)閉時自動生成的。在某些情況下,該事件會在每個會話中發(fā)布一次以上。對于多個斷開事件,組件應(yīng)該是冪等的。
當(dāng)你使用一個全功能的 broker 時,如果 broker 暫時不可用,STOMP 的 “broker relay” 會自動重新連接 "系統(tǒng)" 的連接。然而,客戶端連接不會自動重新連接。假設(shè)心跳被啟用,客戶端通常會在10秒內(nèi)注意到 broker 沒有回應(yīng)??蛻舳诵枰獙崿F(xiàn)他們自己的重新連接邏輯。 |
十入、 攔截
事件?為STOMP連接的生命周期提供通知,但不是為每個客戶端消息提供通知。應(yīng)用程序也可以注冊一個?ChannelInterceptor
?來攔截任何消息和處理鏈中的任何部分。下面的例子顯示了如何攔截來自客戶端的入站消息:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}
自定義?ChannelInterceptor
?可以使用?StompHeaderAccessor
?或?SimpMessageHeaderAccessor
?來訪問消息的信息,如下圖所示:
public class MyChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getStompCommand();
// ...
return message;
}
}
應(yīng)用程序也可以實現(xiàn)?ExecutorChannelInterceptor
,它是?ChannelInterceptor
?的一個子接口,在處理消息的線程中具有回調(diào)功能。?ChannelInterceptor
?對于發(fā)送到 channel 的每個消息都會被調(diào)用一次,而?ExecutorChannelInterceptor
?則在訂閱通道消息的每個?MessageHandler
?的線程中提供鉤子(hook)。
注意,與前面描述的?SessionDisconnectEvent
?一樣,DISCONNECT 消息可以來自客戶端,也可以在WebSocket會話關(guān)閉時自動生成。在某些情況下,攔截器可以為每個會話攔截該消息一次以上。對于多個斷開連接事件,組件應(yīng)該是冪等的。
十九、 STOMP 客戶端
Spring提供了一個基于 WebSocket 的 STOMP 客戶端和一個基于 TCP 的 STOMP 客戶端。
首先,你可以創(chuàng)建并配置?WebSocketStompClient
,如下例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的例子中,你可以用?SockJsClient
?取代?StandardWebSocketClient
,因為它也是?WebSocketClient
?的一個實現(xiàn)。SockJsClient
?可以使用WebSocket或基于HTTP的傳輸作為后退。更多詳情,請參見?SockJsClient。
接下來,你可以建立一個連接并為 STOMP 會話提供一個 handler,如下例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
當(dāng)會話可以使用時,handler 會被通知,如下例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦會話建立,任何 payload 都可以被發(fā)送,并通過配置的?MessageConverter
?進(jìn)行序列化,如下例所示:
session.send("/topic/something", "payload");
你也可以對 destination 進(jìn)行訂閱。subscribe
?方法需要一個 handler 來處理訂閱上的消息,并返回一個?Subscription
?handle,你可以用它來取消訂閱。對于每個收到的消息,handler 可以指定 payload 應(yīng)該被反序列化的目標(biāo)?Object
?類型,如下面的例子所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
為了啟用STOMP心跳,你可以用一個?TaskScheduler
?來配置?WebSocketStompClient
,并可選擇自定義心跳間隔(10 秒寫不活動,會導(dǎo)致發(fā)送心跳,10 秒讀不活動,會關(guān)閉連接)。
WebSocketStompClient
?只在不活動的情況下發(fā)送心跳,即沒有其他消息被發(fā)送時。當(dāng)使用外部 broker時,這可能是一個挑戰(zhàn),因為具有非 broker destination 的消息代表活動,但實際上并沒有轉(zhuǎn)發(fā)給 broker。在這種情況下,你可以在初始化?外部 Broker?時配置一個?TaskScheduler
,以確保在只有非 broker destination 的消息被發(fā)送時,心跳也被轉(zhuǎn)發(fā)到 broker。
當(dāng)你使用?WebSocketStompClient ?進(jìn)行性能測試以模擬來自同一臺機(jī)器的數(shù)千個客戶端時,請考慮關(guān)閉心跳,因為每個連接都會安排自己的心跳任務(wù),而這對于在同一臺機(jī)器上運行的大量客戶端來說并不優(yōu)化。 |
STOMP協(xié)議還支持 receipt,客戶端必須添加一個?receipt
?頭,服務(wù)器在處理完發(fā)送或訂閱后會用一個 RECEIPT 幀來回應(yīng)。為了支持這一點,StompSession
?提供了?setAutoReceipt(boolean)
?功能,使每一個后續(xù)的發(fā)送或訂閱事件都會添加一個?receipt
?頭。另外,你也可以手動添加一個?receipt
?頭到?StompHeaders
?中。發(fā)送和訂閱都會返回一個?Receiptable
?的實例,你可以用它來注冊接收成功和失敗的回調(diào)。對于這個功能,你必須用一個?TaskScheduler
?和?receipt
?過期前的時間量(默認(rèn)為15秒)來配置客戶端。
請注意,StompSessionHandler
?本身就是一個?StompFrameHandler
,這讓它除了處理信息處理中的異常的?handleException
?回調(diào)和處理包括?ConnectionLostException
?在內(nèi)的傳輸級錯誤的?handleTransportError
?之外,還可以處理ERROR幀。
二十、WebSocket Scope
每個WebSocket會話都有一個 attributes map。該 map 作為 header 附在入站的客戶端消息上,可以從 controller 方法中訪問,如下例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handle(SimpMessageHeaderAccessor headerAccessor) {
Map<String, Object> attrs = headerAccessor.getSessionAttributes();
// ...
}
}
你可以在?websocket
?scope 內(nèi)聲明一個Spring管理的Bean。你可以將 WebSocket scope 的 Bean 注入 controller 和在?clientInboundChannel
?上注冊的任何通道攔截器中。這些通常是 singleton,生命周期比任何單獨的 WebSocket 會話長。因此,你需要為 WebSocket scope 的 Bean 使用 scope proxy 模式,如下例所示:
@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
@PostConstruct
public void init() {
// Invoked after dependencies injected
}
// ...
@PreDestroy
public void destroy() {
// Invoked when the WebSocket session ends
}
}
@Controller
public class MyController {
private final MyBean myBean;
@Autowired
public MyController(MyBean myBean) {
this.myBean = myBean;
}
@MessageMapping("/action")
public void handle() {
// this.myBean from the current WebSocket session
}
}
與任何自定義 scope 一樣,Spring在第一次從 controller 訪問 MyBean 時初始化一個新的?MyBean
?實例,并將該實例存儲在WebSocket會話屬性中。隨后會返回相同的實例,直到會話結(jié)束。如前面的例子所示,WebSocket scope 的Bean有所有Spring生命周期方法被調(diào)用。
二十一、 性能
談到性能,沒有銀彈。許多因素都會影響它,包括消息的大小和數(shù)量,應(yīng)用程序方法是否執(zhí)行需要阻塞的工作,以及外部因素(如網(wǎng)絡(luò)速度和其他問題)。本節(jié)的目的是提供一個可用配置選項的概述,以及關(guān)于如何推理擴(kuò)展的一些想法。
在一個消息傳遞的應(yīng)用程序中,消息是通過通道傳遞的,用于由線程池支持的異步執(zhí)行。配置這樣一個應(yīng)用程序需要對通道和消息流有很好的了解。因此,我們建議回顧一下?消息流。
最明顯的地方是配置支持?clientInboundChannel
?和?clientOutboundChannel
?的線程池。默認(rèn)情況下,兩者都被配置為可用處理器數(shù)量的兩倍。
如果注解方法中的消息處理主要是由CPU約束的,那么?clientInboundChannel
?的線程數(shù)量應(yīng)該保持與處理器的數(shù)量接近。如果它們所做的工作更多的是IO-bound,需要在數(shù)據(jù)庫或其他外部系統(tǒng)上阻塞或等待,那么線程池的大小可能需要增加。
一個常見的混淆點是,配置核心池大?。ɡ?,10)和最大池大?。ɡ?,20)的結(jié)果是線程池有10到20個線程。事實上,如果容量保持在? 請參閱? |
在?clientOutboundChannel
?方面,它是向WebSocket客戶端發(fā)送消息的全部內(nèi)容。如果客戶處于告速網(wǎng)絡(luò)上,線程數(shù)應(yīng)保持接近可用處理器的數(shù)量。如果他們速度慢或帶寬低,他們需要更長的時間來消費消息,給線程池帶來負(fù)擔(dān)。因此,增加線程池的大小成為必要。
雖然?clientInboundChannel
?的工作量是可以預(yù)測的—?畢竟它是基于應(yīng)用程序所做的事情—?但如何配置 "clientOutboundChannel" 則比較困難,因為它是基于應(yīng)用程序無法控制的因素。出于這個原因,有兩個額外的屬性與消息的發(fā)送有關(guān):sendTimeLimit
?和?sendBufferSizeLimit
。你可以使用這些方法來配置在向客戶端發(fā)送消息時允許發(fā)送多長時間以及可以緩沖多少數(shù)據(jù)。
一般的想法是,在任何時候,只有一個線程可以用來向客戶端發(fā)送。同時,所有額外的消息都會被緩沖,你可以使用這些屬性來決定允許發(fā)送一個消息需要多長時間,以及在此期間有多少數(shù)據(jù)可以被緩沖。關(guān)于其他重要的細(xì)節(jié),請參見javadoc和XML schema 的文檔。
下面的例子顯示了一個可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}
// ...
}
下面的例子顯示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport send-timeout="15000" send-buffer-size="524288" />
<!-- ... -->
</websocket:message-broker>
</beans>
你也可以使用前面顯示的WebSocket傳輸配置來配置傳入STOMP消息的最大允許大小。理論上,WebSocket消息的大小幾乎沒有限制。在實踐中,WebSocket服務(wù)器會施加限制—?例如,Tomcat上的8K和Jetty上的64K。出于這個原因,STOMP客戶端(如JavaScript?webstomp-client?和其他客戶端)將較大的STOMP消息以16K的邊界分割,并作為多個WebSocket消息發(fā)送,這需要服務(wù)器進(jìn)行緩沖和重新組合。
Spring的基于 WebSocket 的 STOMP 支持做到了這一點,因此應(yīng)用程序可以配置STOMP消息的最大尺寸,而不考慮WebSocket服務(wù)器特定的消息尺寸。請記住,如果有必要,WebSocket消息的大小會被自動調(diào)整,以確保它們至少可以承載16K的WebSocket消息。
下面的例子顯示了一種可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024);
}
// ...
}
下面的例子顯示了前述例子的XML等效配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport message-size="131072" />
<!-- ... -->
</websocket:message-broker>
</beans>
關(guān)于擴(kuò)展的一個重要觀點涉及到使用多個應(yīng)用程序?qū)嵗D壳?,你不能?simple broker 做到這一點。但是,當(dāng)你使用全功能 broker(如 RabbitMQ)時,每個應(yīng)用程序?qū)嵗紩B接到 broker,并且從一個應(yīng)用程序?qū)嵗龔V播的消息可以通過 broker 廣播給通過任何其他應(yīng)用程序?qū)嵗B接的 WebSocket 客戶端。
二十二、 監(jiān)控
當(dāng)你使用?@EnableWebSocketMessageBroker
?或?<websocket:message-broker>
?時,關(guān)鍵的基礎(chǔ)設(shè)施組件會自動收集統(tǒng)計信息和計數(shù)器,從而提供對應(yīng)用程序內(nèi)部狀態(tài)的重要洞察力。該配置還聲明了一個?WebSocketMessageBrokerStats
?類型的Bean,它在一個地方收集所有可用的信息,并默認(rèn)每30分鐘在?INFO
?級別記錄一次。這個Bean可以通過Spring的?MBeanExporter
?導(dǎo)出到JMX,以便在運行時查看(例如,通過JDK的?jconsole
)。下面的列表總結(jié)了可用的信息:
Client WebSocket Sessions
Current
表示當(dāng)前有多少個客戶端會話,該計數(shù)按WebSocket與HTTP流和輪詢SockJS會話進(jìn)一步細(xì)分。
Total
表示已經(jīng)建立的總會話數(shù)量。
Abnormally Closed
Connect Failures
已建立的會話,但在60秒內(nèi)沒有收到任何信息后被關(guān)閉。這通常是代理或網(wǎng)絡(luò)問題的一個跡象。
Send Limit Exceeded
會話在超過配置的發(fā)送超時或發(fā)送緩沖區(qū)限制后關(guān)閉,這可能發(fā)生在慢速客戶端上(見前一節(jié))。
Transport Errors
在發(fā)生傳輸錯誤后關(guān)閉的會話,例如未能讀取或?qū)懭隬ebSocket連接或HTTP請求或響應(yīng)。
STOMP Frames
處理的CONNECT、CONNECTED和DISCONNECT幀的總數(shù),表示有多少客戶端在STOMP層連接。請注意,當(dāng)會話被異常關(guān)閉或客戶端關(guān)閉而沒有發(fā)送DISCONNECT幀時,DISCONNECT計數(shù)可能較低。
STOMP Broker Relay
TCP Connections
表示有多少代表客戶WebSocket會話的TCP連接被建立到代理。這應(yīng)該等于客戶WebSocket會話的數(shù)量+1個額外的共享 "系統(tǒng)" 連接,用于從應(yīng)用程序內(nèi)發(fā)送消息。
STOMP Frames
代表客戶端轉(zhuǎn)發(fā)到 broker 處或從 broker 處接收的 CONNECT、CONNECTED 和 DISCONNECT 幀的總數(shù)。請注意,無論客戶WebSocket會話是如何關(guān)閉的,DISCONNECT 幀都會被發(fā)送給 broker。因此,較低的 DISCONNECT 幀計數(shù)表明 broker 正在主動關(guān)閉連接(可能是因為心跳沒有及時到達(dá),無效的輸入幀,或其他問題)。
Client Inbound Channel
來自支持?clientInboundChannel
?的線程池的統(tǒng)計數(shù)據(jù),提供了對傳入消息處理的健康狀況的洞察力。任務(wù)在這里排隊是一個跡象,表明應(yīng)用程序可能太慢,無法處理消息。如果有I/O綁定的任務(wù)(例如,緩慢的數(shù)據(jù)庫查詢,對第三方REST API的HTTP請求,等等),考慮增加線程池大小。
Client Outbound Channel
來自支持?clientOutboundChannel
?的線程池的統(tǒng)計數(shù)據(jù),提供了對向客戶廣播消息的健康狀況的洞察力。任務(wù)在這里排隊是一個跡象,表明客戶對消息的消費太慢了。解決這個問題的方法之一是增加線程池的大小,以適應(yīng)預(yù)期的并發(fā)慢速客戶端的數(shù)量。另一個辦法是減少發(fā)送超時和發(fā)送緩沖區(qū)大小的限制(見上一節(jié))。
SockJS Task Scheduler
來自SockJS任務(wù)調(diào)度器的線程池的統(tǒng)計數(shù)據(jù),用于發(fā)送心跳。注意,當(dāng)心跳在STOMP級別上協(xié)商時,SockJS的心跳被禁用。
二十三、 測試
當(dāng)你使用Spring的基于 WebSocket 的 STOMP 支持時,有兩種主要方法來測試應(yīng)用程序。第一種是編寫服務(wù)器端測試,以驗證 controller 的功能和它們注解的消息處理方法。第二種是編寫完整的端到端測試,包括運行一個客戶端和一個服務(wù)器。
這兩種方法并不相互排斥。相反,每種方法在整個測試策略中都有其位置。服務(wù)器端的測試更有針對性,更容易編寫和維護(hù)。另一方面,端到端的集成測試更完整,測試的內(nèi)容更多,但它們也更需要編寫和維護(hù)。
服務(wù)器端測試的最簡單形式是編寫 controller 單元測試。然而,這還不夠有用,因為 controller 所做的很多事情都取決于它的注解。純粹的單元測試根本無法測試這些。
理想情況下,被測試的 controller 應(yīng)該在運行時被調(diào)用,就像使用Spring MVC測試框架測試處理HTTP請求的 controller 的方法一樣—?也就是說,不運行Servlet容器,而是依靠Spring框架來調(diào)用注解的 controller。與Spring MVC測試一樣,你在這里有兩種可能的選擇,要么使用 "基于 context",要么使用 "獨立 "設(shè)置:
-
在Spring TestContext框架的幫助下加載實際的Spring配置,注入?
clientInboundChannel
?作為測試字段,并使用它來發(fā)送 controller 方法所要處理的消息。 -
手動設(shè)置調(diào)用 controller 所需的最小Spring框架基礎(chǔ)設(shè)施(即?
SimpAnnotationMethodMessageHandler
),并將 controller 的消息直接傳遞給它。
這兩種設(shè)置情況在?股票投資組合?樣本應(yīng)用程序的測試中都有展示。文章來源:http://www.zghlxwxcb.cn/news/detail-758343.html
第二種方法是創(chuàng)建端到端的集成測試。為此,你需要在嵌入式模式下運行WebSocket服務(wù)器,并作為WebSocket客戶端連接到它,發(fā)送包含STOMP框架的WebSocket消息。?股票投資組合示例應(yīng)用程序的測試?也展示了這種方法,它使用Tomcat作為嵌入式WebSocket服務(wù)器和一個簡單的STOMP客戶端進(jìn)行測試。文章來源地址http://www.zghlxwxcb.cn/news/detail-758343.html
到了這里,關(guān)于WebSocket—STOMP詳解(官方原版)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!