協(xié)程本身并不能實現(xiàn)異步操作,它們需要依賴于調(diào)度器(Scheduler)的組件來實現(xiàn)異步操作。調(diào)度器負責管理協(xié)程的執(zhí)行和調(diào)度。
調(diào)度器的抽象設(shè)計
為了實現(xiàn)協(xié)程的異步調(diào)度,我們需要提供調(diào)度器的一個抽象實現(xiàn),來支持不同的調(diào)度邏輯。
class AbstractExecutor {
public:
virtual void execute(std::function<void()> &&func) = 0;
};
調(diào)度的時機
在C++20協(xié)程中,co_await表達式用于暫停當前協(xié)程的執(zhí)行,并且可以進行異步操作。當異步操作完成后,協(xié)程將繼續(xù)執(zhí)行。這一過程的關(guān)鍵是所謂的Awaiter對象,它決定了何時以及如何恢復協(xié)程。
Awaiter必須實現(xiàn)以下三個成員函數(shù):
- await_ready():判斷協(xié)程是否可以立即繼續(xù)執(zhí)行。如果返回true,則協(xié)程繼續(xù)執(zhí)行,不會真正掛起。
- await_suspend(std::coroutine_handle<>):定義當協(xié)程掛起時執(zhí)行的操作。這是調(diào)度器發(fā)揮作用的時機。
- await_resume():定義當協(xié)程準備恢復執(zhí)行時的返回值或行為。
通過自定義Awaiter,我們可以在await_suspend方法中集成一個調(diào)度器。這個調(diào)度器不必關(guān)心協(xié)程的具體執(zhí)行內(nèi)容,而是專注于何時以及在哪個上下文中恢復協(xié)程。其他兩個函數(shù)都要求同步返回,是不能作為調(diào)度的時機。
實際上,按照 C++ 協(xié)程的設(shè)計,await_suspend 確實是用來提供調(diào)度支持的,由于這個時間點協(xié)程已經(jīng)完全掛起,因此我們可以在任意一個線程上調(diào)用 handle.resume(),甚至不用擔心線程安全的問題。
如果存在調(diào)度器,其實現(xiàn)的大致樣子如下所示:
// 調(diào)度器的類型有多種,因此專門提供一個模板參數(shù) Executor
template<typename Result, typename Executor>
struct TaskAwaiter {
// 構(gòu)造 TaskAwaiter 的時候傳入調(diào)度器的具體實現(xiàn)
explicit TaskAwaiter(AbstractExecutor *executor, Task<Result, Executor> &&task) noexcept
: _executor(executor), task(std::move(task)) {}
void await_suspend(std::coroutine_handle<> handle) noexcept {
task.finally([handle, this]() {
// 將 resume 函數(shù)的調(diào)用交給調(diào)度器執(zhí)行
_executor->execute([handle]() {
handle.resume();
});
});
}
//...省略其余實現(xiàn)
private:
Task<Result, Executor> task;
AbstractExecutor *_executor;
};
調(diào)度器的持有問題?(初次和掛起)
TaskAwaiter 當中的調(diào)度器實例是從外部傳來的,這樣設(shè)計的目的是希望把調(diào)度器的創(chuàng)建和綁定交給協(xié)程本身。換句話說,調(diào)度器應(yīng)該屬于協(xié)程。這樣設(shè)計的好處就是協(xié)程內(nèi)部的代碼均會被調(diào)度到它對應(yīng)的調(diào)度器上執(zhí)行,可以確保邏輯的一致性和正確性。
調(diào)度器應(yīng)該與 Task 或者 TaskPromise 綁定到一起。TaskPromise 作為協(xié)程的承諾類型(promise type),其中需要包含一個調(diào)度器實例。這個調(diào)度器決定了協(xié)程的運行方式,確保所有通過 co_await 掛起的任務(wù)在恢復時都能通過同一調(diào)度器執(zhí)行。這里使用模板參數(shù) Executor 來指定調(diào)度器的類型,使得每一個 Task 實例都綁定一個具體的 Executor 實例。
// 增加模板參數(shù) Executor
template<typename ResultType, typename Executor>
struct TaskPromise {
// 協(xié)程啟動時也需要在恢復時實現(xiàn)調(diào)度
DispatchAwaiter initial_suspend() { return DispatchAwaiter{&executor}; }
std::suspend_always final_suspend() noexcept { return {}; }
// Task 類型增加模板參數(shù) Executor 可以方便創(chuàng)建協(xié)程時執(zhí)行調(diào)度器的類型
Task<ResultType, Executor> get_return_object() {
return Task{std::coroutine_handle<TaskPromise>::from_promise(*this)};
}
// 注意模板參數(shù)
template<typename _ResultType, typename _Executor>
TaskAwaiter<_ResultType, _Executor> await_transform(Task<_ResultType, _Executor> &&task) {
return TaskAwaiter<_ResultType, _Executor>(&executor, std::move(task));
}
private:
Executor executor;
};
initial_suspend:在協(xié)程啟動時觸發(fā),使用 DispatchAwaiter 將協(xié)程的第一次執(zhí)行提交給綁定的調(diào)度器。這確保了協(xié)程即使在異步環(huán)境中也能保持執(zhí)行的一致性(與co_wait 使用同一個調(diào)度器)。
await_transform:提供了對協(xié)程內(nèi)部每一個 co_await 的處理。通過返回一個配置了當前調(diào)度器的 TaskAwaiter,保證了協(xié)程在每次掛起后都能通過相同的調(diào)度器恢復運行。
這里面的調(diào)度器都是由TaskPromise傳入進去的指針指針,其對象實例是在TaskPromise里。
對于DispatchAwaiter實現(xiàn)如下,其協(xié)程內(nèi)部的所有邏輯都可以順利地調(diào)度到協(xié)程對應(yīng)的調(diào)度器上。
struct DispatchAwaiter {
explicit DispatchAwaiter(AbstractExecutor *executor) noexcept
: _executor(executor) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) const {
// 調(diào)度到協(xié)程對應(yīng)的調(diào)度器上
_executor->execute([handle]() {
handle.resume();
});
}
void await_resume() {}
private:
AbstractExecutor *_executor;
};
調(diào)度器的實現(xiàn)
接下來給出幾種簡單的調(diào)度器實現(xiàn)作為示例,了解其思想
NoopExecutor
最簡單的調(diào)度器類型。它立即在當前線程中執(zhí)行給定的任務(wù),沒有任何異步處理或線程切換。這種調(diào)度器適用于需要保證代碼執(zhí)行在同一線程上,或者在測試和調(diào)試階段不希望引入額外的線程復雜性時使用。
class NoopExecutor : public AbstractExecutor {
public:
void execute(std::function<void()> &&func) override {
func();
}
};
對于Task 的改動不大,只是增加了模板參數(shù) Executor:
template<typename ResultType, typename Executor = NewThreadExecutor>
struct Task {
using promise_type = TaskPromise<ResultType, Executor>;
...
};
NewThreadExecutor
每次調(diào)用 execute 時都會創(chuàng)建一個新的線程來運行傳遞的任務(wù)。這種調(diào)度器適合于執(zhí)行時間較長的任務(wù),且任務(wù)之間幾乎沒有依賴,能夠并行處理。由于每個任務(wù)都在新線程上執(zhí)行(這個需要線程池來優(yōu)化)
class NewThreadExecutor : public AbstractExecutor {
public:
void execute(std::function<void()> &&func) override {
std::thread(func).detach();
}
};
AsyncExecutor
使用std::async機制來調(diào)度協(xié)程。std::launch::async指定了異步執(zhí)行。
class AsyncExecutor : public AbstractExecutor {
public:
void execute(std::function<void()> &&func) override {
auto future = std::async(std::launch::async, func);
}
};
LooperExecutor
是一個基于事件循環(huán)的單線程調(diào)度器,通常用于需要順序處理任務(wù)的場景。內(nèi)部使用一個線程來循環(huán)處理任務(wù)隊列中的任務(wù),通過同步機制(條件變量和互斥鎖)管理任務(wù)的添加和執(zhí)行。
class LooperExecutor : public AbstractExecutor {
private:
std::condition_variable queue_condition;
std::mutex queue_lock;
std::queue<std::function<void()>> executable_queue;
// true 的時候是工作狀態(tài),如果要關(guān)閉事件循環(huán),就置為 false
std::atomic<bool> is_active;
std::thread work_thread;
// 處理事件循環(huán)
void run_loop() {
// 檢查當前事件循環(huán)是否是工作狀態(tài),或者隊列沒有清空
while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
std::unique_lock lock(queue_lock);
if (executable_queue.empty()) {
// 隊列為空,需要等待新任務(wù)加入隊列或者關(guān)閉事件循環(huán)的通知
queue_condition.wait(lock);
// 如果隊列為空,那么說明收到的是關(guān)閉的通知
if (executable_queue.empty()) {
// 現(xiàn)有邏輯下此處用 break 也可
// 使用 continue 可以再次檢查狀態(tài)和隊列,方便將來擴展
continue;
}
}
// 取出第一個任務(wù),解鎖再執(zhí)行。
// 解鎖非常:func 是外部邏輯,不需要鎖保護;func 當中可能請求鎖,導致死鎖
auto func = executable_queue.front();
executable_queue.pop();
lock.unlock();
func();
}
}
public:
LooperExecutor() {
is_active.store(true, std::memory_order_relaxed);
work_thread = std::thread(&LooperExecutor::run_loop, this);
}
~LooperExecutor() {
shutdown(false);
// 等待線程執(zhí)行完,防止出現(xiàn)意外情況
join();
}
void execute(std::function<void()> &&func) override {
std::unique_lock lock(queue_lock);
if (is_active.load(std::memory_order_relaxed)) {
executable_queue.push(func);
lock.unlock();
// 通知隊列,主要用于隊列之前為空時調(diào)用 wait 等待的情況
// 通知不需要加鎖,否則鎖會交給 wait 方導致當前線程阻塞
queue_condition.notify_one();
}
}
void shutdown(bool wait_for_complete = true) {
// 修改后立即生效,在 run_loop 當中就能盡早(加鎖前)就檢測到 is_active 的變化
is_active.store(false, std::memory_order_relaxed);
if (!wait_for_complete) {
std::unique_lock lock(queue_lock);
// 清空任務(wù)隊列
decltype(executable_queue) empty_queue;
std::swap(executable_queue, empty_queue);
lock.unlock();
}
// 通知 wait 函數(shù),避免 Looper 線程不退出
// 不需要加鎖,否則鎖會交給 wait 方導致當前線程阻塞
queue_condition.notify_all();
}
void join() {
if (work_thread.joinable()) {
work_thread.join();
}
}
};
- 當隊列為空時,Looper 的線程通過 wait 來實現(xiàn)阻塞等待。
- 有新任務(wù)加入時,通過 notify_one 來通知 run_loop 繼續(xù)執(zhí)行。
結(jié)果展示
測試結(jié)果代碼如下:
// 這意味著每個恢復的位置都會通過 std::async 上執(zhí)行
Task<int,AsyncExecutor> simple_task2() {
debug("task 2 start ...");
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
debug("task 2 returns after 1s.");
co_return 2;
}
// 這意味著每個恢復的位置都會新建一個線程來執(zhí)行
Task<int,NewThreadExecutor> simple_task3() {
debug("in task 3 start ...");
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
debug("task 3 returns after 2s.");
co_return 3;
}
//這意味著每個恢復的位置都會在同一個線程上執(zhí)行
Task<int, LooperExecutor> simple_task() {
debug("task start ...");
auto result2 = co_await simple_task2();
debug("returns from task2: ", result2);
auto result3 = co_await simple_task3();
debug("returns from task3: ", result3);
co_return 1 + result2 + result3;
}
int main() {
auto simpleTask = simple_task();
simpleTask.then([](int i) {
debug("simple task end: ", i);
}).catching([](std::exception& e) {
//debug("error occurred", e.what());
});
try {
auto i = simpleTask.get_result();
debug("simple task end from get: ", i);
}
catch (std::exception& e) {
//debug("error: ", e.what());
}
return 0;
}
結(jié)果如下:
37432 task start ...
39472 task 2 start ...
39472 task 2 returns after 1s.
37432 returns from task2: 2
33856 in task 3 start ...
33856 task 3 returns after 2s.
37432 returns from task3: 3
37432 simple task end: 6
4420 simple task end from get: 6
37432 run_loop exit.
解讀如下:
- 37432 task start,協(xié)程 simple_task 在 LooperExecutor 上開始執(zhí)行的標志。
- 39472 task 2 start,在 simple_task 中,第一個 co_await simple_task2() 被調(diào)用。因為 simple_task2 使用 AsyncExecutor,它啟動了在新線程上的異步操作。
- 39472 task 2 returns after 1s,simple_task2 完成執(zhí)行,同樣在 39472 線程上。表示 std::async 的操作已完成。
- 37432 returns from task2: 2,控制返回到 simple_task 中,接收 simple_task2 的結(jié)果(返回值 2)。此操作仍然在 LooperExecutor 線程 (37432) 上執(zhí)行,表明 LooperExecutor 成功地將任務(wù)調(diào)度回了其事件循環(huán)中。
- 33856 in task 3 start,simple_task 中的第二個 co_await,即 simple_task3() 被調(diào)用。因為 simple_task3 使用 NewThreadExecutor,它在新的線程 (33856) 上開始執(zhí)行。
- 33856 task 3 returns after 2s,在新線程 (33856) 上,simple_task3 完成了執(zhí)行,該線程由 NewThreadExecutor 創(chuàng)建,與 simple_task2 的執(zhí)行線程 (39472) 不同。
- 37432 returns from task3: 3,控制權(quán)再次回到 simple_task 中,處理從 simple_task3 返回的結(jié)果(返回值 3),依舊在 LooperExecutor 的線程 (37432) 上執(zhí)行。
- 37432 simple task end: 6,simple_task 計算最終結(jié)果并結(jié)束,所有操作均在 LooperExecutor 的線程 (37432) 上順利完成。
- 4420 simple task end from get: 6,最終結(jié)果 (6) 被從 simple_task.get_result() 方法中檢索,在主線程上執(zhí)行。
- 37432 run_loop exit,LooperExecutor 的事件循環(huán)線程 (37432) 結(jié)束,可能是因為程序的整體結(jié)束或 LooperExecutor 的 shutdown() 方法被調(diào)用。
完整代碼
存在兩個等待體,都有調(diào)度器,DispatchAwaiter和TaskAwaiter。
協(xié)程的啟動時(通過TaskPromise的 initial_suspend):DispatchAwaiter 確保協(xié)程的第一步執(zhí)行可以被適當調(diào)度。這其中設(shè)置協(xié)程的執(zhí)行環(huán)境,確保協(xié)程在特定的線程或線程池中啟動。文章來源:http://www.zghlxwxcb.cn/news/detail-860372.html
當協(xié)程在 co_await 表達式處掛起時(通過await_transform):TaskAwaiter 會通過傳遞給它的調(diào)度器調(diào)度協(xié)程的恢復。這是通過調(diào)用調(diào)度器的 execute 方法來安排 std::coroutine_handle 的 resume 方法執(zhí)行的。文章來源地址http://www.zghlxwxcb.cn/news/detail-860372.html
#define __cpp_lib_coroutine
#include <coroutine>
#include <exception>
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
#include <list>
#include <optional>
#include <cassert>
#include <queue>
#include <future>
using namespace std;
void debug(const std::string& s) {
printf("%d %s\n", std::this_thread::get_id(), s.c_str());
}
void debug(const std::string& s, int x) {
printf("%d %s %d\n", std::this_thread::get_id(), s.c_str(), x);
}
// 調(diào)度器
class AbstractExecutor {
public:
virtual void execute(std::function<void()>&& func) = 0;
};
class NoopExecutor : public AbstractExecutor {
public:
void execute(std::function<void()>&& func) override {
func();
}
};
class NewThreadExecutor : public AbstractExecutor {
public:
void execute(std::function<void()>&& func) override {
std::thread(func).detach();
}
};
class AsyncExecutor : public AbstractExecutor {
public:
void execute(std::function<void()>&& func) override {
auto future = std::async(func);
}
};
class LooperExecutor : public AbstractExecutor {
private:
std::condition_variable queue_condition;
std::mutex queue_lock;
std::queue<std::function<void()>> executable_queue;
std::atomic<bool> is_active;
std::thread work_thread;
void run_loop() {
while (is_active.load(std::memory_order_relaxed) || !executable_queue.empty()) {
std::unique_lock lock(queue_lock);
if (executable_queue.empty()) {
queue_condition.wait(lock);
if (executable_queue.empty()) {
continue;
}
}
auto func = executable_queue.front();
executable_queue.pop();
lock.unlock();
func();
}
debug("run_loop exit.");
}
public:
LooperExecutor() {
is_active.store(true, std::memory_order_relaxed);
work_thread = std::thread(&LooperExecutor::run_loop, this);
}
~LooperExecutor() {
shutdown(false);
if (work_thread.joinable()) {
work_thread.join();
}
}
void execute(std::function<void()>&& func) override {
std::unique_lock lock(queue_lock);
if (is_active.load(std::memory_order_relaxed)) {
executable_queue.push(func);
lock.unlock();
queue_condition.notify_one();
}
}
void shutdown(bool wait_for_complete = true) {
is_active.store(false, std::memory_order_relaxed);
if (!wait_for_complete) {
// clear queue.
std::unique_lock lock(queue_lock);
decltype(executable_queue) empty_queue;
std::swap(executable_queue, empty_queue);
lock.unlock();
}
queue_condition.notify_all();
}
};
template<typename T>
struct Result
{
explicit Result() = default;
explicit Result(T&& value) : _value(value) {}
explicit Result(std::exception_ptr&& exception_ptr) : _exception_ptr(exception_ptr) {}
T get_or_throw() {
if (_exception_ptr) {
std::rethrow_exception(_exception_ptr);
}
return _value;
}
private:
T _value;
std::exception_ptr _exception_ptr;
};
// 用于協(xié)程initial_suspend()時直接將運行邏輯切入調(diào)度器的等待體
struct DispatchAwaiter {
explicit DispatchAwaiter(AbstractExecutor* executor) noexcept
: _executor(executor) {}
bool await_ready() const { return false; }
void await_suspend(std::coroutine_handle<> handle) const {
_executor->execute([handle]() {
handle.resume();
});
}
void await_resume() {}
private:
AbstractExecutor* _executor;
};
// 前向聲明
template<typename ResultType, typename Executor>
struct Task;
template<typename Result, typename Executor>
struct TaskAwaiter {
explicit TaskAwaiter(AbstractExecutor* executor, Task<Result, Executor>&& task) noexcept
: _executor(executor), task(std::move(task)) {}
TaskAwaiter(TaskAwaiter&& completion) noexcept
: _executor(completion._executor), task(std::exchange(completion.task, {})) {}
TaskAwaiter(TaskAwaiter&) = delete;
TaskAwaiter& operator=(TaskAwaiter&) = delete;
constexpr bool await_ready() const noexcept {
return false;
}
// 在這里增加了調(diào)度器的運行
void await_suspend(std::coroutine_handle<> handle) noexcept {
task.finally([handle, this]() {
_executor->execute([handle]() {
handle.resume();
});
});
}
Result await_resume() noexcept {
return task.get_result();
}
private:
Task<Result, Executor> task;
AbstractExecutor* _executor;
};
// 對應(yīng)修改增加調(diào)度器的傳入
template<typename ResultType,typename Executor>
struct TaskPromise {
//此時調(diào)度器將開始調(diào)度,執(zhí)行的邏輯
DispatchAwaiter initial_suspend() { return DispatchAwaiter(&executor); }
std::suspend_always final_suspend() noexcept { return {}; }
Task<ResultType, Executor> get_return_object() {
return Task{ std::coroutine_handle<TaskPromise>::from_promise(*this) };
}
//在這里返回等待器對象時需要將調(diào)度器的指針帶上
template<typename _ResultType, typename _Executor>
TaskAwaiter<_ResultType, _Executor> await_transform(Task<_ResultType, _Executor>&& task) {
return TaskAwaiter<_ResultType, _Executor>(&executor, std::move(task));
}
void unhandled_exception() {
std::lock_guard lock(completion_lock);
result = Result<ResultType>(std::current_exception());
completion.notify_all();
notify_callbacks();
}
void return_value(ResultType value) {
std::lock_guard lock(completion_lock);
result = Result<ResultType>(std::move(value));
completion.notify_all();
notify_callbacks();
}
ResultType get_result() {
std::unique_lock lock(completion_lock);
if (!result.has_value()) {
completion.wait(lock);
}
return result->get_or_throw();
}
void on_completed(std::function<void(Result<ResultType>)>&& func) {
std::unique_lock lock(completion_lock);
if (result.has_value()) {
auto value = result.value();
lock.unlock();
func(value);
}
else {
completion_callbacks.push_back(func);
}
}
private:
std::optional<Result<ResultType>> result;
Executor executor;
std::mutex completion_lock;
std::condition_variable completion;
std::list<std::function<void(Result<ResultType>)>> completion_callbacks;
void notify_callbacks() {
auto value = result.value();
for (auto& callback : completion_callbacks) {
callback(value);
}
completion_callbacks.clear();
}
};
template<typename ResultType,typename Executor = NewThreadExecutor>
struct Task {
using promise_type = TaskPromise<ResultType, Executor>;
ResultType get_result() {
return handle.promise().get_result();
}
Task& then(std::function<void(ResultType)>&& func) {
handle.promise().on_completed([func](auto result) {
try {
func(result.get_or_throw());
}
catch (std::exception& e) {
// ignore.
}
});
return *this;
}
Task& catching(std::function<void(std::exception&)>&& func) {
handle.promise().on_completed([func](auto result) {
try {
result.get_or_throw();
}
catch (std::exception& e) {
func(e);
}
});
return *this;
}
Task& finally(std::function<void()>&& func) {
handle.promise().on_completed([func](auto result) { func(); });
return *this;
}
explicit Task(std::coroutine_handle<promise_type> handle) noexcept : handle(handle) {}
Task(Task&& task) noexcept : handle(std::exchange(task.handle, {})) {}
Task(Task&) = delete;
Task& operator=(Task&) = delete;
~Task() {
if (handle) handle.destroy();
}
private:
std::coroutine_handle<promise_type> handle;
};
// 這意味著每個恢復的位置都會通過 std::async 上執(zhí)行
Task<int,AsyncExecutor> simple_task2() {
debug("task 2 start ...");
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);
debug("task 2 returns after 1s.");
co_return 2;
}
// 這意味著每個恢復的位置都會新建一個線程來執(zhí)行
Task<int,NewThreadExecutor> simple_task3() {
debug("in task 3 start ...");
using namespace std::chrono_literals;
std::this_thread::sleep_for(2s);
debug("task 3 returns after 2s.");
co_return 3;
}
//這意味著每個恢復的位置都會在同一個線程上執(zhí)行
Task<int, LooperExecutor> simple_task() {
debug("task start ...");
auto result2 = co_await simple_task2();
debug("returns from task2: ", result2);
auto result3 = co_await simple_task3();
debug("returns from task3: ", result3);
co_return 1 + result2 + result3;
}
int main() {
auto simpleTask = simple_task();
simpleTask.then([](int i) {
debug("simple task end: ", i);
}).catching([](std::exception& e) {
//debug("error occurred", e.what());
});
try {
auto i = simpleTask.get_result();
debug("simple task end from get: ", i);
}
catch (std::exception& e) {
//debug("error: ", e.what());
}
return 0;
}
到了這里,關(guān)于探究C++20協(xié)程(4)——協(xié)程中的調(diào)度器的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!