执行器与调度器
在 koroutine_lib 中,执行器 (Executor) 和 调度器 (Scheduler) 是控制协程在何处、何时运行的核心组件。理解它们是掌握高级异步编程的关键。
1. Executor: 在哪里执行?
Executor 是一个非常简单的概念:它是一个对象,可以接受一个函数(std::function<void()>)并执行它。它代表了“执行上下文”,即你的代码实际运行的地方。
koroutine_lib 提供了几种内置的 Executor:
ThreadPoolExecutor: 一个固定大小的线程池执行器。它维护一组工作线程和一个任务队列。- 优点: 高效利用系统资源,避免频繁创建销毁线程,适合高并发场景。
-
缺点: 需要注意线程安全问题。
-
LooperExecutor: 该执行器内部维护一个独立的事件循环线程。所有提交给它的任务都会被放入一个队列中,由该线程按顺序执行。 - 优点: 保证任务在同一个线程上串行执行,非常适合需要线程亲和性的场景(如 UI 更新、访问非线程安全资源)。
-
缺点: 如果一个任务阻塞,会阻塞后续所有任务。
-
NewThreadExecutor: 最简单粗暴的执行器。每次调用execute,它都会创建一个全新的std::thread来运行任务,然后立即分离 (detach)。 - 优点: 简单,任务之间完全隔离。
- 缺点: 创建线程的开销很大,不适合大量、短小的任务。
2. Scheduler: 如何调度?
如果说 Executor 是“工人”,那么 Scheduler 就是“工头”。Scheduler 管理一个或多个 Executor,并根据特定的策略决定将任务分派给哪个“工人”。
Scheduler 是一个更高级的抽象,它允许你实现复杂的调度逻辑,例如:
- 负载均衡: 将任务分发给最空闲的
Executor。 - 优先级调度: 优先执行高优先级的任务。
- 时间调度: 在指定时间点或延迟后执行任务。
SchedulerManager
koroutine_lib 提供了 SchedulerManager 来管理全局默认的调度器。
- 默认调度器: 通过
SchedulerManager::get_default_scheduler()获取。默认情况下,它是一个SimpleScheduler,内部使用ThreadPoolExecutor,线程数等于硬件并发数。
// 获取默认调度器
auto default_scheduler = SchedulerManager::get_default_scheduler();
// 如果需要,你可以创建新的调度器(拥有独立的线程池)
auto my_scheduler = std::make_shared<SimpleScheduler>();
// 设置新的默认调度器
SchedulerManager::set_default_scheduler(my_scheduler);
3. co_await switch_to(scheduler): 在协程中切换上下文
koroutine_lib 最强大的功能之一,就是允许协程在不同的调度器(即不同的线程或线程池)之间无缝切换。这是通过 co_await switch_to(scheduler) 实现的。
switch_to 会返回一个 Awaiter。当 co_await 这个 Awaiter 时:
1. 当前协程会挂起。
2. Awaiter 将协程的恢复句柄(continuation)提交给你指定的 Scheduler。
3. 该 Scheduler 会在它管理的 Executor 上安排恢复操作。
4. 当协程恢复时,它已经运行在新的线程上下文中了。
示例:将 I/O 操作卸载到独立线程池
假设我们有一个任务,它需要先在主线程上做一些计算,然后执行一个耗时的文件写入,最后再回到主线程更新状态。
Task<void> process_data_and_write_to_file(const std::string& data, std::shared_ptr<AbstractScheduler> io_scheduler) {
// 当前在默认调度器上
std::cout << "1. 准备数据 on thread " << std::this_thread::get_id() << std::endl;
auto processed_data = data + " [processed]";
// 切换到 I/O 调度器来执行文件操作
co_await switch_to(io_scheduler);
std::cout << "2. 写入文件 on thread " << std::this_thread::get_id() << std::endl;
// 模拟耗时的文件写入
co_await sleep_for(std::chrono::seconds(2));
// std::ofstream file("output.txt"); file << processed_data;
// 切换回默认调度器
co_await switch_to(SchedulerManager::get_default_scheduler());
std::cout << "3. 操作完成 on thread " << std::this_thread::get_id() << std::endl;
co_return;
}
int main() {
// 创建一个独立的调度器用于IO操作
auto io_scheduler = std::make_shared<SimpleScheduler>();
Runtime::block_on(process_data_and_write_to_file("my_data", io_scheduler));
}
输出可能会是这样:
1. 准备数据 on thread 0x10e9c7000
2. 写入文件 on thread 0x16f5b3000 // <-- 线程 ID 变了!
3. 操作完成 on thread 0x10e9c7000 // <-- 线程 ID 又变回来了(或者变成了默认调度器线程池中的另一个线程)!
通过这种方式,你可以将不同性质的任务隔离在不同的线程池中,防止 I/O 操作阻塞计算任务,从而极大地提升应用的响应性和吞吐量。这是构建高性能服务器和复杂应用的基石。