1. std::execution/coroutine结合
核心概念
在深入代码之前,我们先了解几个关键概念:
- 执行上下文 (Execution Context): 代表了可以执行工作的“地方”,例如一个线程池、一个 I/O 事件循环或一个 GPU 流。
- 调度器 (Scheduler): 一个轻量级的句柄,代表了一个执行上下文。它的主要职责是创建一个“调度发送者”(Schedule Sender)。
- 发送者 (Sender): 一个描述异步操作的类型。它知道如何启动一个操作,但本身并不执行任何工作。它像一个“蓝图”。
- 接收者 (Receiver): 一个“回调”的泛化,定义了当异步操作完成时(成功、失败或取消)应该做什么。
- 操作状态 (Operation State): 通过
std::execution::connect
将一个发送者和一个接收者连接起来所创建的对象。这个对象封装了执行操作所需的所有状态,并通过调用 std::execution::start
来启动。
- 协程 (
co_await
): 在我们的设计中,协程将作为连接发送者和接收者的桥梁,管理工作的提交和执行流程。
使用协程实现的调度器
我们的目标是创建一个在后台使用一个或多个工作线程的线程池调度器。当 schedule()
被调用时,它返回的发送者会将工作单元(一个函数)提交给这个线程池,并利用协程来等待其执行完成。
1. 简单的线程池执行上下文
首先,我们定义一个简单的线程池作为我们的执行上下文。它包含一个线程队列和一个用于分派任务的 std::vector<std::jthread>
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #include <iostream> #include <vector> #include <thread> #include <functional> #include <queue> #include <mutex> #include <condition_variable> #include <concepts>
class simple_thread_pool { public: explicit simple_thread_pool(std::size_t thread_count = std::thread::hardware_concurrency()) { for (std::size_t i = 0; i < thread_count; ++i) { threads_.emplace_back([this] { worker_thread(); }); } }
~simple_thread_pool() { { std::unique_lock lock(mutex_); stop_ = true; } cv_.notify_all(); }
void submit(std::function<void()> work) { { std::unique_lock lock(mutex_); work_queue_.push(std::move(work)); } cv_.notify_one(); }
private: void worker_thread() { while (true) { std::function<void()> work; { std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return stop_ || !work_queue_.empty(); }); if (stop_ && work_queue_.empty()) { return; } work = std::move(work_queue_.front()); work_queue_.pop(); } work(); } }
std::vector<std::jthread> threads_; std::queue<std::function<void()>> work_queue_; std::mutex mutex_; std::condition_variable cv_; bool stop_{false}; };
|
2. 协程任务类型 (task
)
我们将定义一个简单的协程任务类型。当这个任务被 co_await
时,它会将等待它的协程句柄提交给线程池。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| #include <coroutine>
struct coroutine_scheduler;
struct task { struct promise_type; using handle_type = std::coroutine_handle<promise_type>;
struct promise_type { task get_return_object() { return {handle_type::from_promise(*this)}; } std::suspend_always initial_suspend() noexcept { return {}; } std::suspend_always final_suspend() noexcept { return {}; } void return_void() {} void unhandled_exception() { std::terminate(); } };
handle_type handle; };
struct schedule_awaiter { coroutine_scheduler& scheduler_;
bool await_ready() noexcept { return false; } void await_suspend(std::coroutine_handle<> h); void await_resume() noexcept {} };
|
3. 协程调度器 (coroutine_scheduler
)
这是我们设计的核心。它持有对线程池的引用,并提供 schedule()
方法来创建一个发送者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include <execution>
struct coroutine_scheduler { simple_thread_pool& pool_;
auto schedule() const noexcept;
void submit(std::coroutine_handle<> h) { pool_.submit([h]() mutable { h.resume(); }); }
schedule_awaiter operator co_await() { return {*this}; }
bool operator==(const coroutine_scheduler&) const = default; };
void schedule_awaiter::await_suspend(std::coroutine_handle<> h) { scheduler_.submit(h); }
|
4. 调度发送者 (schedule_sender
)
当调用 coroutine_scheduler::schedule()
时,会返回此类型的对象。它持有调度器的引用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| template <typename Receiver> struct coroutine_operation_state { };
struct schedule_sender { using is_sender = void; coroutine_scheduler scheduler_;
template <template <typename...> class Tuple, template <typename...> class Variant> using value_types = Variant<Tuple<>>;
template <template <typename...> class Variant> using error_types = Variant<std::exception_ptr>;
using sends_done = std::true_type;
template <std::execution::receiver Receiver> auto connect(Receiver&& r) && { return coroutine_operation_state<std::decay_t<Receiver>>{ scheduler_, std::forward<Receiver>(r) }; } };
auto coroutine_scheduler::schedule() const noexcept { return schedule_sender{*this}; }
|
5. 协程驱动的操作状态 (coroutine_operation_state
)
这是最精妙的部分。connect
的结果是一个操作状态对象。当 start()
被调用时,它会启动一个协程。这个协程 co_await
我们的调度器,从而将后续的执行(即调用接收者的 set_value
)提交到线程池中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| template <std::execution::receiver Receiver> struct coroutine_operation_state { coroutine_scheduler scheduler_; Receiver receiver_;
task run() { try { co_await scheduler_;
std::execution::set_value(std::move(receiver_)); } catch (...) { std::execution::set_error(std::move(receiver_), std::current_exception()); } }
void start() & noexcept { task t = run(); t.handle.resume(); } };
|
完整示例与使用
现在,我们将所有部分组合在一起,并展示如何使用这个由协程驱动的调度器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| int main() { std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;
simple_thread_pool pool(4);
coroutine_scheduler scheduler{pool};
auto work_sender = scheduler.schedule();
auto my_receiver = std::execution::then( work_sender, [] { std::cout << "Work executed on thread ID: " << std::this_thread::get_id() << std::endl; } );
std::this_thread::sync_wait(std::move(my_receiver));
return 0; }
|
编译与运行(假设的 C++26 环境):
如果在一个支持 std::execution
的未来 C++26 编译器中编译此代码,预期的输出将是:
1 2
| Main thread ID: <ID_of_main_thread> Work executed on thread ID: <ID_of_a_pool_thread>
|
这清楚地表明,通过我们的协程调度器,工作被成功地从主线程转移到了线程池中的一个工作线程上执行。
结论
这个例子展示了协程如何能被用作实现 std::execution
调度器内部机制的强大工具。通过在操作状态中启动一个协程并 co_await
调度器本身,我们能够以一种声明式且高度可读的方式,将执行流无缝地转移到目标执行上下文。
这种设计不仅优雅,而且充分利用了 C++20 协程的优势,将底层的回调和句柄管理封装在 task
和 awaiter
的实现细节中。随着 C++26 的到来,我们有理由相信,这种协程与 std::execution
的深度集成将成为构建高性能、可扩展的异步系统的标准模式。
注解
2. 合约
3. 静态反射
rust如何打印枚举类型的名称?通过debug萃取,编译器插桩
参考
std::execution::scheduler - cppreference.cn - C++参考手册
执行控制库 (自 C++26 起) - cppreference.cn - C++参考手册
C++23:std::execution/unifex导读-CSDN博客
Execution control library (since C++26) - cppreference.com
协程 (C++20) - cppreference.cn - C++参考手册
(25 封私信 / 80 条消息) c++ execution 与 coroutine (二) : execution概述 - 知乎
(25 封私信 / 80 条消息) ★C++20协程与stdexec(C++26 std::execution)学习笔记 - 知乎
(25 封私信 / 80 条消息) async_simple 源码分析(上) - 知乎
(25 封私信 / 80 条消息) 漫谈C++类型擦除(Type Erasure) - 知乎
(25 封私信 / 80 条消息) 浅谈The C++ Executors - 知乎
C++异步编程详解:future、promise与async | 现代C++并发编程 | C++ 编程指南
(25 封私信 / 80 条消息) C++26启航:Safe C++的破晓时刻 - 知乎
C++26 静态反射提案解析 - 知乎