C++特性

Ethereal Lv5

1. std::execution/coroutine结合

核心概念

在深入代码之前,我们先了解几个关键概念:

  1. 执行上下文 (Execution Context): 代表了可以执行工作的“地方”,例如一个线程池、一个 I/O 事件循环或一个 GPU 流。
  2. 调度器 (Scheduler): 一个轻量级的句柄,代表了一个执行上下文。它的主要职责是创建一个“调度发送者”(Schedule Sender)。
  3. 发送者 (Sender): 一个描述异步操作的类型。它知道如何启动一个操作,但本身并不执行任何工作。它像一个“蓝图”。
  4. 接收者 (Receiver): 一个“回调”的泛化,定义了当异步操作完成时(成功、失败或取消)应该做什么。
  5. 操作状态 (Operation State): 通过 std::execution::connect 将一个发送者和一个接收者连接起来所创建的对象。这个对象封装了执行操作所需的所有状态,并通过调用 std::execution::start 来启动。
  6. 协程 (co_await): 在我们的设计中,协程将作为连接发送者和接收者的桥梁,管理工作的提交和执行流程。

使用协程实现的调度器

我们的目标是创建一个在后台使用一个或多个工作线程的线程池调度器。当 schedule() 被调用时,它返回的发送者会将工作单元(一个函数)提交给这个线程池,并利用协程来等待其执行完成。

1. 简单的线程池执行上下文

首先,我们定义一个简单的线程池作为我们的执行上下文。它包含一个线程队列和一个用于分派任务的 std::vector<std::jthread>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <iostream>
#include <vector>
#include <thread>
#include <functional>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <concepts>

class simple_thread_pool {
public:
explicit simple_thread_pool(std::size_t thread_count = std::thread::hardware_concurrency()) {
for (std::size_t i = 0; i < thread_count; ++i) {
threads_.emplace_back([this] { worker_thread(); });
}
}

~simple_thread_pool() {
{
std::unique_lock lock(mutex_);
stop_ = true;
}
cv_.notify_all();
}

void submit(std::function<void()> work) {
{
std::unique_lock lock(mutex_);
work_queue_.push(std::move(work));
}
cv_.notify_one();
}

private:
void worker_thread() {
while (true) {
std::function<void()> work;
{
std::unique_lock lock(mutex_);
cv_.wait(lock, [this] { return stop_ || !work_queue_.empty(); });
if (stop_ && work_queue_.empty()) {
return;
}
work = std::move(work_queue_.front());
work_queue_.pop();
}
work();
}
}

std::vector<std::jthread> threads_;
std::queue<std::function<void()>> work_queue_;
std::mutex mutex_;
std::condition_variable cv_;
bool stop_{false};
};

2. 协程任务类型 (task)

我们将定义一个简单的协程任务类型。当这个任务被 co_await 时,它会将等待它的协程句柄提交给线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <coroutine>

// 前向声明
struct coroutine_scheduler;

struct task {
struct promise_type;
using handle_type = std::coroutine_handle<promise_type>;

struct promise_type {
task get_return_object() { return {handle_type::from_promise(*this)}; }
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() { std::terminate(); }
};

handle_type handle;
};

struct schedule_awaiter {
coroutine_scheduler& scheduler_;

bool await_ready() noexcept { return false; }
void await_suspend(std::coroutine_handle<> h); // 实现将移至 scheduler 定义之后
void await_resume() noexcept {}
};

3. 协程调度器 (coroutine_scheduler)

这是我们设计的核心。它持有对线程池的引用,并提供 schedule() 方法来创建一个发送者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <execution> // 假设的 C++26 头文件

// 调度器需要满足 std::execution::scheduler 概念
struct coroutine_scheduler {
simple_thread_pool& pool_;

// schedule() 返回一个发送者
auto schedule() const noexcept;

// 提交协程句柄
void submit(std::coroutine_handle<> h) {
pool_.submit([h]() mutable { h.resume(); });
}

// 重载以支持我们的awaiter
schedule_awaiter operator co_await() { return {*this}; }

// 为了满足概念,需要定义相等比较
bool operator==(const coroutine_scheduler&) const = default;
};

// await_suspend 的实现
void schedule_awaiter::await_suspend(std::coroutine_handle<> h) {
scheduler_.submit(h);
}

4. 调度发送者 (schedule_sender)

当调用 coroutine_scheduler::schedule() 时,会返回此类型的对象。它持有调度器的引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template <typename Receiver>
struct coroutine_operation_state {
// ... 实现将在下一步 ...
};

struct schedule_sender {
using is_sender = void; // 标记这是一个发送者
coroutine_scheduler scheduler_;

// 定义此发送者可以发送的值、错误和停止信号的类型
template <template <typename...> class Tuple, template <typename...> class Variant>
using value_types = Variant<Tuple<>>;

template <template <typename...> class Variant>
using error_types = Variant<std::exception_ptr>;

using sends_done = std::true_type;

// `connect` 方法将发送者和接收者连接起来,创建操作状态
template <std::execution::receiver Receiver>
auto connect(Receiver&& r) && {
return coroutine_operation_state<std::decay_t<Receiver>>{
scheduler_, std::forward<Receiver>(r)
};
}
};

// coroutine_scheduler::schedule() 的实现
auto coroutine_scheduler::schedule() const noexcept {
return schedule_sender{*this};
}

5. 协程驱动的操作状态 (coroutine_operation_state)

这是最精妙的部分。connect 的结果是一个操作状态对象。当 start() 被调用时,它会启动一个协程。这个协程 co_await 我们的调度器,从而将后续的执行(即调用接收者的 set_value)提交到线程池中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <std::execution::receiver Receiver>
struct coroutine_operation_state {
coroutine_scheduler scheduler_;
Receiver receiver_;

// 启动操作的协程
task run() {
try {
// 关键点:等待调度器,这将使协程的剩余部分
// 在线程池的某个线程上恢复执行。
co_await scheduler_;

// 在线程池的线程上,调用接收者的 set_value
std::execution::set_value(std::move(receiver_));
} catch (...) {
std::execution::set_error(std::move(receiver_), std::current_exception());
}
}

// `start()` 启动协程
void start() & noexcept {
// 创建协程,但它会立即在 initial_suspend 处暂停
task t = run();
// 恢复协程,执行到 co_await scheduler_
t.handle.resume();
}
};

完整示例与使用

现在,我们将所有部分组合在一起,并展示如何使用这个由协程驱动的调度器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int main() {
std::cout << "Main thread ID: " << std::this_thread::get_id() << std::endl;

// 1. 创建执行上下文
simple_thread_pool pool(4);

// 2. 创建调度器
coroutine_scheduler scheduler{pool};

// 3. 从调度器获取一个发送者
auto work_sender = scheduler.schedule();

// 4. 定义一个接收者
auto my_receiver = std::execution::then(
work_sender,
[] {
std::cout << "Work executed on thread ID: " << std::this_thread::get_id() << std::endl;
}
);

// 5. 使用 std::this_thread::sync_wait 等待操作完成
// sync_wait 会连接(connect)并启动(start)发送者,并阻塞直到操作完成
std::this_thread::sync_wait(std::move(my_receiver));

return 0;
}

编译与运行(假设的 C++26 环境):

如果在一个支持 std::execution 的未来 C++26 编译器中编译此代码,预期的输出将是:

1
2
Main thread ID: <ID_of_main_thread>
Work executed on thread ID: <ID_of_a_pool_thread>

这清楚地表明,通过我们的协程调度器,工作被成功地从主线程转移到了线程池中的一个工作线程上执行。

结论

这个例子展示了协程如何能被用作实现 std::execution 调度器内部机制的强大工具。通过在操作状态中启动一个协程并 co_await 调度器本身,我们能够以一种声明式且高度可读的方式,将执行流无缝地转移到目标执行上下文。

这种设计不仅优雅,而且充分利用了 C++20 协程的优势,将底层的回调和句柄管理封装在 taskawaiter 的实现细节中。随着 C++26 的到来,我们有理由相信,这种协程与 std::execution 的深度集成将成为构建高性能、可扩展的异步系统的标准模式。

注解

  • scheduler包sender,sender包operation_state,opration_state通过协程切换上下文到线程池中。

  • 协程池不好实现,因为无栈协程,和主线程共享同一份栈,不支持子调用中co_await

2. 合约

3. 静态反射

rust如何打印枚举类型的名称?通过debug萃取,编译器插桩

参考

std::execution::scheduler - cppreference.cn - C++参考手册

执行控制库 (自 C++26 起) - cppreference.cn - C++参考手册

C++23:std::execution/unifex导读-CSDN博客

Execution control library (since C++26) - cppreference.com

协程 (C++20) - cppreference.cn - C++参考手册

(25 封私信 / 80 条消息) c++ execution 与 coroutine (二) : execution概述 - 知乎

(25 封私信 / 80 条消息) ★C++20协程与stdexec(C++26 std::execution)学习笔记 - 知乎

(25 封私信 / 80 条消息) async_simple 源码分析(上) - 知乎

(25 封私信 / 80 条消息) 漫谈C++类型擦除(Type Erasure) - 知乎

(25 封私信 / 80 条消息) 浅谈The C++ Executors - 知乎

C++异步编程详解:future、promise与async | 现代C++并发编程 | C++ 编程指南

(25 封私信 / 80 条消息) C++26启航:Safe C++的破晓时刻 - 知乎

C++26 静态反射提案解析 - 知乎

  • Title: C++特性
  • Author: Ethereal
  • Created at: 2025-08-15 01:35:50
  • Updated at: 2025-08-15 01:56:28
  • Link: https://ethereal-o.github.io/2025/08/15/C-特性/
  • License: This work is licensed under CC BY-NC-SA 4.0.
 Comments