Koroutine — 实现级架构说明
本文件面向项目维护者与贡献者,提供与当前源码严格对齐的实现级架构说明:目录映射、核心类型与接口、任务生命周期时序、调度/执行路径、以及常见扩展点与注意事项。
仓库(精简视图):
.
├── CMakeLists.txt
├── cmakes/
├── demos/
├── docs/
├── include/
│ └── koroutine/
│ ├── async_io/
│ ├── awaiters/
│ ├── executors/
│ ├── schedulers/
│ ├── sync/
│ ├── task.hpp
│ ├── task_promise.hpp
│ ├── runtime.hpp
│ ├── scheduler_manager.h
│ └── when_all.hpp / when_any.hpp
├── scripts/
└── src/
└── scheduler_manager.cpp
目标
- 帮助贡献者快速定位实现位置(文件 ↔ 概念);
- 明确任务从创建到恢复的关键调用链与责任划分;
- 指出可插拔拓展点(Executor、Scheduler、Awaiter、IO 引擎);
- 提供实现/ABI 稳定性的注意事项与未来兼容建议。
核心概念与实现位置
- Task
/ Task —include/koroutine/task.hpp -
持有
std::coroutine_handle<promise_type>。公开start()、operator co_await/await_transform(在 promise 中定制)。 -
TaskPromise —
include/koroutine/task_promise.hpp -
管理
initial_suspend/final_suspend、结果/异常存储、以及在await_transform中生成自定义 awaiter。负责将继续逻辑(continuation)交给调度器或执行器。 -
ScheduleRequest —
include/koroutine/schedulers/schedule_request.hpp -
轻量包装:包含
std::coroutine_handle<> handle_与ScheduleMetadata(如优先级、tag 等)。提供resume()和显式有效性检测。 -
AbstractScheduler —
include/koroutine/schedulers/scheduler.h -
抽象接口:
schedule(ScheduleRequest, uint64_t delay_ms = 0)。决定何时/如何调用ScheduleRequest::resume()。 -
SimpleScheduler —
include/koroutine/schedulers/SimpleScheduler.h -
默认实现。内部持有一个
LooperExecutor(事件循环执行器),对ScheduleRequest做立即或延迟的交付:delay_ms==0->_executor->execute([req=move(request)](){ req.resume(); })delay_ms>0->_executor->execute_delayed(..., delay_ms)
-
Executors(执行器) —
include/koroutine/executors/*.h -
职责:“怎样执行一个函数”。常见:
LooperExecutor(事件循环)、NewThreadExecutor(新线程)、AsyncExecutor(基于 std::async)。接口最小:execute(fn),可选:execute_delayed(fn, ms)。 -
SchedulerManager —
include/koroutine/scheduler_manager.h/src/scheduler_manager.cpp -
存储并返回全局默认调度器(
std::shared_ptr<AbstractScheduler>)。允许通过set_default_scheduler()覆盖以实现全局策略切换。 -
Awaiters —
include/koroutine/awaiters/ -
各种 awaiter 实现(task awaiter / I/O awaiter / channel awaiter / switch_executor awaiter 等)。部分 awaiter 会直接使用
Executor来 resume(绕过 Scheduler),以降低延迟。 -
Runtime(同步入口) —
include/koroutine/runtime.hpp -
提供
block_on(Task)、join_all等,用于在同步环境(如main()) 启动并等待异步任务完成。block_on使用条件变量将异步结果回传到调用线程。 -
Async I/O 抽象 —
include/koroutine/async_io/* -
工厂函数根据平台选择实现(io_uring / kqueue / IOCP),并向上层提供 awaitable I/O 操作。
-
Channel / 同步原语 —
include/koroutine/channel.hpp,include/koroutine/sync/* - 提供协程间通信、异步互斥与条件变量,小心处理关闭(close)以及待唤醒集合的遍历与异常传播。
任务生命周期与关键调用链(实现级)
下面描述任务从创建到最终销毁时的典型路径,严格对应当前实现:
1) 协程函数被调用,编译器生成 coroutine frame,返回 Task:
- Task 包含 coroutine_handle<promise_type>,promise 在构造阶段初始化其内部状态(结果容器、异常存储、可能的 scheduler 引用)。
2) 调用 Task::start()(或 runtime wrapper 自动调用):
- start() 标记任务为已启动;从 promise 中获取已设置的 scheduler,如果未设置则调用 SchedulerManager::get_default_scheduler();
- 构造 ScheduleRequest(包装 handle 与 metadata),调用 scheduler->schedule(request, 0)。
3) AbstractScheduler::schedule()(默认 SimpleScheduler)接收请求:
- SimpleScheduler 将请求交给其内部 LooperExecutor:
- 无延迟 -> _executor->execute(lambda{ req.resume(); })
- 有延迟 -> _executor->execute_delayed(lambda{ req.resume(); }, delay_ms)
4) LooperExecutor 将 lambda 放入工作队列,由工作线程或事件循环执行:
- 工作线程取出 lambda 并执行 -> 调用 ScheduleRequest::resume() -> coroutine_handle.resume() -> 协程在该线程/上下文中继续执行。
5) 某些 awaiter 直接与 Executor 协作:
- 例如 switch_to(executor) 的 awaiter:在 await_suspend(handle) 中调用 executor->execute([handle]{ handle.resume(); }),绕过 Scheduler,提供更直接的上下文切换路径。
6) 协程完成与清理:
- final_suspend() 根据实现返回 suspend_always 或其它策略;资源(frame)通常由 Task 的析构或运行时显式 handle.destroy() 回收。
时序简图(简化):
sequenceDiagram
participant U as User
participant T as Task/Promise
participant SM as SchedulerManager
participant S as SimpleScheduler
participant E as LooperExecutor
participant W as Worker
U->>T: create Task (coroutine frame)
U->>T: task.start()
T->>SM: get_default_scheduler()
T->>S: S.schedule(ScheduleRequest(handle))
S->>E: E.execute([req.resume()])
E->>W: Worker executes lambda
W->>T: req.resume() -> handle.resume()
T->>T: coroutine body resumes
扩展点与实现建议
-
添加自定义 Executor:实现
execute(fn)/execute_delayed(fn, ms)即可被SimpleScheduler或 awaiter 直接利用。注意线程安全与生命周期管理。 -
新的 Scheduler:继承
AbstractScheduler,实现更复杂的调度策略(优先级、延时队列、工作窃取等),并通过SchedulerManager::set_default_scheduler()切换。 -
Awaiter:若需低延迟恢复,可让 awaiter 直接持有
Executor并在await_suspend中调用executor->execute来 resume,而不是通过Scheduler。 -
Async IO 后端:在
include/koroutine/async_io/增加平台实现,务必保证 awaitable 的await_suspend将完成回调通过适当的 Executor/ Scheduler resume 对应 coroutine handle。
与 ABI/未来兼容相关的注意事项
- 为了未来兼容(比如 C++26 的
std::execution或其他标准化调度/执行抽象): - 将库内部依赖的抽象限制在小而稳定的接口(
execute(fn),schedule(request)),并提供适配层(adapters)以桥接到std::execution或第三方执行器。 -
避免在公共 ABI 中暴露模板化的具体实现细节,优先使用
std::shared_ptr<AbstractScheduler>/std::unique_ptr<AbstractExecutor>等间接化指针类型以便替换实现。 -
单头发布 (single-header):保持生成脚本
scripts/generate_single_header.py的可用性,在合并实现时清楚列出需要内联或移除的实现细节,以减少用户端破坏性变更。
常见修改场景示例
-
将默认
SimpleScheduler替换为基于线程池的PriorityScheduler:实现PriorityScheduler继承AbstractScheduler-> 在启动阶段set_default_scheduler(std::make_shared<PriorityScheduler>())-> 所有Task::start()将自动使用新策略。 -
在 awaiter 中直接切换到特定线程池:编写 awaiter,在
await_suspend中调用executor->execute([handle]{ handle.resume(); })。
参考文件索引(快速跳转)
- 任务/Promise:
include/koroutine/task.hpp,include/koroutine/task_promise.hpp - 调度/请求:
include/koroutine/schedulers/scheduler.h,include/koroutine/schedulers/schedule_request.hpp,include/koroutine/schedulers/SimpleScheduler.h - 执行器:
include/koroutine/executors/looper_executor.h,include/koroutine/executors/new_thread_executor.h,include/koroutine/executors/async_executor.h - 管理:
include/koroutine/scheduler_manager.h,src/scheduler_manager.cpp - 运行时:
include/koroutine/runtime.hpp - Async I/O:
include/koroutine/async_io/*