找回密码
 立即注册
首页 业界区 业界 基于 epoll 的协程调度器——零基础深入浅出 C++20 协程 ...

基于 epoll 的协程调度器——零基础深入浅出 C++20 协程

贡醮 2025-9-28 16:49:37
前言

上一篇《没有调度器的协程不是好协程》谈到协程如何自动运行,然而那个例子里的调度器还是不太自然,考查一下真实场景,挂起的协程一般是在等待异步事件的完成,如果异步事件没完成就轮到自己执行,它其实还是无法继续,相当于一次无效唤醒。所以这一篇准备引入异步事件,看看在真实的场景下,调度器是如何运作的。
文章仍然遵守之前的创作原则:
* 选取合适的 demo 是头等大事
* 以协程为目标,涉及到的新语法会简单说明,不涉及的不旁征博引
* 若语法的原理非常简单,也会简单展开讲讲,有利于透过现象看本质,用起来更得心应手
上一篇文章里不光引入了初级的调度器,还说明了 final_suspend 与协程自清理的关系、协程句柄通过类型擦除来屏蔽用户定义承诺对象的差异、以及 lambda 表达式的本质是仿函数等,如果没有这些内容铺垫,看本文时会有很多地方难以理解,还没看过的小伙伴,墙裂建议先看那篇。
工具还是之前介绍过的 C++ Insights ,这里不再用到 Compile Explorer,主要是它的运行环境不支持像文件、网络之类的异步 IO,为此需要用户自行搭建开发环境。
基于 epoll 的 IO 多路复用

本文演示的异步 IO 以文件操作为主,相比网络操作它具有代码量少、易于测试的优点。为了简化复杂度,这里没有接入任何三方库,而是直接调用操作系统 raw API,阅读本文需要具有 IO 多路复用 (multiplexing) 的知识基础,例如 Linux 的 epoll 或 Windows 的 IOCP。
在单线程时代,想要处理多个 IO 事件也不是不行,只要将异步 IO 句柄交给 select / poll / epoll / kqueue 等待即可,当任一 IO 事件到达时,控制权将从阻塞等待中返回,并告知用户哪个句柄上有何种事件发生,从而方便用户直接处理那个句柄上的 IO 事件,并且预期将不会被阻塞。这种模型因为检测完成后,还需要用户动作一下,也称为 Reactor 模型;相对的,还有 Proactor 模型,主要是基于 Windows IOCP,当事件完成时,相应的读、写动作已由系统完成,不再需要用户动作,故有此区别,关于这一点,后面在介绍基于 IOCP 的调度器时详述。
类 Unix 系统上的 IO 多路分离器比较多,早期的 select 就能监控 IO 句柄的读、写、异常三个事件集,并且带超时能力;后面发展的 poll 消除了 select 对句柄数量的限制;Linux 上诞生的 epoll 解决了 select & poll 在句柄数量增长时效能线性下降的问题,主要优化了句柄集合在用户态与内核态的来回复制、返回时遍历句柄集等性能开销;kqueue 则是 BSD 系统上的 epoll 平替,两者都支持水平触发与边缘触发两种模式。
水平触发意味着只要句柄上有事件,分离器就会一直通知,上述四个默认都是水平触发,适合少量离散数据的场景;边缘触发意味着一次通知中如果不将对应的事件处理完,下次不会再通知,除非有新的事件产生,epoll / kqueue 可选边缘触发,适合大数据量的场景,可以有效缓解高频通知导致的数据传输低效问题。
恶补了 IO 多路复用机制相关的知识后,考虑到我们是在 Linux 上进行测试,这里选取了 epoll 作为分离器。需要注意的是 epoll 不能直接处理普通文件读写,需要借助 fifo 文件,后面我们会看到这一点,话不多说直接上 demo:
  1. #include <coroutine>
  2. #include <unordered_map>
  3. #include <sys/epoll.h>
  4. #include <unistd.h>
  5. #include <fcntl.h>
  6. #include <vector>
  7. #include <stdexcept>
  8. #include <iostream>
  9. #include <sstream>
  10. #define MAX_EVENTS 10
  11. struct Task {
  12.     struct promise_type {
  13.         Task get_return_object() { return {}; }
  14.         std::suspend_never initial_suspend() { return {}; }
  15.         std::suspend_never final_suspend() noexcept { return {}; }
  16.         void return_void() {}
  17.         void unhandled_exception() { std::terminate(); }
  18.     };
  19. };
  20. class EpollScheduler {
  21. private:
  22.     int epoll_fd;
  23.     std::unordered_map<int, std::coroutine_handle<>> io_handles;
  24. public:
  25.     EpollScheduler() {
  26.         epoll_fd = epoll_create(MAX_EVENTS);
  27.         if (epoll_fd == -1) {
  28.             std::stringstream ss;
  29.             ss << "epoll_create failed, error " << errno;
  30.             throw std::runtime_error(ss.str());
  31.         }
  32.     }
  33.     ~EpollScheduler() {
  34.         close(epoll_fd);
  35.     }
  36.     void register_io(int fd, std::coroutine_handle<> handle) {
  37.         if (io_handles.find(fd) == io_handles.end()) {
  38.             io_handles[fd] = handle;
  39.             epoll_event event{};
  40.             event.events = EPOLLIN | EPOLLET;
  41.             event.data.fd = fd;
  42.             if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
  43.                 std::stringstream ss;
  44.                 ss << "epoll_ctl failed, error " << errno;
  45.                 throw std::runtime_error(ss.str());
  46.             }
  47.         }
  48.     }
  49.     void run() {
  50.         while (true) {
  51.             epoll_event events[MAX_EVENTS] = { 0 };
  52.             int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
  53.             for (int i = 0; i < n; ++i) {
  54.                 int ready_fd = events[i].data.fd;
  55.                 if (auto it = io_handles.find(ready_fd); it != io_handles.end()) {
  56.                     it->second.resume();
  57.                 }
  58.             }
  59.         }
  60.     }
  61. };
  62. struct AsyncReadAwaiter {
  63.     EpollScheduler& sched;
  64.     int fd;
  65.     std::string buffer;
  66.     AsyncReadAwaiter(EpollScheduler& s, int file_fd, size_t buf_size)
  67.         : sched(s), fd(file_fd), buffer(buf_size, '\0') {}
  68.     bool await_ready() const {
  69.         return false;
  70.     }
  71.     void await_suspend(std::coroutine_handle<> h) {
  72.         sched.register_io(fd, h);
  73.     }
  74.     std::string await_resume() {
  75.         ssize_t n = read(fd, buffer.data(), buffer.size());
  76.         if (n == -1) {
  77.             std::stringstream ss;
  78.             ss << "read failed, error " << errno;
  79.             throw std::runtime_error(ss.str());
  80.         }
  81.         buffer.resize(n);
  82.         return std::move(buffer);
  83.     }
  84. };
  85. Task async_read_file(EpollScheduler& sched, const char* path) {
  86.     int fd = open(path, O_RDONLY | O_NONBLOCK);
  87.     if (fd == -1) {
  88.         std::stringstream ss;
  89.         ss << "open failed, error " << errno;
  90.         throw std::runtime_error(ss.str());
  91.     }
  92.     while (true) {
  93.         auto data = co_await AsyncReadAwaiter(sched, fd, 4096);
  94.         std::cout << "Read " << data.size() << " bytes\n";
  95.         // if (data.size() == 0)
  96.         //     break;
  97.     }
  98.     close(fd);
  99. }
  100. int main(int argc, char* argv[]) {
  101.     if (argc < 2) {
  102.         std::cout << "Usage: sample pipe" << std::endl;
  103.         return 1;
  104.     }
  105.     EpollScheduler scheduler;
  106.     async_read_file(scheduler, argv[1]);
  107.     scheduler.run();
  108.     return 0;
  109. }
复制代码
有上一篇的铺垫,看起来没什么尿点,甚至有点老三样。唯一有新意的地方是 co_await 也能通过 await_resume 获取返回数据,这与 co_yield & co_return 有异曲同工之妙,体现出 C++20 协程灵活的一面。
多文件并行

上面的例子虽然通过多次读取展示了协程多次唤醒的过程,但没有展示多个 IO 句柄并发的能力,下面稍加改造,同时读取多个 fifo:
[code]Task async_read_file(EpollScheduler& sched, const char* path) {...    while (true) {        auto data = co_await AsyncReadAwaiter(sched, fd, 4096);        std::cout

相关推荐

您需要登录后才可以回帖 登录 | 立即注册