本文是 C++ Coroutines and Structured Concurrency in Practice 的个人观片笔记。主要内容是作者(Dmitry Prokoptsev)分享他考虑结构化并发而实现的 C++20 协程网络库 Corral。
如今协程库基本上是满街走了,但是这场演讲对于结构化并发的讨论是相当的不错。
一点说明:
实在的好处
协程的好处其实没有必要强调,用过都说好。
作者给了一个与传统回调对比的例子,可以看出协程更好写、更好读,也更易维护。协程对于生命周期、资源管理和错误传播三个方面都能得到收益:不再是跟语言对抗,而是享受语言带来的特性。
潜在的问题
市面上典型的异步框架
一个典型的异步框架通常由三部分组成:
- task:可能是一个线程或者协程等等,通常是一个与异步任务关联的具有生命周期的实体。
- join:一个显式的暂停点和用于错误处理的传播路径。
- detach:允许任务分离执行。
(个人习惯把 task 称为异步操作,不过本文均采用作者的描述方式:任务或者 task。)
作者认为,如果协程库采用这种设计,其潜在的问题是:分离的任务是有害的。
// 不要这么干
void bad(tcp::socket& s) {
std::array<char> buf(1024);
asio::co_spawn(
ex,
[&s, &buf]() -> asio::awaitable<void> {
co_await s.read_some(
s,
asio::buffer(buf),
asio::use_awaitable
);
},
asio::detached
);
}
(作者给的很多代码都是毛手毛脚的,不要在意缺失的细节……后面不再提醒了)
以 Asio 为例,使用 asio::co_spawn(asio::detached)
生成分离的异步任务后,你再也无法得知传递进去的资源(比如 buf
)是否还在被 detached task 所占用。当然这段代码有更加离谱的问题,就是任务完成前 buf
就已经结束生命周期了。
// don’t do this either
void slightly_better(tcp::socket& s) {
// 常见的 shared state 设计
auto buf = make_shared<char[]>(1024);
asio::co_spawn(
ex,
[&s, buf]() -> asio::awaitable {
co_await s.read_some(
s,
asio::buffer(buf.get(), 1024),
asio::use_awaitable
);
},
asio::detached
);
}
很容易想到使用智能指针去封装需要的上下文,尽管这意味着有大量的共享状态的实体在你的框架里乱飞。但更重要的是仍有错误处理的问题,如果在 detached task 当中抛出异常,这种实现方式是没有办法传播异常的。
这使得设计上要么完全忽略错误,要么直接停掉整个程序,非常尴尬。
// and also don’t do this
void slightly_better(tcp::socket& s) {
auto buf = make_shared<char[]>(1024);
asio::co_spawn(
ex,
[&s, buf]() -> asio::awaitable {
try {
co_await s.read_some(
s,
asio::buffer(buf.get(), 1024),
asio::use_awaitable
);
} catch (std::exception& e) {
// ...um?..
}
},
asio::detached
);
}
很容易想到在当前上下文 try-catch 来解决无法传播的错误处理。不过尴尬在于你想怎么处理,打个日志当无事发生?正因为在当前上下文无法处理(没有足够的信息,并且你也不是最终用户),所以需要向上传播,这是问题搞反了。
因此,使用 detach 设计的协程在资源管理和错误处理方面,依然存在和传统回调无异的糟糕体验。
The completion token that will handle the notification that the thread of execution has completed. The function signature of the completion handler must be:
void handler(std::exception_ptr, T);
NOTE: 作者这里其实有点强词夺理,使用分离式设计强制 fire and forget 能怪谁。见 co_spawn 的函数签名,搭配 handler 即可传播异常了。不过作为问题的出发点是挺好的,Asio 有这能力不代表别的框架有同等水平。
解决方案?
怎么解决这些问题,一个简单做法是完全移除分离式设计,即要求每一个任务都必须在某个时刻 join。也许「某个时刻」的最佳选择是析构,也就是在 task 的析构函数中放置 join。这是因为析构函数是唯一的能保证最终能调用的函数。
但是错误处理的传播仍有问题,因为析构函数通常假定不抛出异常,除非你有自己明确定义的规则(确保另一个异常正在处理时能够丢弃异常?但这是不好的实践)。
更加复杂的问题是,你的程序可能永远不会触及析构。上图的 liveness 是一个保活后台任务,假设抛了异常,本来是应然在 join 时传播的,但是 join 所在的析构事实上不可触及。
最终结果就是保活服务没了,并且你完全不知情。尴尬在于这种 IO 模式还很常见。
这个问题的根因在于 liveness task 是运行在后台的,我们要进一步摒弃「后台执行」这个概念。
结构化并发
作者在框架中规定了结构化并发的规则:task 只有在被等待时才允许运行,且正在等待的上层恢复后,task 必然处于已完成的状态。
NOTE: 关于结构化并发的更多信息可参考上图的链接,主要是讨论 Go 语言的 go 语句和 goto 控制流的相似性,认为后台执行是一种不受控的并发控制流,也就是非结构化的并发;进一步提出 nursery 语句来实现结构化并发。
这种有结构、受控的并发控制流保证了:上层恢复后,task 的生命周期(已完成)、资源管理(可释放)和异常处理(会传播)都是确定的。同时,结构化并发也意味着没有「后台执行」的概念。
Combiner
Task<void> greet() {
cout << "going to greet "
<< "the world\n";
co_await sleep_for(1s);
cout << "Hello world!\n";
}
Task<void> greetTwice() {
cout << "spawning tasks\n";
// 更推荐直接 co_await greet(),但这不是重点
auto task1 = greet();
auto task2 = greet();
cout << "awaiting tasks\n";
co_await task1;
co_await task2;
}
// spawning tasks
// awaiting tasks
// going to greet the world
// <1 second pause>
// Hello world!
// going to greet the world
// <another 1 second pause>
// Hello world!
但是字面理解上面的结构化并发规则后,你会发现这完全就是串行执行。
这是因为目前缺少表达一组任务的概念,所以需要引入一个 allOf 组合器(combiner):将需要并发的部分通过组合来完成,同时依然满足结构化并发。
Task<void> greet() {
cout << "going to greet "
<< "the world\n";
co_await sleep_for(1s);
cout << "Hello world!\n";
}
Task<void> greetTwice() {
// 使用 combiner 完成并发任务
co_await allOf(
greet(),
greet()
);
}
// going to greet the world
// going to greet the world
// <1 second pause>
// Hello world!
// Hello world!
可以简单理解组合器为组合后的 task,它也需要被等待才会执行;等待方恢复时,它已经完成任务。
同时也可以引入一个相当有用的 anyOf 组合器。该组合器最重要的特性是只要有一个任务完成,其他的任务将被取消。
这种取消特性的用途非常广泛,比如:
- 非侵入地提供任务的超时特性,不用修改每一个 API 去添加超时参数;
- 暴露给用户的任务权限管理,不需提供完整的 task(就是单一职责的意思);
- 以及优雅退出(异步信号处理)和 IO 竞争优化等等……
话说我写的 uring_exec 也支持这些啊,而且还是潮到出水的 std::execution+io_uring,怎么就没见人给我 ⭐ 呢……路过的客官觉得有意思的话,还请高抬贵手 star 一下 吧。
// 最后一个 use case: racing
// 可以描绘成以下 call tree 形式
+------------+
| resolve() |
+------------+
|
+------------+
| anyOf() |
+------------+
/ \
+---------------+ +----------+
| resolveOn() | | lambda |
+---------------+ +----------+
| |
+-----+-----+ +----------+
| ..... | | sleep() |
+-----------+ +----------+
这种基于组合的结构化并发是树形结构(call tree),任意一个 task 的父节点就是上述定义的 awaiting caller。在这里面,异常是向上传播,取消是向下传播。
Nursery
Task<void> serve(tcp::socket s) {
std::array<char> buf(1024);
try {
for (;;) {
size_t len = co_await s.async_read_some(
asio::buffer(buf),
asio::use_awaitable
);
co_await async_write(
s,
asio::buffer(buf, len),
asio::use_awaitable
);
}
} catch (std::exception&) {
/*connection closed or I/O error*/
}
}
Task<void> listen(tcp::acceptor& acc) {
for (;;) {
tcp::socket s = co_await acc.async_accept(
io_context,
asio::use_awaitable
);
// ???
}
}
以一个 TCP echo server 为例,listen 的并发成为了设计上的问题:上面的 allOf 是一种静态 DAG 设计,即 call tree 没法提供运行时增删 task 的能力。
Task<void> serve(tcp::socket s) {
std::array<char> buf(1024);
try {
for (;;) {
size_t len = co_await s.async_read_some(
asio::buffer(buf),
use_awaitable
);
co_await async_write(
s,
asio::buffer(buf, len),
asio::use_awaitable
);
}
} catch (std::exception&) {
/*connection closed or I/O error*/
}
}
Task<void> listen(tcp::acceptor& acc) {
tcp::socket s = co_await acc.async_accept(
io_context,
asio::use_awaitable
);
// 递归 listen
co_await anyOf(
serve(std::move(s)),
listen(acc)
);
}
一种妥协的做法是递归地 anyOf。看着很 cool,但是这并非良好实践。你不能强求所有的程序设计都要改成递归形式;而且对于有三种以上的协程参与递归时,基本不能实现;最后个人多说一句,C++26 的 sender 可做不到递归。
另一种做法是运行时的动态的 anyOf(dynamic allOf),使得 call tree 可以动态地添加或删除 task。但是我们仍要满足结构化并发的语义,因此是 co await 形式,这意味着所有的 children task 的生命周期都不会超出这个 allOf scope 的范围。
Task<void> serve(tcp::socket);
Task<void> listen(tcp::acceptor& acc) {
// WITH_NURSERY 和上面的 co_await dynamic_allOf([](DynamicAllOf&) -> Task {}) 一个意思
// 作者说 dynamic_allOf 的重要程度足以成为一种新的原语 nursery,
// 还可能有 .size()/.cancel() 等成员函数用于跟踪/处理任务进展
WITH_NURSERY(nursery) {
for (;;) {
tcp::socket s = co_await acc.async_accept(
io_context,
use_awaitable
);
// 类似于 co_spawn,可以立刻启动 serve(s) 任务,
// 但是该任务不会超出 nursery 的结构范围,
// 因此也不会超出 spawner(在这里为 listen)的结构范围
// No task is ever left behind
nursery.start(serve(std::move(s)));
}
};
}
这种更为复杂的动态组合器被称为 nursery,托儿所就是用于管理 children 的意思。并且作者在这里使用宏简化了使用方式,更加有 python 味(毕竟是从这里借来的)。
nursery 最重要的性质是没有公开的构造函数,它只能在异步上下文(协程)中创建。nursery 具有父节点,并且依然可以满足 call tree 的结构。
(作者这里的设计确实是比 Asio 的 co_spawn 和 io_context 更加细致的,本来一个 io_context 上下文只有全局的结构,现在可以平行多个 nursery 互不干涉。)
Task<void> serve(tcp::socket);
Task<void> listen(asio::io_context& io_context, tcp::acceptor& acc);
int main() {
asio::io_context io_context(/*threads = */ 1);
tcp::acceptor acceptor(io_context,
tcp::endpoint(tcp::v4(), /*port = */ 12345));
asio::signal_set signals(io_context, SIGINT, SIGTERM);
// Kick off!
run(io_context, anyOf(listen(io_context, acceptor),
signals.async_await(use_awaitable)));
return 0;
}
但是只允许在异步上下文中创建是有问题的,你的异步链路该如何 kick off?(第一个协程要怎么启动?)作者在这里定义了 run
,可以在普通函数中启动协程,并返回其 co return 结果或者未处理的异常。组合器也是可以使用的。
这就是生活
但是万一呢?万一我真的需要一个类似 daemon 的东西,要比 nursery 调用者(spawner)还要长生命周期的任务呢?又万一我需要的 nursery 就非要在非异步上下文中使用呢?我们不能破坏上面的规则,不然全都白干了。
void beginListen(Nursery& n, io_context& io, uint16_t port) {
n.start([&]() -> Task<void> {
tcp::acceptor acc(io, tcp::endpoint(tcp::v4(), port));
for (;;) {
auto s = co_await acc.async_accept(io, use_awaitable);
n.start(serve(std::move(s)));
}
});
}
C++ 的老办法就是传递 nursery 引用。任务仍在 nusery 的范围内,同时在规则内也允许 spawner 退出后任务继续存活下去;引用不需要构造,所以在同步上下文(普通函数)使用没有问题。
注意这种设计会使得 spawner、nursery 和 task 的关系变得不再是层层嵌套的结构化关系。但是有个能动的解决方案总比没有好,这种做法可以直接从函数签名中推导出潜在的风险。如果真的写出问题,你只需在上图红色的函数签名中查找问题即可,不需要在全局的代码堆中抓瞎,这是编译时确定的。
虽然作者的方案只能这样,但这就是生活。
// Active objects
class ProcessSupervisor {
Nursery* nursery = nullptr;
public:
// active object 的调用端
void start(const std::string& cmdline) {
nursery->start(runProcess(cmdline));
}
private:
// suspends until the process completes
// active object 的执行端
Task<void> runProcess(const std::string& cmdline);
};
补充一点。这种复杂需求的一个可能场景是类内使用 active object 模式。也就是只调用一个 start
,然后让实际任务的执行(runProcess
)都在 nursery 里面知道完成。而 nusery 并不能通过公开的构造函数(非异步的上下文)获取,因此一个显然的办法是在 ProcessSupervisor 的构造函数中作为引用/指针去获取,随后传递给 start
。
1. 代理接口(Proxy):对外暴露异步方法,接收调用请求。
2. 任务队列(Task Queue):存储待处理的请求(封装为独立任务)。
3. 调度器(Scheduler):决定任务执行顺序(如按队列顺序)。
4. 执行线程(Thread):独立运行,从队列中取出任务并执行。
5. 结果容器(Result Handle):可选,允许调用方异步获取方法执行结果。
工作机制:调用者通过代理发起请求,任务被封装并存入队列,由独立线程异步执行。调用者无需阻塞等待结果(若需要结果,可通过 Future 等机制延迟获取)。
优势:避免直接锁竞争,提升系统吞吐量;通过隔离线程与任务调度,简化并发逻辑的复杂性。常用于需要高响应性或资源隔离的场景(如事件驱动系统、GUI 框架)。
(咋样,我的前端水平还不错吧。是不是有模有样。)
Task<void> workWithSupervisor() {
WITH_NURSERY(n) {
// nursery 是引用/指针的形式传递给 ps 的构造函数
ProcessSupervisor ps(n);
// 万一,执行端 runProcess 会调用到 ps 的某些成员函数?
ps.start("/bin/true");
// ...stuff...
// 执行到这里,ps 因为超出作用域而被析构
};
}
前面讨论过,task 的生命周期会超过 spanwer(这里是 *this/ps),因此这段代码潜在访问已析构对象的风险。(这一块个人觉得有点反直觉?意思应该是 nursery 提供的 lambda 作用域与 nursery 的生命周期有区别,后者只要求在上层恢复前完成。)
左侧是第二阶段的初始化,右侧是运行端
所以一个更好的做法应该是让 spawner(*this)持有(open)nursery 的所有权。既然构造函数不行(不存在协程形式的异步构造函数),那就使用经典的二阶段初始化:让第二阶段的初始化提供一个异步上下文的环境以打开 nursery。该异步上下文将被设计为永远不恢复,直到运行端完成并使用 anyOf 语义取消 SuspendForever。这样就能间接地让 spawner 获得 nursery 的所有权的同时实现安全的 active object 模式。
任务的取消
整理一下我们的取消点需求:
- 一是它必须内建且隐式,这方面的理由和异常一样;
- 二是它要求被动执行且不能使用异常实现,除了语义问题以外,开销极大也是一个问题;
- 三是它必须异步,这个取消点是随时会被触发,也是会在任意线程被触发。
作者介绍了一些取消点应具有的性质,这方面的话题我以前也总结过。再次强调不要显式地写 if (cond)
;不要用慢速异常来当作取消;异步取消是必须支持的。
同步取消指的是一种可以立刻取消并且立刻得到确认的操作,部分的任务也确实可以做到同步取消。但是任务通常是跟外部设施一起协作的(比如,你的应用层 xyz 队列调度,你的 uvw 操作系统),事实上你并没有能力去做到同步取消。因此,异步取消是必须支持的。
一个例子是进程管理。当你尝试取消(kill)进程,你也必须要 wait() 才能确认内核完成,否则任何状态都是不可确定的;上图就是一个进程占用 mount 的例子,你必须要有确定的状态(不再占用)才能继续 unmount。否则后续操作都是一连串错误。作者还提了另外一个例子 io_uring/IOCP,总之不支持异步取消的话就会喜提 use-after-free 错误。
那么怎么实现?众所周知 C++20 协程已有 await_suspend/resume/ready 三板斧,作者使用了非常 hack 的手段加了一个 await_cancel 接口(假定了协程帧的内存布局)来支持取消点特性,目前可以在主流编译器上工作。这块感兴趣就看演讲吧,取消点支持也不是非得这么做。
catch 补救不再有效
取消点支持后,RAII 对于资源管理变得更加重要。以前只有异常会破坏非 RAII 的控制流,现在取消点也有这种行为,并且以往的补救方案无效。总之时刻记得实现 RAII 的资源管理,或者使用类似 gsl::finally 等 scope guard 工具类间接完成,因为协程的销毁保证了局部变量的析构。
但是异步的资源回收会更加复杂,因为目前没有异步 RAII 这种概念。上面例子的 close 可以是一个异步的任务,那我们也没办法放置 co_await f.close()
这样的代码(析构函数和 scope guard 都不能处理协程),此时需要使用 anyOf 组合器的取消特性来完成异步化,这也是异步环境中一种等效于 RAII 的做法。
桥接旧代码
唉算了吧,作者提到的做法都很勉强(unsafe)。个人认为,协议层可以讨论兼容,但是旧有代码的错误设计就不要强行兼容了。跳过!
THE END
最后是总结时间。作者提到这一套异步框架使得历史代码体积减少了将近一半,这是最为直接的收益;但是框架(比较超前的理念)仍然是存在学习曲线,这是需要时间去适应的;最后是性能问题,虽然这种框架设计能让你的 IO 密集型任务更加简化,尤其是复杂流程,但是作者认为 C++20 协程在主流编译器优化中仍处于很初级的阶段,如果你的任务是 10 ns 以内的粒度,那还是先考虑点别的方案吧。