在進行這節(jié)的學習前,我們先來回顧一下,前面三節(jié),我們學了些什么。
第 34 講,我們介紹了如何通過 RESTful API 在交易所下單;第 35 講,我們講解了如何通過 Websocket ,來獲取交易所的 orderbook 數(shù)據(jù);第 36 講,我們介紹了如何實現(xiàn)一個策略,以及如何對策略進行歷史回測。
事實上,到這里,一個簡單的、可以運作的量化交易系統(tǒng)已經成型了。你可以對策略進行反復修改,期待能得到不錯的 PnL。但是,對于一個完善的量化交易系統(tǒng)來說,只有基本骨架還是不夠的。
在大型量化交易公司,系統(tǒng)一般是分布式運行的,各個模塊獨立在不同的機器上,然后互相連接來實現(xiàn)。即使是個人的交易系統(tǒng),在進行諸如高頻套利等算法時,也需要將執(zhí)行層布置在靠近交易所的機器節(jié)點上。
所以,從今天這節(jié)開始,我們繼續(xù)回到 Python 的技術棧,從量化交易系統(tǒng)這個角度切入,講解如何實現(xiàn)分布式系統(tǒng)之間的復雜協(xié)作。
中間件
我們先來介紹一下中間件這個概念。中間件,是將技術底層工具和應用層進行連接的組件。它要實現(xiàn)的效果則是,讓我們這些需要利用服務的工程師,不必去關心底層的具體實現(xiàn)。我們只需要拿著中間件的接口來用就好了。
這個概念聽起來并不難理解,我們再舉個例子讓你徹底明白。比如拿數(shù)據(jù)庫來說,底層數(shù)據(jù)庫有很多很多種,從關系型數(shù)據(jù)庫 MySQL 到非關系型數(shù)據(jù)庫 NoSQL,從分布式數(shù)據(jù)庫 Spanner 到內存數(shù)據(jù)庫 Redis,不同的數(shù)據(jù)庫有不同的使用場景,也有著不同的優(yōu)缺點,更有著不同的調用方式。那么中間件起什么作用呢?
中間件,等于在這些不同的數(shù)據(jù)庫上加了一層邏輯,這一層邏輯專門用來和數(shù)據(jù)庫打交道,而對外只需要暴露同一個接口即可。這樣一來,上層的程序員調用中間件接口時,只需要讓中間件指定好數(shù)據(jù)庫即可,其他參數(shù)完全一致,極大地方便了上層的開發(fā);同時,下層技術棧在更新?lián)Q代的時候,也可以做到和上層完全分離,不影響程序員的使用。
它們之間的邏輯關系,可以參照下面畫的這張圖。習慣性把中間件的作用調侃為:沒有什么事情是加一層解決不了的;如果有,那就加兩層。
當然,這只是其中一個例子,也只是中間件的一種形式。事實上,比如在阿里,中間件主要有分布式關系型數(shù)據(jù)庫 DRDS、消息隊列和分布式服務這么三種形式。而我們今天,主要會用到消息隊列,因為它非常符合量化交易系統(tǒng)的應用場景,即事件驅動模型。
消息隊列
那么,什么是消息隊列呢?一如其名,消息,即互聯(lián)網信息傳遞的個體;而隊列,學過算法和數(shù)據(jù)結構的你,應該很清楚這個 FIFO(先進先出)的數(shù)據(jù)結構吧。
簡而言之,消息隊列就是一個臨時存放消息的容器,有人向消息隊列中推送消息;有人則監(jiān)聽消息隊列,發(fā)現(xiàn)新消息就會取走。根據(jù)我們剛剛對中間件的解釋,清晰可見,消息隊列也是一種中間件。
目前,市面上使用較多的消息隊列有 RabbitMQ、Kafka、RocketMQ、ZMQ 等。不過今天,只介紹最常用的 ZMQ 和 Kafka。
我們先來想想,消息隊列作為中間件有什么特點呢?
首先是嚴格的時序性。剛剛說了,隊列是一種先進先出的數(shù)據(jù)結構,你丟給它 1, 2, 3,然后另一個人從里面取數(shù)據(jù),那么取出來的一定也是 1, 2, 3,嚴格保證了先進去的數(shù)據(jù)先出去,后進去的數(shù)據(jù)后出去。顯然,這也是消息機制中必須要保證的一點,不然顛三倒四的結果一定不是我們想要的。
說到隊列的特點,簡單提一句,與“先進先出“相對的是棧這種數(shù)據(jù)結構,它是先進后出的,你丟給它 1, 2, 3,再從里面取出來的時候,拿到的就是3, 2, 1了,這一點一定要區(qū)分清楚。
其次,是分布式網絡系統(tǒng)的老生常談問題。如何保證消息不丟失?如何保證消息不重復?這一切,消息隊列在設計的時候都已經考慮好了,你只需要拿來用就可以,不必過多深究。
不過,很重要的一點,消息隊列是如何降低系統(tǒng)復雜度,起到中間件的解耦作用呢?我們來看下面這張圖。
消息隊列的模式是發(fā)布和訂閱,一個或多個消息發(fā)布者可以發(fā)布消息,一個或多個消息接受者可以訂閱消息。 從圖中你可以看到,消息發(fā)布者和消息接受者之間沒有直接耦合,其中,
消息發(fā)布者將消息發(fā)送到分布式消息隊列后,就結束了對消息的處理;
消息接受者從分布式消息隊列獲取該消息后,即可進行后續(xù)處理,并不需要探尋這個消息從何而來。
至于新增業(yè)務的問題,只要你對這類消息感興趣,即可訂閱該消息,對原有系統(tǒng)和業(yè)務沒有任何影響,所以也就實現(xiàn)了業(yè)務的可擴展性設計。
講了這么多概念層的東西,想必你迫不及待地想看具體代碼了吧。接下來,我們來看一下 ZMQ 的實現(xiàn)。
ZMQ
先來看 ZMQ,這是一個非常輕量級的消息隊列實現(xiàn)。
作者 Pieter Hintjens 是一位大牛,他本人的經歷也很傳奇,2010 年診斷出膽管癌,并成功做了手術切除。但 2016 年 4 月,卻發(fā)現(xiàn)癌癥大面積擴散到了肺部,已經無法治療。他寫的最后一篇通信模式是關于死亡協(xié)議的,之后在比利時選擇接受安樂死。
ZMQ 是一個簡單好用的傳輸層,它有三種使用模式:
Request - Reply 模式;
Publish - Subscribe 模式;
Parallel Pipeline 模式。
第一種模式很簡單,client 發(fā)消息給 server,server 處理后返回給 client,完成一次交互。這個場景你一定很熟悉吧,沒錯,和 HTTP 模式非常像,所以這里我就不重點介紹了。至于第三種模式,與今天內容無關,這里也不做深入講解。
我們需要詳細來看的是第二種,即“PubSub”模式。下面是它的具體實現(xiàn),代碼很清晰,應該很容易理解:
# 訂閱者 1
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:6666')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
print('client 1')
while True:
msg = socket.recv()
print("msg: %s" % msg)
if __name__ == '__main__':
run()
########## 輸出 ##########
client 1
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 訂閱者 2
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:6666')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
print('client 2')
while True:
msg = socket.recv()
print("msg: %s" % msg)
if __name__ == '__main__':
run()
########## 輸出 ##########
client 2
msg: b'server cnt 1'
msg: b'server cnt 2'
msg: b'server cnt 3'
msg: b'server cnt 4'
msg: b'server cnt 5'
# 發(fā)布者
import time
import zmq
def run():
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://*:6666')
cnt = 1
while True:
time.sleep(1)
socket.send_string('server cnt {}'.format(cnt))
print('send {}'.format(cnt))
cnt += 1
if __name__ == '__main__':
run()
########## 輸出 ##########
send 1
send 2
send 3
send 4
send 5
這里要注意的一點是,如果你想要運行代碼,請先運行兩個訂閱者,然后再打開發(fā)布者。
接下來,來簡單講解一下。
對于訂閱者,我們要做的是創(chuàng)建一個 zmq Context,連接 socket 到指定端口。其中,setsockopt_string() 函數(shù)用來過濾特定的消息,而下面這行代碼:
socket.setsockopt_string(zmq.SUBSCRIBE, '')
則表示不過濾任何消息。最后,我們調用 socket.recv() 來接受消息就行了,這條語句會阻塞在這里,直到有新消息來臨。
對于發(fā)布者,我們同樣要創(chuàng)建一個 zmq Context,綁定到指定端口,不過請注意,這里用的是 bind 而不是 connect。因為在任何情況下,同一個地址端口 bind 只能有一個,但卻可以有很多個 connect 鏈接到這個地方。初始化完成后,再調用 socket.send_string ,即可將我們想要發(fā)送的內容發(fā)送給 ZMQ。
當然,這里還有幾個需要注意的地方。首先,有了 send_string,我們其實已經可以通過 JSON 序列化,來傳遞幾乎我們想要的所有數(shù)據(jù)結構,這里的數(shù)據(jù)流結構就已經很清楚了。
另外,把發(fā)布者的 time.sleep(1) 放在 while 循環(huán)的最后,嚴格來說應該是不影響結果的。這里你可以嘗試做個實驗,看看會發(fā)生什么。
你還可以思考下另一個問題,如果這里是多個發(fā)布者,那么 ZMQ 應該怎么做呢?
Kafka
接著我們再來看一下 Kafka。
通過代碼實現(xiàn)可以發(fā)現(xiàn),ZMQ 的優(yōu)點主要在輕量、開源和方便易用上,但在工業(yè)級別的應用中,大部分人還是會轉向 Kafka 這樣的有充足支持的輪子上。
相比而言,Kafka 提供了點對點網絡和發(fā)布訂閱模型的支持,這也是用途最廣泛的兩種消息隊列模型。而且和 ZMQ 一樣,Kafka 也是完全開源的,因此你也能得到開源社區(qū)的充分支持。
Kafka 的代碼實現(xiàn),和 ZMQ 大同小異,這里就不專門講解了。
基于消息隊列的 Orderbook 數(shù)據(jù)流
最后回到我們的量化交易系統(tǒng)上。
量化交易系統(tǒng)中,獲取 orderbook 一般有兩種用途:策略端獲取實時數(shù)據(jù),用來做決策;備份在文件或者數(shù)據(jù)庫中,方便讓策略和回測系統(tǒng)將來使用。
如果我們直接單機監(jiān)聽交易所的消息,風險將會變得很大,這在分布式系統(tǒng)中叫做 Single Point Failure。一旦這臺機器出了故障,或者網絡連接突然中斷,我們的交易系統(tǒng)將立刻暴露于風險中。
于是,一個很自然的想法就是,我們可以在不同地區(qū)放置不同的機器,使用不同的網絡同時連接到交易所,然后將這些機器收集到的信息匯總、去重,最后生成我們需要的準確數(shù)據(jù)。相應的拓撲圖如下:
當然,這種做法也有很明顯的缺點:因為要同時等待多個數(shù)據(jù)服務器的數(shù)據(jù),再加上消息隊列的潛在處理延遲和網絡延遲,對策略服務器而言,可能要增加幾十到數(shù)百毫秒的延遲。如果是一些高頻或者滑點要求比較高的策略,這種做法需要謹慎考慮。
但是,對于低頻策略、波段策略,這種延遲換來的整個系統(tǒng)的穩(wěn)定性和架構的解耦性,還是非常值得的。不過,你仍然需要注意,這種情況下,消息隊列服務器有可能成為瓶頸,也就是剛剛所說的 Single Point Failure,一旦此處斷開,依然會將系統(tǒng)置于風險之中。
事實上,我們可以使用一些很成熟的系統(tǒng),例如阿里的消息隊列,AWS 的 Simple Queue Service 等等,使用這些非常成熟的消息隊列系統(tǒng),風險也將會最小化。
總結
這節(jié)我們分析了現(xiàn)代化軟件工程領域中的中間件系統(tǒng),以及其中的主要應用——消息隊列。我們講解了最基礎的消息隊列的模式,包括點對點模型、發(fā)布者訂閱者模型,和一些其他消息隊列自己支持的模型。文章來源:http://www.zghlxwxcb.cn/news/detail-794877.html
在真實的項目設計中,我們要根據(jù)自己的產品需求,來選擇使用不同的模型;同時也要在編程實踐中,加深對不同技能點的了解,對系統(tǒng)復雜性進行解耦,這才是設計出高質量系統(tǒng)的必經之路。文章來源地址http://www.zghlxwxcb.cn/news/detail-794877.html
到了這里,關于37 | Kafka & ZMQ:自動化交易流水線的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!