距離2020年已經(jīng)過去很久了,各大編譯器對(duì)于C++20各項(xiàng)標(biāo)準(zhǔn)的支持也日趨完善,無棧協(xié)程也是其中之一,所以我就嘗試著拿協(xié)程與io_uring
實(shí)現(xiàn)了一下proactor模式,這篇文章用來記錄一下我的設(shè)計(jì)和想法。除此之外,我們能在網(wǎng)絡(luò)上找到許多優(yōu)秀的C++20協(xié)程的教程以及許多優(yōu)秀的協(xié)程應(yīng)用(庫),但從協(xié)程入門到架構(gòu)出成熟的應(yīng)用(庫)之間還存在著不小的鴻溝,而直接去啃大型工程的源代碼絕對(duì)不算是一種高效率的學(xué)習(xí)方式。所以,如果這篇文章能夠在這方面提供一定的幫助的話那就再好不過了。
正如上所述,這篇文章是介紹基于C++20協(xié)程實(shí)現(xiàn)異步IO的,而不是介紹C++20協(xié)程的,因此有一定的閱讀門檻。在閱讀之前,你應(yīng)當(dāng)至少熟悉一下C++20協(xié)程。
為什么要使用協(xié)程
因?yàn)閰f(xié)程能夠讓我們像寫同步IO那樣來實(shí)現(xiàn)異步IO,如下所示:
auto foo(tcp_connection connection) -> task<void> {
char buffer[1024];
int result = co_await connection.recv(buffer, sizeof(buffer));
// do something...
result = co_await connection.send(buffer, result);
// do something...
co_return;
}
如果我們合理地實(shí)現(xiàn)了協(xié)程的掛起、恢復(fù)等操作,那么當(dāng)我們執(zhí)行co_await connection.recv
時(shí),我們實(shí)際上希望代碼執(zhí)行的操作如下:
- 告訴操作系統(tǒng),監(jiān)聽
recv
操作,等待對(duì)方發(fā)送數(shù)據(jù); - 掛起當(dāng)前協(xié)程;
- 去處理別的事情。
當(dāng)操作系統(tǒng)接收到recv
的數(shù)據(jù)時(shí),執(zhí)行以下操作:
- 處理recv,把數(shù)據(jù)讀進(jìn)來;
- 恢復(fù)之前掛起的協(xié)程,從掛起的地方恢復(fù)執(zhí)行。
這就是我們需要協(xié)程做的事情。如果你熟悉reactor模式的話,這應(yīng)該并不陌生——我們只是把回調(diào)函數(shù)換成了協(xié)程而已。那么回到這一部分的標(biāo)題——我們?yōu)槭裁匆褂脜f(xié)程而不是回調(diào)函數(shù)呢?——因?yàn)槭褂脜f(xié)程寫出來的代碼更好看,也更好維護(hù),僅此而已。
關(guān)于為什么要使用異步IO:異步IO能夠提高程序的吞吐量。試想一下一臺(tái)基于同步IO的HTTP服務(wù)器,一種不難想到的實(shí)現(xiàn)方式是每accept
一個(gè)連接,就創(chuàng)建一個(gè)新的線程來處理這個(gè)連接的IO,最后當(dāng)這個(gè)連接斷開時(shí)銷毀這個(gè)線程。這么實(shí)現(xiàn)當(dāng)然可以,但創(chuàng)建和銷毀線程的開銷是很大的,而且這要求線程調(diào)度器能夠很好地分配線程之間的時(shí)間。使用IO多路復(fù)用的方式能夠利用有限(甚至單線程)處理許多連接的IO,而不至于浪費(fèi)過多的資源。
關(guān)于協(xié)程和回調(diào)函數(shù)的性能:我想二者應(yīng)當(dāng)是差不多的,或者協(xié)程可能還會(huì)更差一點(diǎn),因?yàn)閽炱饏f(xié)程和恢復(fù)協(xié)程需要執(zhí)行一些額外操作。不過既然性能還沒有緊張到需要去摳dpdk,那么和這一點(diǎn)點(diǎn)的性能優(yōu)勢(shì)相比較的話,代碼的可維護(hù)性和可讀性絕對(duì)也是不容忽視的問題。
關(guān)于異步IO的性能:我們通常講異步IO性能更好指的是吞吐量,而不是低延時(shí)。不論是reactor模式還是proactor模式,其設(shè)計(jì)主旨都是要讓CPU在等待IO的時(shí)候去處理別的事情,不要讓CPU閑下來。如果低延時(shí)很重要的話,應(yīng)當(dāng)考慮使用同步IO與輪詢的方式。
設(shè)計(jì)思路
根據(jù)第一部分,設(shè)計(jì)的基調(diào)就能夠定下來了。我們重新考慮一下需要做的事情:
- 當(dāng)我們執(zhí)行到
co_await read(...)
等異步IO時(shí),掛起當(dāng)前協(xié)程,去處理其他事情; - 當(dāng)異步IO執(zhí)行完畢時(shí),恢復(fù)協(xié)程的執(zhí)行。
仔細(xì)思考一下上述兩點(diǎn),我們就能夠得到所有要做的事情:
- 我們需要適時(shí)掛起協(xié)程,所以首先我們要實(shí)現(xiàn)協(xié)程
task
; - 協(xié)程可能會(huì)調(diào)用協(xié)程,所以需要維護(hù)一下協(xié)程的調(diào)用棧(我是在
promise
里維護(hù)的); - 協(xié)程是用來處理異步IO的,所以我們需要有一些組件來處理
io_uring
的IO(我是在io_context_worker
中處理的); - 當(dāng)異步IO執(zhí)行完畢時(shí),需要有什么東西恢復(fù)協(xié)程的執(zhí)行(這也是在
io_context_worker
中處理的); - 當(dāng)整個(gè)協(xié)程執(zhí)行完畢時(shí),需要銷毀協(xié)程(這也是在
io_context_worker
中處理的)。
在繼續(xù)閱讀之前,我先貼一下代碼。對(duì)照著代碼看的話會(huì)舒服一些:GitHub。
task與promise
task
和promise
均在coco/task.hpp
中定義。我對(duì)task
的定位正如協(xié)程最基本的功能——能夠掛起和恢復(fù)的函數(shù)。task
類本身只是對(duì)std::coroutine_handle
的簡(jiǎn)易封裝。在這里我只介紹一下task
的operator co_await
。
task
的operator co_await
只是返回task_awaitable
,所以co_await
處理的重點(diǎn)實(shí)際上是在task_awaitable
中實(shí)現(xiàn)的??紤]一下,當(dāng)我們co_await
一個(gè)task
時(shí),我們究竟是在干什么:
- 掛起當(dāng)前協(xié)程
- 維護(hù)協(xié)程的調(diào)用棧
- 啟動(dòng)被
co_await
的協(xié)程
在task_awaitable::await_suspend()
中很容易看出這三點(diǎn):
template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
// Set caller for this coroutine.
promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
promise_base &caller_base = static_cast<promise_base &>(caller.promise());
base.m_caller_or_top = &caller_base;
// Maintain stack bottom and top.
promise_base *stack_bottom = caller_base.m_stack_bottom;
assert(stack_bottom == stack_bottom->m_stack_bottom);
base.m_stack_bottom = stack_bottom;
stack_bottom->m_caller_or_top = &base;
return m_coroutine;
}
這里著重講一下維護(hù)協(xié)程的調(diào)用棧。對(duì)于協(xié)程而言,至少存在一個(gè)協(xié)程,它是在協(xié)程外創(chuàng)建的(比如main
函數(shù))。因?yàn)樗辉趨f(xié)程中,所以它也無法被co_await
,這個(gè)協(xié)程我們稱之為協(xié)程的棧底。在這個(gè)協(xié)程的執(zhí)行過程中,它可能會(huì)創(chuàng)建和執(zhí)行新的協(xié)程。當(dāng)新的協(xié)程執(zhí)行完畢時(shí),它們會(huì)被清理,并且將執(zhí)行權(quán)交還給調(diào)用者,這與普通函數(shù)的調(diào)用棧是一樣的,只不過這個(gè)功能需要我們自己來實(shí)現(xiàn)。
因?yàn)?code>promise在內(nèi)存中的位置是不可移動(dòng)的(我禁止了promise
的拷貝與移動(dòng)),所以我直接采用了類似鏈表的方式將協(xié)程的調(diào)用棧串了起來。在promise_base
中,有兩個(gè)成員變量用來維護(hù)這個(gè)調(diào)用棧:
promise_base *m_caller_or_top;
promise_base *m_stack_bottom;
因?yàn)榈谝粋€(gè)變量被復(fù)用了(具備不同的含義),所以可能有點(diǎn)亂。對(duì)于棧底協(xié)程而言,m_caller_or_top
指向當(dāng)前調(diào)用棧的棧頂協(xié)程,對(duì)于其他協(xié)程而言,m_caller_or_top
指向自己的調(diào)用者(父協(xié)程)。這么設(shè)計(jì)是因?yàn)?strong>棧底協(xié)程不存在調(diào)用者,所以就干脆用這個(gè)變量存一下棧頂了。m_stack_bottom
顧名思義,就是指向棧底的協(xié)程。對(duì)于棧底協(xié)程而言,這個(gè)變量指向的就是它自己了。
有了m_caller_or_top
,當(dāng)一個(gè)協(xié)程執(zhí)行完畢時(shí),就能方便地找到它的父協(xié)程并交換執(zhí)行權(quán)。有了m_stack_bottom
和m_caller_or_top
,我們就能很方便地找到協(xié)程的棧底和棧頂。當(dāng)需要恢復(fù)task
時(shí),就能夠保證總是恢復(fù)棧頂?shù)膮f(xié)程。
當(dāng)協(xié)程執(zhí)行完畢時(shí),需要將控制權(quán)交還給父協(xié)程。我們考慮一下交還控制權(quán)需要做的事情:
- 維護(hù)調(diào)用棧,變更棧頂
- 如果不是棧底,則恢復(fù)父協(xié)程的執(zhí)行
協(xié)程執(zhí)行完畢時(shí)會(huì)去嘗試執(zhí)行promise
的final_suspend()
,因此這部分代碼在promise
的final_suspend()
中實(shí)現(xiàn)。final_suspend()
返回的類型叫promise_awaitable
,其對(duì)應(yīng)的代碼如下:
template <class Promise>
auto promise_awaitable::await_suspend(
std::coroutine_handle<Promise> coroutine) noexcept
-> std::coroutine_handle<> {
promise_base &base = static_cast<promise_base &>(coroutine.promise());
promise_base *stack = base.m_stack_bottom;
// Stack bottom completed. Nothing to resume.
if (stack == &base)
return std::noop_coroutine();
// Set caller coroutine as the top of the stack.
promise_base *caller = base.m_caller_or_top;
stack->m_caller_or_top = caller;
// Resume caller.
return caller->m_coroutine;
}
這段代碼應(yīng)該非常易懂,不過我們很容易聯(lián)想到一個(gè)問題:既然交還了控制權(quán),那么它是在何時(shí)銷毀的?
其實(shí)這也不難想到,協(xié)程是由父協(xié)程銷毀的。協(xié)程的返回值存放在promise
中,當(dāng)父協(xié)程co_await sometask
時(shí),父協(xié)程還需要讀取子協(xié)程的promise
以獲取返回值。當(dāng)子協(xié)程task<T>
析構(gòu)時(shí),子協(xié)程才真正被銷毀。
io_context的設(shè)計(jì)
既然協(xié)程需要異步地處理IO,那么必然需要個(gè)處理IO的地方,就是io_context
。io_context
維護(hù)了一個(gè)線程池,線程池中每一個(gè)線程均執(zhí)行一個(gè)worker
,每個(gè)worker
均維護(hù)一個(gè)io_uring
來處理本線程的IO事件和協(xié)程。當(dāng)需要提交新的task
給線程池的時(shí)候,由io_context
分配給某一個(gè)worker
執(zhí)行。
這聽起來和reactor模式好像沒啥區(qū)別,用epoll
寫reactor模式的時(shí)候基本上也是這么干的,這是因?yàn)槲冶緛砭褪菑膔eactor模式那邊搬過來的。不過相比于reactor模式,這么做還是有不少細(xì)節(jié)要處理的。在使用io_uring
時(shí),每次我們啟動(dòng)異步IO時(shí),都需要獲取到io_uring
對(duì)象——要使用io_uring_prep_<io_operation>
系列函數(shù),我們必須從io_uring
對(duì)象中獲取一個(gè)sqe
。而如上所述,io_uring
對(duì)象在worker
中,這就造成了一個(gè)麻煩:我們無法在io_context
給task
分配worker
的時(shí)候?qū)?code>io_uring對(duì)象的引用(指針)傳遞給task
。雖然使用全局變量不失為一種選擇,但我不想這么做,因?yàn)橐苍S使用者想要在一個(gè)進(jìn)程中創(chuàng)建幾個(gè)不同的io_context
用呢。
雖然無法直接將io_uring
的引用傳遞給task
,但還有一種方法可以進(jìn)行交互。在所有awaitable
的await_suspend
中,我們可以拿到當(dāng)前協(xié)程的coroutine_handle
,而在io_context
中,我們也能拿到task
的coroutine_handle
,因此可以通過promise
來傳遞io_uring
的引用。
具體在實(shí)現(xiàn)時(shí),我沒有傳遞io_uring
的引用,而是傳遞了worker
的指針。這么做是因?yàn)楫?dāng)初我想同時(shí)支持IOCP
,傳遞worker
可以省掉一些麻煩,雖然后來放棄了。worker
的指針只被放在協(xié)程棧的棧底,這么做是因?yàn)楫?dāng)協(xié)程在不同worker
之間轉(zhuǎn)移時(shí),能夠很方便地修改協(xié)程所屬的worker
(只需要修改棧底就可以了),盡管后來也沒有實(shí)現(xiàn)work-stealing隊(duì)列。在promise
中,處理worker
的方法如下所示:
/// \brief
/// Set I/O context for current coroutine.
/// \param[in] io_ctx
/// I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}
/// \brief
/// Get I/O context for current coroutine.
/// \return
/// I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}
不過這么做也有一個(gè)缺點(diǎn),就是io_context
侵入了promise
的設(shè)計(jì),使得task
必須在io_context
中才能發(fā)揮作用。
awaitable的設(shè)計(jì)
awaitable
在coco/io.hpp
中定義。各種awaitable
的設(shè)計(jì)就比較簡(jiǎn)單了。以read_awaitable
為例,它在await_suspend()
中獲取當(dāng)前協(xié)程所屬的worker
,然后啟用異步IO,如下所示:
template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
m_userdata.coroutine = coro.address();
return this->suspend(coro.promise().worker());
}
獲取了當(dāng)前協(xié)程的worker
后,就轉(zhuǎn)入this->suspend()
函數(shù)中去執(zhí)行了。suspend()
方法主要的工作是啟動(dòng)異步IO操作,并掛起當(dāng)前協(xié)程:
auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
assert(worker != nullptr);
m_userdata.cqe_res = 0;
m_userdata.cqe_flags = 0;
io_uring *ring = worker->io_ring();
io_uring_sqe *sqe = io_uring_get_sqe(ring);
while (sqe == nullptr) [[unlikely]] {
io_uring_submit(ring);
sqe = io_uring_get_sqe(ring);
}
io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);
int result = io_uring_submit(ring);
if (result < 0) [[unlikely]] { // Result is -errno.
m_userdata.cqe_res = result;
return false;
}
return true;
}
我沒有把啟用異步IO部分放到模板函數(shù)中,這是因?yàn)閷?duì)于各種不同的IO操作,這部分的代碼實(shí)際上大同小異。但考慮到這部分代碼的長度,放到模板中可能會(huì)導(dǎo)致比較嚴(yán)重的二進(jìn)制膨脹,所以就單獨(dú)拿出來放到.cpp
文件中了。
如果去翻我之前的commit記錄的話,會(huì)發(fā)現(xiàn)起初我并沒有把各種awaitable
暴露出來,而是讓各種異步操作(比如connection.receive()
)返回task
。后來將awaitable
暴露出來是考慮到諸如read
、write
等操作可能會(huì)被頻繁地調(diào)用,而每次創(chuàng)建一個(gè)協(xié)程都需要申請(qǐng)一次堆內(nèi)存,在循環(huán)中執(zhí)行的話可能對(duì)運(yùn)行效率有比較嚴(yán)重的影響。
關(guān)于代碼
再放一遍代碼地址:GitHub
這份代碼不長,總共兩三千行,而且其中一多半都是注釋,結(jié)合本文的話應(yīng)該不會(huì)很難讀。這本身只是一份實(shí)驗(yàn)性質(zhì)的代碼,同時(shí)我希望它適合拿來學(xué)習(xí),所以我并沒有打算塞入太多的功能。除此之外,我不是很建議你拿來放到工程中使用,因?yàn)槲铱赡軙?huì)一時(shí)興起做出一些breaking change。如果你真的有這個(gè)需要的話,我建議你fork一份代碼自己維護(hù)。
一些可能會(huì)被問到的問題
- 為什么沒有實(shí)現(xiàn)UDP相關(guān)的IO?
因?yàn)?code>io_uring似乎還沒有支持recvfrom
,至少我實(shí)現(xiàn)的時(shí)候還沒有。
- 為什么不使用
mmap
和內(nèi)核共享內(nèi)存/為什么不向io_uring
注冊(cè)文件描述符等性能相關(guān)的問題
因?yàn)槲也皇?code>io_uring專家。我寫這個(gè)庫的目的是學(xué)習(xí)用C++20的協(xié)程架構(gòu)一個(gè)異步IO庫,做這些性能優(yōu)化會(huì)加大架構(gòu)難度,并且花掉我大量的頭發(fā)和時(shí)間,使我本不茂密的頭發(fā)雪上加霜。除此之外,你會(huì)發(fā)現(xiàn)我也沒有實(shí)現(xiàn)work-stealing隊(duì)列,原因同理。Round Robin的性能雖然不至于最優(yōu),但也不會(huì)太差。
- 考不考慮加上HTTP支持?
考慮過,太懶所以放棄了。一方面是手寫HTTP parser還是挺麻煩的,就算能用bison自動(dòng)生成也還得去啃RFC。另一方面是,TCP作為一種基于流的協(xié)議,我沒有想好如何處理連接的緩存能夠兼顧性能和使用的便捷性。如果你有這方面的需求的話,不妨先用著其他的HTTP parser,比如llhttp。
- 為什么沒有實(shí)現(xiàn)
yield
?
我覺得異步IO一般用不到這東西,所以就沒寫。如果需要的話就自己實(shí)現(xiàn)吧。
- 會(huì)支持Windows(IOCP)嗎?
我有考慮過支持IOCP,但I(xiàn)OCP不支持定時(shí)器,我又不想刪除Linux這邊的timer
,所以暫且沒有這個(gè)想法。
- 作者你README寫得好水啊
確實(shí),我也覺得好水啊,有沒有好心人幫忙修一修啊。
- 為什么不用中文寫注釋和README?
不用中文寫注釋是因?yàn)閏lang-format沒法處理中文的斷行。不用中文寫README是因?yàn)閼校幌雽憙煞軷EADME。說不定哪天心情好了就寫一份中文版。文章來源:http://www.zghlxwxcb.cn/news/detail-843742.html
碎碎念
要不要塞張插圖呢?文章來源地址http://www.zghlxwxcb.cn/news/detail-843742.html
到了這里,關(guān)于使用C++20協(xié)程和io_uring優(yōu)雅地實(shí)現(xiàn)異步IO的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!