本文是 C++ Coroutines and Structured Concurrency in Practice 的个人观片笔记。主要内容是作者(Dmitry Prokoptsev)分享他考虑结构化并发而实现的 C++20 协程网络库 Corral

如今协程库基本上是满街走了,但是这场演讲对于结构化并发的讨论是相当的不错。

一点说明:

  • 和上一篇笔记类似,本文的内容会相对简略。推荐看原视频,感受下作者离谱的语速。
  • 本文假定读者已了解 C++20 协程。不了解也没关系,看个思路。

实在的好处

协程的好处其实没有必要强调,用过都说好。

callback-vs-coroutine

作者给了一个与传统回调对比的例子,可以看出协程更好写、更好读,也更易维护。协程对于生命周期、资源管理和错误传播三个方面都能得到收益:不再是跟语言对抗,而是享受语言带来的特性。

潜在的问题

typical-async-framework 市面上典型的异步框架

一个典型的异步框架通常由三部分组成:

  • task:可能是一个线程或者协程等等,通常是一个与异步任务关联的具有生命周期的实体。
  • join:一个显式的暂停点和用于错误处理的传播路径。
  • detach:允许任务分离执行。

(个人习惯把 task 称为异步操作,不过本文均采用作者的描述方式:任务或者 task。)

作者认为,如果协程库采用这种设计,其潜在的问题是:分离的任务是有害的。

// 不要这么干
void bad(tcp::socket& s) {
    std::array<char> buf(1024);
    asio::co_spawn(
        ex,
        [&s, &buf]() -> asio::awaitable<void> {
            co_await s.read_some(
                s,
                asio::buffer(buf),
                asio::use_awaitable
            );
        },
        asio::detached
    );
}

(作者给的很多代码都是毛手毛脚的,不要在意缺失的细节……后面不再提醒了)

以 Asio 为例,使用 asio::co_spawn(asio::detached) 生成分离的异步任务后,你再也无法得知传递进去的资源(比如 buf)是否还在被 detached task 所占用。当然这段代码有更加离谱的问题,就是任务完成前 buf 就已经结束生命周期了。

// don’t do this either
void slightly_better(tcp::socket& s) {
    // 常见的 shared state 设计
    auto buf = make_shared<char[]>(1024);
    asio::co_spawn(
        ex,
        [&s, buf]() -> asio::awaitable {
            co_await s.read_some(
                s,
                asio::buffer(buf.get(), 1024),
                asio::use_awaitable
            );
        },
        asio::detached
    );
}

很容易想到使用智能指针去封装需要的上下文,尽管这意味着有大量的共享状态的实体在你的框架里乱飞。但更重要的是仍有错误处理的问题,如果在 detached task 当中抛出异常,这种实现方式是没有办法传播异常的。

这使得设计上要么完全忽略错误,要么直接停掉整个程序,非常尴尬。

// and also don’t do this
void slightly_better(tcp::socket& s) {
    auto buf = make_shared<char[]>(1024);
    asio::co_spawn(
        ex,
        [&s, buf]() -> asio::awaitable {
            try {
                co_await s.read_some(
                    s,
                    asio::buffer(buf.get(), 1024),
                    asio::use_awaitable
                );
            } catch (std::exception& e) {
                // ...um?..
            }
        },
        asio::detached
    );
}

很容易想到在当前上下文 try-catch 来解决无法传播的错误处理。不过尴尬在于你想怎么处理,打个日志当无事发生?正因为在当前上下文无法处理(没有足够的信息,并且你也不是最终用户),所以需要向上传播,这是问题搞反了。

因此,使用 detach 设计的协程在资源管理和错误处理方面,依然存在和传统回调无异的糟糕体验。

The completion token that will handle the notification that the thread of execution has completed. The function signature of the completion handler must be: void handler(std::exception_ptr, T);

NOTE: 作者这里其实有点强词夺理,使用分离式设计强制 fire and forget 能怪谁。见 co_spawn 的函数签名,搭配 handler 即可传播异常了。不过作为问题的出发点是挺好的,Asio 有这能力不代表别的框架有同等水平。

解决方案?

怎么解决这些问题,一个简单做法是完全移除分离式设计,即要求每一个任务都必须在某个时刻 join。也许「某个时刻」的最佳选择是析构,也就是在 task 的析构函数中放置 join。这是因为析构函数是唯一的能保证最终能调用的函数。

但是错误处理的传播仍有问题,因为析构函数通常假定不抛出异常,除非你有自己明确定义的规则(确保另一个异常正在处理时能够丢弃异常?但这是不好的实践)。

destructor-is-never-reached

更加复杂的问题是,你的程序可能永远不会触及析构。上图的 liveness 是一个保活后台任务,假设抛了异常,本来是应然在 join 时传播的,但是 join 所在的析构事实上不可触及。

最终结果就是保活服务没了,并且你完全不知情。尴尬在于这种 IO 模式还很常见。

这个问题的根因在于 liveness task 是运行在后台的,我们要进一步摒弃「后台执行」这个概念。

结构化并发

the-ground-rule

作者在框架中规定了结构化并发的规则:task 只有在被等待时co_await task才允许运行,且正在等待的上层awating caller恢复后,task 必然处于已完成的状态。

NOTE: 关于结构化并发的更多信息可参考上图的链接,主要是讨论 Go 语言的 go 语句和 goto 控制流的相似性,认为后台执行是一种不受控的并发控制流,也就是非结构化的并发;进一步提出 nursery 语句来实现结构化并发。

这种有结构、受控的并发控制流保证了:上层恢复后,task 的生命周期(已完成)、资源管理(可释放)和异常处理(会传播)都是确定的。同时,结构化并发也意味着没有「后台执行」的概念。

Combiner

Task<void> greet() {
    cout << "going to greet "
         << "the world\n";
    co_await sleep_for(1s);
    cout << "Hello world!\n";
}

Task<void> greetTwice() {
    cout << "spawning tasks\n";
    // 更推荐直接 co_await greet(),但这不是重点
    auto task1 = greet();
    auto task2 = greet();
    cout << "awaiting tasks\n";
    co_await task1;
    co_await task2;
}

// spawning tasks
// awaiting tasks
// going to greet the world
//  <1 second pause>
// Hello world!
// going to greet the world
//  <another 1 second pause>
// Hello world!

但是字面理解上面的结构化并发规则后,你会发现这完全就是串行执行。

allof-combiner

这是因为目前缺少表达一组任务的概念,所以需要引入一个 allOf 组合器(combiner):将需要并发的部分通过组合来完成,同时依然满足结构化并发。

Task<void> greet() {
    cout << "going to greet "
         << "the world\n";
    co_await sleep_for(1s);
    cout << "Hello world!\n";
}

Task<void> greetTwice() {
    // 使用 combiner 完成并发任务
    co_await allOf(
        greet(),
        greet()
    );
}

// going to greet the world
// going to greet the world
//  <1 second pause>
// Hello world!
// Hello world!

可以简单理解组合器为组合后的 task,它也需要被等待才会执行;等待方恢复时,它已经完成任务。

anyof-combiner

同时也可以引入一个相当有用的 anyOf 组合器。该组合器最重要的特性是只要有一个任务完成,其他的任务将被取消

-anyof-use-cases

这种取消特性的用途非常广泛,比如:

  • 非侵入地提供任务的超时特性,不用修改每一个 API 去添加超时参数;
  • 暴露给用户的任务权限管理,不需提供完整的 task(就是单一职责的意思);
  • 以及优雅退出(异步信号处理)和 IO 竞争优化等等……

话说我写的 uring_exec 也支持这些啊,而且还是潮到出水的 std::execution+io_uring,怎么就没见人给我 ⭐ 呢……路过的客官觉得有意思的话,还请高抬贵手 star 一下 吧。

// 最后一个 use case: racing
// 可以描绘成以下 call tree 形式

         +------------+
         | resolve()  |
         +------------+
               |
         +------------+
         |  anyOf()   |
         +------------+
           /       \
  +---------------+  +----------+
  | resolveOn()   |  |  lambda  |
  +---------------+  +----------+
        |                |
  +-----+-----+      +----------+
  |   .....   |      |  sleep() |
  +-----------+      +----------+

这种基于组合的结构化并发是树形结构(call tree),任意一个 task 的父节点就是上述定义的 awaiting caller。在这里面,异常是向上传播,取消是向下传播。

Nursery

Task<void> serve(tcp::socket s) {
    std::array<char> buf(1024);
    try {
        for (;;) {
            size_t len = co_await s.async_read_some(
                asio::buffer(buf), 
                asio::use_awaitable
            );
            co_await async_write(
                s, 
                asio::buffer(buf, len),
                asio::use_awaitable
            );
        }
    } catch (std::exception&) {
        /*connection closed or I/O error*/ 
    }
}

Task<void> listen(tcp::acceptor& acc) {
    for (;;) {
        tcp::socket s = co_await acc.async_accept(
            io_context,
            asio::use_awaitable
        );
        // ???
    }
}

以一个 TCP echo server 为例,listen 的并发成为了设计上的问题:上面的 allOf 是一种静态 DAG 设计,即 call tree 没法提供运行时增删 task 的能力。

Task<void> serve(tcp::socket s) {
    std::array<char> buf(1024);
    try {
        for (;;) {
            size_t len = co_await s.async_read_some(
                asio::buffer(buf), 
                use_awaitable
            );
            co_await async_write(
                s, 
                asio::buffer(buf, len),
                asio::use_awaitable
            );
        }
    } catch (std::exception&) { 
        /*connection closed or I/O error*/ 
    }
}

Task<void> listen(tcp::acceptor& acc) {
    tcp::socket s = co_await acc.async_accept(
        io_context,
        asio::use_awaitable
    );
    // 递归 listen
    co_await anyOf(
        serve(std::move(s)), 
        listen(acc)
    );
}

一种妥协的做法是递归地 anyOf。看着很 cool,但是这并非良好实践。你不能强求所有的程序设计都要改成递归形式;而且对于有三种以上的协程参与递归时,基本不能实现;最后个人多说一句,C++26 的 sender 可做不到递归。

dynamic-allof

另一种做法是运行时的动态的 anyOf(dynamic allOf),使得 call tree 可以动态地添加或删除 task。但是我们仍要满足结构化并发的语义,因此是 co await 形式,这意味着所有的 children task 的生命周期都不会超出这个 allOf scope 的范围。

Task<void> serve(tcp::socket);

Task<void> listen(tcp::acceptor& acc) {
    // WITH_NURSERY 和上面的 co_await dynamic_allOf([](DynamicAllOf&) -> Task {}) 一个意思
    // 作者说 dynamic_allOf 的重要程度足以成为一种新的原语 nursery,
    // 还可能有 .size()/.cancel() 等成员函数用于跟踪/处理任务进展
    WITH_NURSERY(nursery) {
        for (;;) {
            tcp::socket s = co_await acc.async_accept(
                io_context, 
                use_awaitable
            );
            // 类似于 co_spawn,可以立刻启动 serve(s) 任务,
            // 但是该任务不会超出 nursery 的结构范围,
            // 因此也不会超出 spawner(在这里为 listen)的结构范围
            // No task is ever left behind
            nursery.start(serve(std::move(s)));
        }
    };
}

这种更为复杂的动态组合器被称为 nursery,托儿所就是用于管理 children 的意思。并且作者在这里使用宏简化了使用方式,更加有 python 味(毕竟是从这里借来的)。

nursery 最重要的性质是没有公开的构造函数,它只能在异步上下文(协程)中创建。nursery 具有父节点,并且依然可以满足 call tree 的结构。

(作者这里的设计确实是比 Asio 的 co_spawn 和 io_context 更加细致的,本来一个 io_context 上下文只有全局的结构,现在可以平行多个 nursery 互不干涉。)

Task<void> serve(tcp::socket);
Task<void> listen(asio::io_context& io_context, tcp::acceptor& acc);

int main() {
    asio::io_context io_context(/*threads = */ 1);
    tcp::acceptor acceptor(io_context,
                           tcp::endpoint(tcp::v4(), /*port = */ 12345));

    asio::signal_set signals(io_context, SIGINT, SIGTERM);

    // Kick off!
    run(io_context, anyOf(listen(io_context, acceptor),
                          signals.async_await(use_awaitable)));

    return 0;
}

但是只允许在异步上下文中创建是有问题的,你的异步链路该如何 kick off?(第一个协程要怎么启动?)作者在这里定义了 run,可以在普通函数中启动协程,并返回其 co return 结果或者未处理的异常。组合器也是可以使用的。

这就是生活

但是万一呢?万一我真的需要一个类似 daemon 的东西,要比 nursery 调用者(spawner)还要长生命周期的任务呢?又万一我需要的 nursery 就非要在非异步上下文中使用呢?我们不能破坏上面的规则,不然全都白干了。

void beginListen(Nursery& n, io_context& io, uint16_t port) {
    n.start([&]() -> Task<void> {
        tcp::acceptor acc(io, tcp::endpoint(tcp::v4(), port));
        for (;;) {
            auto s = co_await acc.async_accept(io, use_awaitable);
            n.start(serve(std::move(s)));
        }
    });
}

C++ 的老办法就是传递 nursery 引用。任务仍在 nusery 的范围内,同时在规则内也允许 spawner 退出后任务继续存活下去;引用不需要构造,所以在同步上下文(普通函数)使用没有问题。

bending-the-rules-to-a-degree

注意这种设计会使得 spawner、nursery 和 task 的关系变得不再是层层嵌套的结构化关系。但是有个能动的解决方案总比没有好,这种做法可以直接从函数签名中推导出潜在的风险。如果真的写出问题,你只需在上图红色的函数签名中查找问题即可,不需要在全局的代码堆中抓瞎,这是编译时确定的。

虽然作者的方案只能这样,但这就是生活。


// Active objects
class ProcessSupervisor {
    Nursery* nursery = nullptr;
public:
    // active object 的调用端
    void start(const std::string& cmdline) {
        nursery->start(runProcess(cmdline));
    }
private:
    // suspends until the process completes
    // active object 的执行端
    Task<void> runProcess(const std::string& cmdline);
};

补充一点。这种复杂需求的一个可能场景是类内使用 active object 模式。也就是只调用一个 start,然后让实际任务的执行(runProcess)都在 nursery 里面知道完成。而 nusery 并不能通过公开的构造函数(非异步的上下文)获取,因此一个显然的办法是在 ProcessSupervisor 的构造函数中作为引用/指针去获取,随后传递给 start

什么是 active object?用简要的文字说明。


Active Object 线
1. Proxy
2. Task Queue
3. Scheduler
4. 线Thread
5. Result Handle

线 Future

线GUI


Task<void> workWithSupervisor() {
    WITH_NURSERY(n) {
        // nursery 是引用/指针的形式传递给 ps 的构造函数
        ProcessSupervisor ps(n);
        // 万一,执行端 runProcess 会调用到 ps 的某些成员函数?
        ps.start("/bin/true");
        // ...stuff...
        // 执行到这里,ps 因为超出作用域而被析构
    };
 }

前面讨论过,task 的生命周期会超过 spanwer(这里是 *this/ps),因此这段代码潜在访问已析构对象的风险。(这一块个人觉得有点反直觉?意思应该是 nursery 提供的 lambda 作用域与 nursery 的生命周期有区别,后者只要求在上层恢复前完成。)

two-phase-initialization 左侧是第二阶段的初始化,右侧是运行端

所以一个更好的做法应该是让 spawner(*this)持有(open)nursery 的所有权。既然构造函数不行(不存在协程形式的异步构造函数),那就使用经典的二阶段初始化:让第二阶段的初始化提供一个异步上下文的环境以打开 nursery。该异步上下文将被设计为永远不恢复,直到运行端完成并使用 anyOf 语义取消 SuspendForever。这样就能间接地让 spawner 获得 nursery 的所有权的同时实现安全的 active object 模式。

任务的取消

整理一下我们的取消点需求:

  • 一是它必须内建且隐式,这方面的理由和异常一样;
  • 二是它要求被动执行且不能使用异常实现,除了语义问题以外,开销极大也是一个问题;
  • 三是它必须异步,这个取消点是随时会被触发,也是会在任意线程被触发。

作者介绍了一些取消点应具有的性质,这方面的话题我以前也总结过。再次强调不要显式地写 if (cond);不要用慢速异常来当作取消;异步取消是必须支持的。

同步取消指的是一种可以立刻取消并且立刻得到确认的操作,部分的任务也确实可以做到同步取消。但是任务通常是跟外部设施一起协作的(比如,你的应用层 xyz 队列调度,你的 uvw 操作系统),事实上你并没有能力去做到同步取消。因此,异步取消是必须支持的。

asynchronous-cancellation-example

一个例子是进程管理。当你尝试取消(kill)进程,你也必须要 wait() 才能确认内核完成,否则任何状态都是不可确定的;上图就是一个进程占用 mount 的例子,你必须要有确定的状态(不再占用)才能继续 unmount。否则后续操作都是一连串错误。作者还提了另外一个例子 io_uring/IOCP,总之不支持异步取消的话就会喜提 use-after-free 错误。

那么怎么实现?众所周知 C++20 协程已有 await_suspend/resume/ready 三板斧,作者使用了非常 hack 的手段加了一个 await_cancel 接口(假定了协程帧的内存布局)来支持取消点特性,目前可以在主流编译器上工作。这块感兴趣就看演讲吧,取消点支持也不是非得这么做。

raii-is-more-important catch 补救不再有效

取消点支持后,RAII 对于资源管理变得更加重要。以前只有异常会破坏非 RAII 的控制流,现在取消点也有这种行为,并且以往的补救方案无效。总之时刻记得实现 RAII 的资源管理,或者使用类似 gsl::finally 等 scope guard 工具类间接完成,因为协程的销毁保证了局部变量的析构。

asynchronous-resource-cleanup

但是异步的资源回收会更加复杂,因为目前没有异步 RAII 这种概念。上面例子的 close 可以是一个异步的任务,那我们也没办法放置 co_await f.close() 这样的代码(析构函数和 scope guard 都不能处理协程),此时需要使用 anyOf 组合器的取消特性来完成异步化,这也是异步环境中一种等效于 RAII 的做法。

桥接旧代码

唉算了吧,作者提到的做法都很勉强(unsafe)。个人认为,协议层可以讨论兼容,但是旧有代码的错误设计就不要强行兼容了。跳过!

THE END

最后是总结时间。作者提到这一套异步框架使得历史代码体积减少了将近一半,这是最为直接的收益;但是框架(比较超前的理念)仍然是存在学习曲线,这是需要时间去适应的;最后是性能问题,虽然这种框架设计能让你的 IO 密集型任务更加简化,尤其是复杂流程,但是作者认为 C++20 协程在主流编译器优化中仍处于很初级的阶段,如果你的任务是 10 ns 以内的粒度,那还是先考虑点别的方案吧。