Skip to content

执行器与调度器

koroutine_lib 中,执行器 (Executor)调度器 (Scheduler) 是控制协程在何处、何时运行的核心组件。理解它们是掌握高级异步编程的关键。

1. Executor: 在哪里执行?

Executor 是一个非常简单的概念:它是一个对象,可以接受一个函数(std::function<void()>)并执行它。它代表了“执行上下文”,即你的代码实际运行的地方。

koroutine_lib 提供了几种内置的 Executor

  • ThreadPoolExecutor: 一个固定大小的线程池执行器。它维护一组工作线程和一个任务队列。
  • 优点: 高效利用系统资源,避免频繁创建销毁线程,适合高并发场景。
  • 缺点: 需要注意线程安全问题。

  • LooperExecutor: 该执行器内部维护一个独立的事件循环线程。所有提交给它的任务都会被放入一个队列中,由该线程按顺序执行。

  • 优点: 保证任务在同一个线程上串行执行,非常适合需要线程亲和性的场景(如 UI 更新、访问非线程安全资源)。
  • 缺点: 如果一个任务阻塞,会阻塞后续所有任务。

  • NewThreadExecutor: 最简单粗暴的执行器。每次调用 execute,它都会创建一个全新的 std::thread 来运行任务,然后立即分离 (detach)。

  • 优点: 简单,任务之间完全隔离。
  • 缺点: 创建线程的开销很大,不适合大量、短小的任务。

2. Scheduler: 如何调度?

如果说 Executor 是“工人”,那么 Scheduler 就是“工头”。Scheduler 管理一个或多个 Executor,并根据特定的策略决定将任务分派给哪个“工人”。

Scheduler 是一个更高级的抽象,它允许你实现复杂的调度逻辑,例如:

  • 负载均衡: 将任务分发给最空闲的 Executor
  • 优先级调度: 优先执行高优先级的任务。
  • 时间调度: 在指定时间点或延迟后执行任务。

SchedulerManager

koroutine_lib 提供了 SchedulerManager 来管理全局默认的调度器。

  • 默认调度器: 通过 SchedulerManager::get_default_scheduler() 获取。默认情况下,它是一个 SimpleScheduler,内部使用 ThreadPoolExecutor,线程数等于硬件并发数。
// 获取默认调度器
auto default_scheduler = SchedulerManager::get_default_scheduler();

// 如果需要,你可以创建新的调度器(拥有独立的线程池)
auto my_scheduler = std::make_shared<SimpleScheduler>();

// 设置新的默认调度器
SchedulerManager::set_default_scheduler(my_scheduler);

3. co_await switch_to(scheduler): 在协程中切换上下文

koroutine_lib 最强大的功能之一,就是允许协程在不同的调度器(即不同的线程或线程池)之间无缝切换。这是通过 co_await switch_to(scheduler) 实现的。

switch_to 会返回一个 Awaiter。当 co_await 这个 Awaiter 时: 1. 当前协程会挂起。 2. Awaiter 将协程的恢复句柄(continuation)提交给你指定的 Scheduler。 3. 该 Scheduler 会在它管理的 Executor 上安排恢复操作。 4. 当协程恢复时,它已经运行在新的线程上下文中了。

示例:将 I/O 操作卸载到独立线程池

假设我们有一个任务,它需要先在主线程上做一些计算,然后执行一个耗时的文件写入,最后再回到主线程更新状态。

Task<void> process_data_and_write_to_file(const std::string& data, std::shared_ptr<AbstractScheduler> io_scheduler) {
    // 当前在默认调度器上
    std::cout << "1. 准备数据 on thread " << std::this_thread::get_id() << std::endl;
    auto processed_data = data + " [processed]";

    // 切换到 I/O 调度器来执行文件操作
    co_await switch_to(io_scheduler);

    std::cout << "2. 写入文件 on thread " << std::this_thread::get_id() << std::endl;
    // 模拟耗时的文件写入
    co_await sleep_for(std::chrono::seconds(2));
    // std::ofstream file("output.txt"); file << processed_data;

    // 切换回默认调度器
    co_await switch_to(SchedulerManager::get_default_scheduler());

    std::cout << "3. 操作完成 on thread " << std::this_thread::get_id() << std::endl;
    co_return;
}

int main() {
    // 创建一个独立的调度器用于IO操作
    auto io_scheduler = std::make_shared<SimpleScheduler>();

    Runtime::block_on(process_data_and_write_to_file("my_data", io_scheduler));
}

输出可能会是这样:

1. 准备数据 on thread 0x10e9c7000
2. 写入文件 on thread 0x16f5b3000  // <-- 线程 ID 变了!
3. 操作完成 on thread 0x10e9c7000  // <-- 线程 ID 又变回来了(或者变成了默认调度器线程池中的另一个线程)!

通过这种方式,你可以将不同性质的任务隔离在不同的线程池中,防止 I/O 操作阻塞计算任务,从而极大地提升应用的响应性和吞吐量。这是构建高性能服务器和复杂应用的基石。