使用C++20协程和io_uring优雅地实现异步IO
  Z9KOrx4bDlAX 2024年03月27日 32 0
C++

距离2020年已经过去很久了,各大编译器对于C++20各项标准的支持也日趋完善,无栈协程也是其中之一,所以我就尝试着拿协程与io_uring实现了一下proactor模式,这篇文章用来记录一下我的设计和想法。除此之外,我们能在网络上找到许多优秀的C++20协程的教程以及许多优秀的协程应用(库),但从协程入门到架构出成熟的应用(库)之间还存在着不小的鸿沟,而直接去啃大型工程的源代码绝对不算是一种高效率的学习方式。所以,如果这篇文章能够在这方面提供一定的帮助的话那就再好不过了。

正如上所述,这篇文章是介绍基于C++20协程实现异步IO的,而不是介绍C++20协程的,因此有一定的阅读门槛。在阅读之前,你应当至少熟悉一下C++20协程。

为什么要使用协程

因为协程能够让我们像写同步IO那样来实现异步IO,如下所示:

auto foo(tcp_connection connection) -> task<void> {
    char buffer[1024];
    int result = co_await connection.recv(buffer, sizeof(buffer));
    // do something...
    result = co_await connection.send(buffer, result);
    // do something...
    co_return;
}

如果我们合理地实现了协程的挂起、恢复等操作,那么当我们执行co_await connection.recv时,我们实际上希望代码执行的操作如下:

  1. 告诉操作系统,监听recv操作,等待对方发送数据;
  2. 挂起当前协程;
  3. 去处理别的事情。

当操作系统接收到recv的数据时,执行以下操作:

  1. 处理recv,把数据读进来;
  2. 恢复之前挂起的协程,从挂起的地方恢复执行。

这就是我们需要协程做的事情。如果你熟悉reactor模式的话,这应该并不陌生——我们只是把回调函数换成了协程而已。那么回到这一部分的标题——我们为什么要使用协程而不是回调函数呢?——因为使用协程写出来的代码更好看,也更好维护,仅此而已。

关于为什么要使用异步IO:异步IO能够提高程序的吞吐量。试想一下一台基于同步IO的HTTP服务器,一种不难想到的实现方式是每accept一个连接,就创建一个新的线程来处理这个连接的IO,最后当这个连接断开时销毁这个线程。这么实现当然可以,但创建和销毁线程的开销是很大的,而且这要求线程调度器能够很好地分配线程之间的时间。使用IO多路复用的方式能够利用有限(甚至单线程)处理许多连接的IO,而不至于浪费过多的资源。

关于协程和回调函数的性能:我想二者应当是差不多的,或者协程可能还会更差一点,因为挂起协程和恢复协程需要执行一些额外操作。不过既然性能还没有紧张到需要去抠dpdk,那么和这一点点的性能优势相比较的话,代码的可维护性和可读性绝对也是不容忽视的问题。

关于异步IO的性能:我们通常讲异步IO性能更好指的是吞吐量,而不是低延时。不论是reactor模式还是proactor模式,其设计主旨都是要让CPU在等待IO的时候去处理别的事情,不要让CPU闲下来。如果低延时很重要的话,应当考虑使用同步IO与轮询的方式。

设计思路

根据第一部分,设计的基调就能够定下来了。我们重新考虑一下需要做的事情:

  1. 当我们执行到co_await read(...)等异步IO时,挂起当前协程,去处理其他事情;
  2. 当异步IO执行完毕时,恢复协程的执行。

仔细思考一下上述两点,我们就能够得到所有要做的事情:

  1. 我们需要适时挂起协程,所以首先我们要实现协程task
  2. 协程可能会调用协程,所以需要维护一下协程的调用栈(我是在promise里维护的);
  3. 协程是用来处理异步IO的,所以我们需要有一些组件来处理io_uring的IO(我是在io_context_worker中处理的);
  4. 当异步IO执行完毕时,需要有什么东西恢复协程的执行(这也是在io_context_worker中处理的);
  5. 当整个协程执行完毕时,需要销毁协程(这也是在io_context_worker中处理的)。

在继续阅读之前,我先贴一下代码。对照着代码看的话会舒服一些:GitHub

taskpromise

taskpromise均在coco/task.hpp中定义。我对task的定位正如协程最基本的功能——能够挂起和恢复的函数。task类本身只是对std::coroutine_handle的简易封装。在这里我只介绍一下taskoperator co_await

taskoperator co_await只是返回task_awaitable,所以co_await处理的重点实际上是在task_awaitable中实现的。考虑一下,当我们co_await一个task时,我们究竟是在干什么:

  1. 挂起当前协程
  2. 维护协程的调用栈
  3. 启动被co_await的协程

task_awaitable::await_suspend()中很容易看出这三点:

template <class T>
template <class Promise>
auto task_awaitable<T>::await_suspend(
    std::coroutine_handle<Promise> caller) noexcept -> coroutine_handle {
    // Set caller for this coroutine.
    promise_base &base = static_cast<promise_base &>(m_coroutine.promise());
    promise_base &caller_base = static_cast<promise_base &>(caller.promise());
    base.m_caller_or_top      = &caller_base;

    // Maintain stack bottom and top.
    promise_base *stack_bottom = caller_base.m_stack_bottom;
    assert(stack_bottom == stack_bottom->m_stack_bottom);

    base.m_stack_bottom           = stack_bottom;
    stack_bottom->m_caller_or_top = &base;

    return m_coroutine;
}

这里着重讲一下维护协程的调用栈。对于协程而言,至少存在一个协程,它是在协程外创建的(比如main函数)。因为它不在协程中,所以它也无法被co_await,这个协程我们称之为协程的栈底。在这个协程的执行过程中,它可能会创建和执行新的协程。当新的协程执行完毕时,它们会被清理,并且将执行权交还给调用者,这与普通函数的调用栈是一样的,只不过这个功能需要我们自己来实现。

因为promise在内存中的位置是不可移动的(我禁止了promise的拷贝与移动),所以我直接采用了类似链表的方式将协程的调用栈串了起来。在promise_base中,有两个成员变量用来维护这个调用栈:

promise_base *m_caller_or_top;
promise_base *m_stack_bottom;

因为第一个变量被复用了(具备不同的含义),所以可能有点乱。对于栈底协程而言,m_caller_or_top指向当前调用栈的栈顶协程,对于其他协程而言,m_caller_or_top指向自己的调用者(父协程)。这么设计是因为栈底协程不存在调用者,所以就干脆用这个变量存一下栈顶了。m_stack_bottom顾名思义,就是指向栈底的协程。对于栈底协程而言,这个变量指向的就是它自己了。

有了m_caller_or_top,当一个协程执行完毕时,就能方便地找到它的父协程并交换执行权。有了m_stack_bottomm_caller_or_top,我们就能很方便地找到协程的栈底和栈顶。当需要恢复task时,就能够保证总是恢复栈顶的协程。

当协程执行完毕时,需要将控制权交还给父协程。我们考虑一下交还控制权需要做的事情:

  1. 维护调用栈,变更栈顶
  2. 如果不是栈底,则恢复父协程的执行

协程执行完毕时会去尝试执行promisefinal_suspend(),因此这部分代码在promisefinal_suspend()中实现。final_suspend()返回的类型叫promise_awaitable,其对应的代码如下:

template <class Promise>
auto promise_awaitable::await_suspend(
    std::coroutine_handle<Promise> coroutine) noexcept
    -> std::coroutine_handle<> {
    promise_base &base  = static_cast<promise_base &>(coroutine.promise());
    promise_base *stack = base.m_stack_bottom;

    // Stack bottom completed. Nothing to resume.
    if (stack == &base)
        return std::noop_coroutine();

    // Set caller coroutine as the top of the stack.
    promise_base *caller   = base.m_caller_or_top;
    stack->m_caller_or_top = caller;

    // Resume caller.
    return caller->m_coroutine;
}

这段代码应该非常易懂,不过我们很容易联想到一个问题:既然交还了控制权,那么它是在何时销毁的?

其实这也不难想到,协程是由父协程销毁的。协程的返回值存放在promise中,当父协程co_await sometask时,父协程还需要读取子协程的promise以获取返回值。当子协程task<T>析构时,子协程才真正被销毁。

io_context的设计

既然协程需要异步地处理IO,那么必然需要个处理IO的地方,就是io_contextio_context维护了一个线程池,线程池中每一个线程均执行一个worker,每个worker均维护一个io_uring来处理本线程的IO事件和协程。当需要提交新的task给线程池的时候,由io_context分配给某一个worker执行。

这听起来和reactor模式好像没啥区别,用epoll写reactor模式的时候基本上也是这么干的,这是因为我本来就是从reactor模式那边搬过来的。不过相比于reactor模式,这么做还是有不少细节要处理的。在使用io_uring时,每次我们启动异步IO时,都需要获取到io_uring对象——要使用io_uring_prep_<io_operation>系列函数,我们必须从io_uring对象中获取一个sqe。而如上所述,io_uring对象在worker中,这就造成了一个麻烦:我们无法在io_contexttask分配worker的时候将io_uring对象的引用(指针)传递给task。虽然使用全局变量不失为一种选择,但我不想这么做,因为也许使用者想要在一个进程中创建几个不同的io_context用呢。

虽然无法直接将io_uring的引用传递给task,但还有一种方法可以进行交互。在所有awaitableawait_suspend中,我们可以拿到当前协程的coroutine_handle,而在io_context中,我们也能拿到taskcoroutine_handle,因此可以通过promise来传递io_uring的引用。

具体在实现时,我没有传递io_uring的引用,而是传递了worker的指针。这么做是因为当初我想同时支持IOCP,传递worker可以省掉一些麻烦,虽然后来放弃了。worker的指针只被放在协程栈的栈底,这么做是因为当协程在不同worker之间转移时,能够很方便地修改协程所属的worker(只需要修改栈底就可以了),尽管后来也没有实现work-stealing队列。在promise中,处理worker的方法如下所示:

/// \brief
///   Set I/O context for current coroutine.
/// \param[in] io_ctx
///   I/O context to be set for current coroutine.
auto set_worker(io_context_worker *io_ctx) noexcept -> void {
    m_stack_bottom->m_worker.store(io_ctx, std::memory_order_release);
}

/// \brief
///   Get I/O context for current coroutine.
/// \return
///   I/O context for current coroutine.
[[nodiscard]] auto worker() const noexcept -> io_context_worker * {
    return m_stack_bottom->m_worker.load(std::memory_order_acquire);
}

不过这么做也有一个缺点,就是io_context侵入了promise的设计,使得task必须在io_context中才能发挥作用。

awaitable的设计

awaitablecoco/io.hpp中定义。各种awaitable的设计就比较简单了。以read_awaitable为例,它在await_suspend()中获取当前协程所属的worker,然后启用异步IO,如下所示:

template <class Promise>
auto await_suspend(std::coroutine_handle<Promise> coro) noexcept -> bool {
    m_userdata.coroutine = coro.address();
    return this->suspend(coro.promise().worker());
}

获取了当前协程的worker后,就转入this->suspend()函数中去执行了。suspend()方法主要的工作是启动异步IO操作,并挂起当前协程:

auto coco::read_awaitable::suspend(io_context_worker *worker) noexcept -> bool {
    assert(worker != nullptr);
    m_userdata.cqe_res   = 0;
    m_userdata.cqe_flags = 0;

    io_uring     *ring = worker->io_ring();
    io_uring_sqe *sqe  = io_uring_get_sqe(ring);
    while (sqe == nullptr) [[unlikely]] {
        io_uring_submit(ring);
        sqe = io_uring_get_sqe(ring);
    }

    io_uring_prep_read(sqe, m_file, m_buffer, m_size, m_offset);
    sqe->user_data = reinterpret_cast<uint64_t>(&m_userdata);

    int result = io_uring_submit(ring);
    if (result < 0) [[unlikely]] { // Result is -errno.
        m_userdata.cqe_res = result;
        return false;
    }

    return true;
}

我没有把启用异步IO部分放到模板函数中,这是因为对于各种不同的IO操作,这部分的代码实际上大同小异。但考虑到这部分代码的长度,放到模板中可能会导致比较严重的二进制膨胀,所以就单独拿出来放到.cpp文件中了。

如果去翻我之前的commit记录的话,会发现起初我并没有把各种awaitable暴露出来,而是让各种异步操作(比如connection.receive())返回task。后来将awaitable暴露出来是考虑到诸如readwrite等操作可能会被频繁地调用,而每次创建一个协程都需要申请一次堆内存,在循环中执行的话可能对运行效率有比较严重的影响。

关于代码

再放一遍代码地址:GitHub

这份代码不长,总共两三千行,而且其中一多半都是注释,结合本文的话应该不会很难读。这本身只是一份实验性质的代码,同时我希望它适合拿来学习,所以我并没有打算塞入太多的功能。除此之外,我不是很建议你拿来放到工程中使用,因为我可能会一时兴起做出一些breaking change。如果你真的有这个需要的话,我建议你fork一份代码自己维护。

一些可能会被问到的问题

  1. 为什么没有实现UDP相关的IO?

因为io_uring似乎还没有支持recvfrom,至少我实现的时候还没有。

  1. 为什么不使用mmap和内核共享内存/为什么不向io_uring注册文件描述符等性能相关的问题

因为我不是io_uring专家。我写这个库的目的是学习用C++20的协程架构一个异步IO库,做这些性能优化会加大架构难度,并且花掉我大量的头发和时间,使我本不茂密的头发雪上加霜。除此之外,你会发现我也没有实现work-stealing队列,原因同理。Round Robin的性能虽然不至于最优,但也不会太差。

  1. 考不考虑加上HTTP支持?

考虑过,太懒所以放弃了。一方面是手写HTTP parser还是挺麻烦的,就算能用bison自动生成也还得去啃RFC。另一方面是,TCP作为一种基于流的协议,我没有想好如何处理连接的缓存能够兼顾性能和使用的便捷性。如果你有这方面的需求的话,不妨先用着其他的HTTP parser,比如llhttp

  1. 为什么没有实现yield

我觉得异步IO一般用不到这东西,所以就没写。如果需要的话就自己实现吧。

  1. 会支持Windows(IOCP)吗?

我有考虑过支持IOCP,但IOCP不支持定时器,我又不想删除Linux这边的timer,所以暂且没有这个想法。

  1. 作者你README写得好水啊

确实,我也觉得好水啊,有没有好心人帮忙修一修啊

  1. 为什么不用中文写注释和README?

不用中文写注释是因为clang-format没法处理中文的断行。不用中文写README是因为懒,不想写两份README。说不定哪天心情好了就写一份中文版。

碎碎念

要不要塞张插图呢?

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2024年03月27日 0

暂无评论

推荐阅读
  5ixybSny6Da0   12天前   21   0   0 C++
Z9KOrx4bDlAX