前言
上文已经解析了 Asio 早期的无栈协程实现,本文继续探讨 Asio 对于 C++20 协程的适配。
另外,我在知乎翻译了一篇文章,是关于 Asio 的接口演进历史以及现代网络编程,推荐阅读本文 Asio 协程章节前先热身:为什么 C++20 是最 awesome 的网络编程语言。
C++20 协程
翻源码也是一种了解 C++20 协程的实践途径,在开始前先啰嗦几句 C++20 协程标准的设计。
不想听老 ass 念经?点击跳过!
基本介绍
C++20 协程并不是完全的编译器实现,也不是完全的库实现。从类设计上分类,可以分为三大块:
- 协程描述符(coroutine handle),用于外部接口的交互。
- 承诺类型(promise type),用于内部的库实现设计。
- 协程帧(coroutine frame / coroutine state),由编译器处理的上下文信息。
当然除了类以外,还有函数本身。在 C++ 标准中,协程是函数:A coroutine is a function that can suspend execution to be resumed later.
不知用了多少遍的好图
先说协程描述符 std::coroutine_handle<>
,它作为一个描述符的作用就是调用接口。比如 resume()
和 destroy()
,一个已暂停的协程接下来能做的事情只有恢复或销毁;还有观测用接口 done()
和访问关联的承诺类型 promise()
等等。总之是个容易理解的接口类型。
承诺类型 promise_type
可以定制协程的具体行为以及处理值的传递。它可以通过 initial_suspend()
和 final_suspend()
定制是否要在特殊的暂停点(suspension point)执行暂停操作;也可以通过 get_return_object()
得到首次暂停时得到的对象;以及 yield_value(v)
可接收 co_yield v
所产出(yield,表示暂停并返回一个值)的值 v
。
至于协程帧则是由编译器控制生成的部分。我们在聊协程时之所以完全不关心所谓上下文保存,那是因为这部分交给编译器管理了,一般来说会隐式的调用动态内存分配 operator new
来存储上下文信息,但是也会尽可能优化而无需实际分配。
执行流程
实现一个协程 hello world 程序还需要了解承诺类型的 get_return_object()
,协程描述符的静态函数 from_promise()
,以及协程函数的的返回值之间的关联。先看用例吧:
// 一个懒惰的生成器,生成 [0, n) 序列,每调用一次生成一个值
static My_generator<int> coroutine_function(size_t n) {
// 尽管这里看着像是无限生成序列……
for(auto i : std::views::iota(0)) {
co_yield i;
}
}
int main() {
constexpr size_t n = 7;
auto generate = coroutine_function(n);
// 这里使用定制的 operator bool 和 operator ()
while(generate) {
std::cout << generate() << ' ';
}
return 0;
}
// 输出:
0 1 2 3 4 5 6
假设实现一个 My_generator
,这里的协程该怎么工作?可以简单认为:
- 编译器使用
operator new
分配协程帧对象。 - 如果有函数参数,则拷贝/移动到协程帧中,引用不需处理。
- 调用承诺对象
promise
的构造函数,其参数使用协程中的传参(在这里为n
)。 - 调用
promise.get_return_object()
,后续作为返回对象给上层(在这里就是generate
对象)。 - 协程是否暂停取决于
promise.initial_suspend()
,一般如果返回类型是std::suspend_always
,立刻回到上层调用者。 - 后续调用再执行函数体。
- 每一次调用
generate()
(内部实际为coroutine_handle.resume()
封装),已暂停的coroutine_function()
会恢复现场,后续在co_yield
处暂停并返回i
。 - 调用
promise.yield_value(...)
,传参即为上一步返回(产出)的值。 - 由用户在
yield_value()
函数体内实现promise
内部存放临时存储,回到上层generate()
,由用户获取存放在promise
的值。 generate()
返回值给上层(即main()
)。
NOTES:
- 协程的返回类型
your_return_type
务必有typename your_return_type::promise_type
,这也是第 3 步时编译器得知对应promise_type
的原因(或者使用std::coroutine_traits
)。 - 第 4 步如果需要从中获取协程描述符,那就在承诺对象的成员函数内使用
coruntine_handle<decltype(promise)>::from_promise(*this)
。 - 当遇到暂停点时,先前获得的对象将返回,因此第 4 步
get_return_object()
得到的对象返回。 - 第 5 步其实是
co_await promise.initial_suspend()
,具体后面再说。
一个完整的示例如下:
#include <coroutine>
#include <iostream>
#include <ranges>
template <typename T>
class My_generator {
public:
struct promise_type;
// ctor / dtor
explicit constexpr
My_generator(std::coroutine_handle<promise_type> h) noexcept
: _handle(h) {}
~My_generator() { _handle ? _handle.destroy() : void(0); }
// copy / move
My_generator(const My_generator &) = delete;
My_generator(My_generator &&rhs) = delete;
My_generator& operator=(const My_generator &rhs) = delete;
// user APIs
T operator ()() {
if(!resumable()) {
throw std::runtime_error("AIEEEEE! A NINJA!?");
}
if(!operator bool()) {
throw std::logic_error("WHY THERE'S A NINJA HERE!?");
}
_handle.resume();
_handle.promise().count--;
return std::move(_handle.promise().result);
}
explicit operator bool() const noexcept {
return _handle.promise().count > 0;
}
bool resumable() const noexcept {
return _handle && !_handle.done();
}
private:
std::coroutine_handle<promise_type> _handle;
};
template <typename T>
struct My_generator<T>::promise_type {
explicit constexpr
promise_type(size_t count) noexcept(noexcept(T())): count(count) {}
auto initial_suspend() noexcept { return std::suspend_always{}; }
auto final_suspend() noexcept { return std::suspend_never{}; }
void unhandled_exception() {}
My_generator<T> get_return_object();
auto yield_value(auto &&value);
size_t count;
T result;
};
template <typename T>
My_generator<T> My_generator<T>::promise_type::get_return_object() {
auto handle = std::coroutine_handle<promise_type>::from_promise(*this);
return My_generator{handle};
}
template <typename T>
auto My_generator<T>::promise_type::yield_value(auto &&value) {
result = std::forward<decltype(value)>(value);
return std::suspend_always{};
}
// C++ coroutines are special functions
static My_generator<int> coroutine_function(size_t n) {
for(auto i : std::views::iota(0)) {
co_yield i;
}
}
int main() {
constexpr size_t n = 7;
auto generate = coroutine_function(n);
while(generate) {
std::cout << generate() << ' ';
}
return 0;
}
Awaiter
实际上,co_yield expr
等价于co_await promise.yield_value(expr)
。co_await
是一个暂停点,也就是说等待并返回控制权给上层的操作由它来完成。
我们定义两种不同的概念用于 co_await
的定制:
- 可等待体(awaitable):允许被
co_await
接收的表达式。 - 等待器(awaiter):实现了
await_ready()
/await_suspend()
/await_resume()
的对象。
当可等待体 awaitable
被调用 co_await
时,编译器可以对可等待体执行转换:
- 如果此时暂停点为
initial_suspend()
或者final_suspend()
,则没有变动。 - 如果此时暂停点实际使用
co_yield
,则没有变动。 - 如果承诺对象
promise
存在函数await_transform(awaitable)
,那可等待体可以转换为await_transform(awaitable)
。
当确认可等待体后,还需要继续通过重载以确认等待器:
- 通过成员函数
awaitable.operator co_await()
确认。 - 通过非成员函数
operator co_await(awaitable)
确认。 - 如果没有重载,那么可等待体即是等待器。
这些复杂的概念哪里会用到?我们一直在用。前面例子中的 co_yield promise.yield_value(expr)
实际展开后,(除了 yield_value()
的函数执行以外)就是 co_await std::suspend_always{}
。
// libstdc++ 实现
struct suspend_always
{
constexpr bool await_ready() const noexcept { return false; }
constexpr void await_suspend(coroutine_handle<>) const noexcept {}
constexpr void await_resume() const noexcept {}
};
struct suspend_never
{
constexpr bool await_ready() const noexcept { return true; }
constexpr void await_suspend(coroutine_handle<>) const noexcept {}
constexpr void await_resume() const noexcept {}
};
由于没有上述的转换和重载,可以得知 std::suspend_always
既是可等待体,也是等待器。
等待器的基本实现思路如下:
- 假设协程于
<suspend-point>
表示实际的暂停点,于<resume-point>
表示实际的恢复点。 bool await_ready()
表示是否到达<suspend-point>
。- 返回
false
进入<suspend-point>
。 - 返回
true
进入<resume-point>
。
- 返回
auto await_suspend(coroutine_handle)
表示已到达<suspend-point>
后的执行函数:- 如果返回
void
或true
,表示保持暂停而不进入<resume-point>
。 - 如果返回
false
,将进入当前协程的<resume-point>
。 - 如果返回某个协程描述符,将进入对应协程的
<resume-point>
。
- 如果返回
auto await_resume()
表示已到达<resume-point>
后的执行函数:- 其返回值就是
co_await
最终得到的返回结果。
- 其返回值就是
不难看出,等待器的三个接口是基于事件处理来设计的:
await_ready()
表示事件是否需要已完成,完成则直接收集,否则需要处理再收集。await_suspend()
表示处理事件的方式,可采用同步或者异步。await_resume()
表示收集事件的结果。
NOTES:
-
<suspend-point>
和<resume-point>
是由编译器生成的。 - 不进入
<resume-point>
意味着控制权返回给上层。 await_suspend(coroutine_handle)
如果返回任意协程描述符h
,等同于调用h.resume()
。await_suspend(coroutine_handle)
可以返回自身关联的coroutine_handle
。- 到达
<suspend-point>
后可以.resume()
,因此await_suspend()
返回前甚至可以.resume()
自身。 - 关于上一条 NOTE 需要注意协程帧与
*this
的生命周期差异,这份代码会教你做事。
Lewis Baker 的文章给了一份很好的伪代码作为解释:
// So, assuming we have encapsulated the logic for turning the <expr> result
// into an Awaiter object into the above functions then the semantics of
// co_await <expr> can be translated (roughly) as follows:
{
auto&& value = <expr>;
auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
if (!awaiter.await_ready())
{
using handle_t = std::experimental::coroutine_handle<P>;
using await_suspend_result_t =
decltype(awaiter.await_suspend(handle_t::from_promise(p)));
<suspend-coroutine>
if constexpr (std::is_void_v<await_suspend_result_t>)
{
awaiter.await_suspend(handle_t::from_promise(p));
<return-to-caller-or-resumer>
}
else
{
// 这里有些过时了,handle 也是可以的,总之问题不大
static_assert(
std::is_same_v<await_suspend_result_t, bool>,
"await_suspend() must return 'void' or 'bool'.");
if (awaiter.await_suspend(handle_t::from_promise(p)))
{
<return-to-caller-or-resumer>
}
}
<resume-point>
}
return awaiter.await_resume();
}
你也可以动手改造文中给的示例,来理解等待器的行为:
template <typename T>
auto My_generator<T>::promise_type::yield_value(auto &&value) {
result = std::forward<decltype(value)>(value);
- return std::suspend_always{};
+ //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ struct Awaiter {
+ bool await_ready() {
+ std::cout << "ready?" << std::endl;
+ return false;
+ }
+ auto await_suspend(std::coroutine_handle<> handle) {
+ std::cout << "suspend!" << std::endl;
+ std::cout << handle.address() << std::endl;
+ return void();
+ // return false;
+ // return true;
+ // return /*next_handle*/;
+ }
+ void await_resume() {
+ std::cout << "resume!" << std::endl;
+ }
+ };
+ return Awaiter{};
+ //+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
}
返回语句
Q: 我已经明白 co_yield
和 co_await
了,那还有 co_return
呢?
A: 不想写了,自己品鉴吧。
When a coroutine reaches the
co_return
statement, it performs the following:
- calls
promise.return_void()
for
co_return;
co_return expr;
whereexpr
has typevoid
- or calls
promise.return_value(expr)
forco_return expr;
whereexpr
has non-void type- destroys all variables with automatic storage duration in reverse order they were created.
- calls
promise.final_suspend()
and co_awaits the result.When the coroutine state is destroyed either because it terminated via co_return or uncaught exception, or because it was destroyed via its handle, it does the following:
- calls the destructor of the promise object.
- calls the destructors of the function parameter copies.
- calls operator delete to free the memory used by the coroutine state.
- transfers execution back to the caller/resumer.
Asio 协程
众所周知,Asio 出于扩展性和兼容性的需求,代码是比较抽象的。本章按照一个实际调用的过程来探讨对于异步函数的 C++20 协程适配,非协程相关的内容会被忽略,这样可以减少阅读上的复杂度。
实际上是按照深度优先的顺序去阅读的,而不是先给设计然后解释的顺序。又因为异步的关系,往往需要边看边往上翻。
异步函数
以 async_read_some()
为例,Asio 可按如下方式使用协程:
asio::awaitable<void> do_stuff(asio::ip::tcp::socket socket) {
char buf[1024];
for(;;) {
size_t n = co_await socket.async_read_some(asio::buffer(buf), asio::use_awaitable);
// ...
}
}
async operation
现在跟踪 async_read_some()
的函数签名:
template <typename MutableBufferSequence,
ASIO_COMPLETION_TOKEN_FOR(void (asio::error_code,
std::size_t)) ReadHandler
ASIO_DEFAULT_COMPLETION_TOKEN_TYPE(executor_type)>
ASIO_INITFN_AUTO_RESULT_TYPE(ReadHandler,
void (asio::error_code, std::size_t))
async_read_some(const MutableBufferSequence& buffers,
ASIO_MOVE_ARG(ReadHandler) handler
ASIO_DEFAULT_COMPLETION_TOKEN(executor_type))
{
return async_initiate<ReadHandler,
void (asio::error_code, std::size_t)>(
initiate_async_receive(this), handler,
buffers, socket_base::message_flags(0));
}
async_read_some()
内部实现分为两步:
- 先是构造并传入了
initiate_async_receive(this)
对象。 - 然后调用
async_initiate()
函数。
(这里 handler
为 asio::use_awaitable
)
先跟踪 initiate_async_receive
类型:
class initiate_async_receive
{
public:
typedef Executor executor_type;
explicit initiate_async_receive(basic_stream_socket* self)
: self_(self)
{
}
const executor_type& get_executor() const noexcept
{
return self_->get_executor();
}
template <typename ReadHandler, typename MutableBufferSequence>
void operator()(ReadHandler&& handler,
const MutableBufferSequence& buffers,
socket_base::message_flags flags) const
{
// If you get an error on the following line it means that your handler
// does not meet the documented type requirements for a ReadHandler.
ASIO_READ_HANDLER_CHECK(ReadHandler, handler) type_check;
detail::non_const_lvalue<ReadHandler> handler2(handler);
self_->impl_.get_service().async_receive(
self_->impl_.get_implementation(), buffers, flags,
handler2.value, self_->impl_.get_executor());
}
private:
basic_stream_socket* self_;
};
主要操作集中在 operator()
,而构造函数并没有特殊的副作用,因此先不关注。
async_initiate()
接下来再分析 async_initiate()
函数。该函数存在 2 个 SFINAE 版本,用以区分 async_result
有没有成员函数。这里用版本 1 和版本 2 表示:
// async_initiate() 版本 1
// 调用 async_result<>::initiate()
// SFINAE 依据 async_result_has_initiate_memfn 选择
// <del>写的什么玩意,崩的要死</del>
template <typename CompletionToken,
ASIO_COMPLETION_SIGNATURE Signature,
typename Initiation, typename... Args>
inline typename enable_if<
detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
ASIO_INITFN_DEDUCED_RESULT_TYPE(CompletionToken, Signature,
(async_result<typename decay<CompletionToken>::type,
Signature>::initiate(declval<ASIO_MOVE_ARG(Initiation)>(),
declval<ASIO_MOVE_ARG(CompletionToken)>(),
declval<ASIO_MOVE_ARG(Args)>()...)))>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
ASIO_MOVE_ARG(Args)... args)
{
return async_result<typename decay<CompletionToken>::type,
Signature>::initiate(ASIO_MOVE_CAST(Initiation)(initiation),
ASIO_MOVE_CAST(CompletionToken)(token),
ASIO_MOVE_CAST(Args)(args)...);
}
版本 1 就是挑选 async_result::initate(initiation, token, args...)
。
// async_initiate() 版本 2
// 调用 initiation(), async_completion<>.result.get();
template <typename CompletionToken,
ASIO_COMPLETION_SIGNATURE Signature,
typename Initiation, typename... Args>
inline typename enable_if<
!detail::async_result_has_initiate_memfn<CompletionToken, Signature>::value,
ASIO_INITFN_RESULT_TYPE(CompletionToken, Signature)>::type
async_initiate(ASIO_MOVE_ARG(Initiation) initiation,
ASIO_NONDEDUCED_MOVE_ARG(CompletionToken) token,
ASIO_MOVE_ARG(Args)... args)
{
async_completion<CompletionToken, Signature> completion(token);
ASIO_MOVE_CAST(Initiation)(initiation)(
ASIO_MOVE_CAST(ASIO_HANDLER_TYPE(CompletionToken,
Signature))(completion.completion_handler),
ASIO_MOVE_CAST(Args)(args)...);
return completion.result.get();
}
版本 2 需要引入 async_completion
类,主要是 async_completion
配合 token
从中推导出 completion_handler
。
调用过程为:
- 通过
token
构造async_completion
,该对象提供handler
。 - 执行
initiation
对象的operator()(handler, args...)
。 - 收集
result
。
至于版本是怎么区分的就看这一堆萃取:
template <typename CompletionToken,
ASIO_COMPLETION_SIGNATURE... Signatures>
struct async_result_has_initiate_memfn
// 根据 sizeof(async_result_initiate_memfn_helper) 大小来判断
// > 1 就有成员函数, == 1 没有
: integral_constant<bool, sizeof(async_result_initiate_memfn_helper<
async_result<decay_t<CompletionToken>, Signatures...>
>(0)) != 1>
{
};
// 而 async_result_initiate_memfn_helper 有 2 个版本
// 不具有实现的函数签名,对应大小为 1
template <typename>
char (&async_result_initiate_memfn_helper(...))[2];
// 如果有实现,那就是 16 字节
// T 为 async_result<>
template <typename T>
char async_result_initiate_memfn_helper(
async_result_memfns_check<
void (async_result_memfns_base::*)(),
&async_result_memfns_derived<T>::initiate>*);
// T 为 void (async_result_memfns_base::*)()
// 传入&async_result_memfns_derived<T>::initiate>*
template <typename T, T>
struct async_result_memfns_check
{
};
// 对类型{T, async_result_memfns_base}构成 mixin
// 此时具有了 initiate() 成员函数
template <typename T>
struct async_result_memfns_derived
: T, async_result_memfns_base
{
};
// 因此就是看是否有 async_result_memfns_derived<async_result>::initiate() 成员函数
// 而这个成员函数是通过 async_result::initiate() 的继承得到的
// 显然在这个 async_read_some() 流程中对应的 async_result<>特化是具有 initiate() 成员函数的
// (为啥写这么绕呢?也许这就是 Asio 吧……)
async_result
async_result 是一个定制点:决定返回类型和具体完成令牌
目前分析得到,流程会调用到 async_result<CompletionToken, Signature>::initiate()
。而这里对应的完成令牌(completion token)即为 asio::use_awaitable_t
,依此可以找到特化模板:
template <typename Executor, typename R, typename... Args>
class async_result<use_awaitable_t<Executor>, R(Args...)>
{
public:
typedef typename detail::awaitable_handler<
Executor, typename decay<Args>::type...> handler_type;
typedef typename handler_type::awaitable_type return_type;
// 调用到的 initiate() 函数
// 注意到这是一个协程,因此要关注 return_type 是怎样的类设计
//
// 另外,initiation 就是原地构造的 initiate_async_receive(socket) 对象
template <typename Initiation, typename... InitArgs>
static return_type initiate(Initiation initiation,
use_awaitable_t<Executor> u, InitArgs... args)
{
(void)u;
co_await [&](auto* frame)
{
ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
handler_type handler(frame->detach_thread());
// 实际是在这里处理初始化函数
std::move(initiation)(std::move(handler), std::move(args)...);
return static_cast<handler_type*>(nullptr);
};
for (;;) {} // Never reached.
#if defined(_MSC_VER)
co_return dummy_return<typename return_type::value_type>();
#endif // defined(_MSC_VER)
}
};
async_result<use_awaitable_t>::initiate()
函数使用了 co_await
。根据定义,这是一个协程。对于协程我们需要关注其返回类型 return_type
的推导。
// return_type 即 handler_type::awaitable_type
// 那么 handler_type 又是啥
typedef typename handler_type::awaitable_type return_type;
// handler_type 就是 asio::detail::awaitable_handler
typedef typename detail::awaitable_handler<
Executor, typename decay<Args>::type...> handler_type;
小结
现在先阶段性小结吧。当我们以协程的方式调用一个异步函数时:
asio::awaitable<void> do_stuff_pseudo(auto io_object) {
auto resume_result = co_await io_object.async_operation(..., asio::use_awaitable);
}
- 实际是以
asio::awaitable
作为返回类型的协程的前提下。 - 执行
co_await asio::detail::awaitable_handler::awaitable_type{}
。 - 而异步函数则有
initiation = initiate_do_sth{io_object_ptr}
对象在内部传递。
这里假设
initiate_do_sth
是某个类,该类实现了operator()
,通过构造函数得到initaiton
对象。
需要注意的是,从 do_stuff_pseudo
函数开始,就已经是协程了。Asio 设置的 initial_suspend()
选择返回 suspend_always
。因此本地(当前运行的)上下文的调用必然是直接暂停返回的。
作为小结,这里给出一个大概的内联展开调用关系:
// 本地上下文,直接暂停
do_stuff_pseudo()
...
io_object.async_operation()
return async_initiate(initiation = initiate_do_sth{io_object_ptr})
// 内部存在 co_await,这也是一个协程
return async_result::initiate(initiation, token, ...)
co_await [](frame) { /* 本该在这里展开 initiation(...) */ }
for(;;)
-------------------------------------------------------------------
// 其它上下文,醒来后发现自己跑到别的地方去了
// 熟悉 Asio 的同学也应该猜到是由 executor 给出的上下文
initiation(handler = deduced_from(async_result<token>), ...)
剩下的问题核心就在于 awaitable_type
别名:先剧透一下,毫无意外就是 asio::awaitable
。
asio::awaitable
上面分析到协程的返回类型实际是 asio::detail::awaitable_handler::awaitable_type
。
逐一解析,先从 awaitable_handler
看起吧。这里开始将涉及到若干协程相关数据结构:
awaitable_handler
.awaitable_handler_base
.awaitable_thread
.awaitable_frame
.awaitable_frame_base
.awaitable
.
awaitable_handler
// awaitable_handler 派生于 awaitable_handler_base
// 核心就是多了 operator() 作为 handler
template <typename Executor, typename... Ts>
class awaitable_handler
: public awaitable_handler_base<Executor, std::tuple<Ts...>>
{
public:
using awaitable_handler_base<Executor,
std::tuple<Ts...>>::awaitable_handler_base;
template <typename... Args>
void operator()(Args&&... args)
{
this->frame()->attach_thread(this);
this->frame()->return_values(std::forward<Args>(args)...);
this->frame()->pop_frame();
this->pump();
}
};
// awaitable_handler_base 派生于 awaitable_thread
// 这里看到 awaitable_type 就是 asio::awaitable
template <typename Executor, typename T>
class awaitable_handler_base
: public awaitable_thread<Executor>
{
public:
typedef void result_type;
// 找到你了
typedef awaitable<T, Executor> awaitable_type;
// Construct from the entry point of a new thread of execution.
awaitable_handler_base(awaitable<void, Executor> a, const Executor& ex)
: awaitable_thread<Executor>(std::move(a), ex)
{
}
// Transfer ownership from another awaitable_thread.
explicit awaitable_handler_base(awaitable_thread<Executor>* h)
: awaitable_thread<Executor>(std::move(*h))
{
}
protected:
awaitable_frame<T, Executor>* frame() noexcept
{
return static_cast<awaitable_frame<T, Executor>*>(this->top_of_stack_);
}
};
awaitable_thread
// awaitable_handler 的基类,实际管理 awaitable 和 awaitable_frame
template <typename Executor>
class awaitable_thread
{
public:
typedef Executor executor_type;
// Construct from the entry point of a new thread of execution.
awaitable_thread(awaitable<void, Executor> p, const Executor& ex)
: bottom_of_stack_(std::move(p)),
top_of_stack_(bottom_of_stack_.frame_),
executor_(ex)
{
}
// Transfer ownership from another awaitable_thread.
awaitable_thread(awaitable_thread&& other) noexcept
: bottom_of_stack_(std::move(other.bottom_of_stack_)),
top_of_stack_(std::exchange(other.top_of_stack_, nullptr)),
executor_(std::move(other.executor_))
{
}
// Clean up with a last ditch effort to ensure the thread is unwound within
// the context of the executor.
~awaitable_thread()
{
if (bottom_of_stack_.valid())
{
// Coroutine "stack unwinding" must be performed through the executor.
(post)(executor_,
[a = std::move(bottom_of_stack_)]() mutable
{
awaitable<void, Executor>(std::move(a));
});
}
}
executor_type get_executor() const noexcept
{
return executor_;
}
// Launch a new thread of execution.
void launch()
{
top_of_stack_->attach_thread(this);
pump();
}
protected:
template <typename> friend class awaitable_frame_base;
// Repeatedly resume the top stack frame until the stack is empty or until it
// has been transferred to another resumable_thread object.
void pump()
{
do top_of_stack_->resume(); while (top_of_stack_);
if (bottom_of_stack_.valid())
{
awaitable<void, Executor> a(std::move(bottom_of_stack_));
a.frame_->rethrow_exception();
}
}
awaitable<void, Executor> bottom_of_stack_;
awaitable_frame_base<Executor>* top_of_stack_;
executor_type executor_;
};
awaitable
现在定位到了 awaitable_handler_base::awaitable_type
别名就是 awaitable
。看下实现:
/// The return type of a coroutine or asynchronous operation.
template <typename T, typename Executor = any_io_executor>
class awaitable
{
public:
/// The type of the awaited value.
typedef T value_type;
/// The executor type that will be used for the coroutine.
typedef Executor executor_type;
/// Default constructor.
constexpr awaitable() noexcept
: frame_(nullptr)
{
}
/// Move constructor.
awaitable(awaitable&& other) noexcept
: frame_(std::exchange(other.frame_, nullptr))
{
}
/// Destructor
~awaitable()
{
if (frame_)
frame_->destroy();
}
/// Checks if the awaitable refers to a future result.
bool valid() const noexcept
{
return !!frame_;
}
#if !defined(GENERATING_DOCUMENTATION)
// Support for co_await keyword.
bool await_ready() const noexcept
{
return false;
}
// Support for co_await keyword.
// 不管是暂停还是恢复,都关联到 awaitable_frame
template <class U>
void await_suspend(
detail::coroutine_handle<detail::awaitable_frame<U, Executor>> h)
{
frame_->push_frame(&h.promise());
}
// Support for co_await keyword.
T await_resume()
{
return awaitable(static_cast<awaitable&&>(*this)).frame_->get();
}
#endif // !defined(GENERATING_DOCUMENTATION)
private:
template <typename> friend class detail::awaitable_thread;
template <typename, typename> friend class detail::awaitable_frame;
// Not copy constructible or copy assignable.
awaitable(const awaitable&) = delete;
awaitable& operator=(const awaitable&) = delete;
// Construct the awaitable from a coroutine's frame object.
// 构造一个 awaitable 必须要传递 awaitable_frame
explicit awaitable(detail::awaitable_frame<T, Executor>* a)
: frame_(a)
{
}
detail::awaitable_frame<T, Executor>* frame_;
};
可以看出,awaitable
也实现了 awaiter 接口:
await_ready()
.await_suspend()
.await_resume()
.
它的实现没有复杂的设计:
- 构造可能传入等待帧
awaitable_frame
。 - 在暂停流程中,需要压帧操作。
- 而在恢复流程中,则从帧获取结果。
awaitable_frame
显然协程的具体实现放到等待帧上,还没有梳理上面遗留的数据结构,现在又多了两个问题:这个帧长什么样子的呢?又是从哪里得到帧来构造 awaitable
的呢?
promise_type
首先需要知道一个 awaitable_frame
与 awaitable
的关联:前者是后者的 promise_type
。这个关系可以在萃取中得知:
namespace std {
template <typename T, typename Executor, typename... Args>
struct coroutine_traits<asio::awaitable<T, Executor>, Args...>
{
typedef asio::detail::awaitable_frame<T, Executor> promise_type;
};
} // namespace std
接着看实现。awaitable_frame
具有多个特化,依照目前流程挑选 awaitable_frame<void>
。
// awaitable_frame<T/void>派生于 awaitable_frame_base
// 与 T/void 相关的事务放到 awaitable_frame 类实现
// 其余 promise_type 细节放到 awaitable_frame_base 类实现
template <typename Executor>
class awaitable_frame<void, Executor>
: public awaitable_frame_base<Executor>
{
public:
awaitable<void, Executor> get_return_object()
{
this->coro_ = coroutine_handle<awaitable_frame>::from_promise(*this);
// 问题一的答案:promise_type 传递自身给 awaitable
return awaitable<void, Executor>(this);
};
// 如果是 awaitable_frame<T, Executor>特化
// 那就改为实现 return_value(T)
void return_void()
{
}
void get()
{
this->caller_ = nullptr;
this->rethrow_exception();
}
};
类设计就是简单的继承关系,分离 T 和 promise_type 的职责。
下面按照不同的功能对 awaitable_frame_base
进行代码划分。
operator new
先是内存管理的部分:
// 重载了 operator new 和 operator delete 来定制协程帧的动态内存分配
#if !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
void* operator new(std::size_t size)
{
// thread_info_base 的实现粗略看过,大概是优先使用线程本地的内存资源
return asio::detail::thread_info_base::allocate(
asio::detail::thread_info_base::awaitable_frame_tag(),
asio::detail::thread_context::thread_call_stack::top(),
size);
}
void operator delete(void* pointer, std::size_t size)
{
asio::detail::thread_info_base::deallocate(
asio::detail::thread_info_base::awaitable_frame_tag(),
asio::detail::thread_context::thread_call_stack::top(),
pointer, size);
}
#endif // !defined(ASIO_DISABLE_AWAITABLE_FRAME_RECYCLING)
这部分使用了 Asio 自己的内存池。
promise operations
然后是作为一个承诺类型应该有的基本操作:
// The frame starts in a suspended state until the awaitable_thread object
// pumps the stack.
// 默认是暂停状态,需要 pump() 操作才能唤醒
auto initial_suspend() noexcept
{
return suspend_always();
}
// On final suspension the frame is popped from the top of the stack.
// 最后则弹出自身的帧
auto final_suspend() noexcept
{
struct result
{
// 在下方返回时传递 this 构造 this_成员
awaitable_frame_base* this_;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(coroutine_handle<void>) noexcept
{
// 出帧操作
this->this_->pop_frame();
}
void await_resume() const noexcept
{
}
};
return result{this};
}
void set_except(std::exception_ptr e) noexcept
{
pending_exception_ = e;
}
void set_error(const asio::error_code& ec)
{
this->set_except(std::make_exception_ptr(asio::system_error(ec)));
}
void unhandled_exception()
{
set_except(std::current_exception());
}
void rethrow_exception()
{
if (pending_exception_)
{
std::exception_ptr ex = std::exchange(pending_exception_, nullptr);
std::rethrow_exception(ex);
}
}
await_transform
接下来这部分比较重要,用于执行可等待体的转换操作:
// 众多的 await_transform
// - 对于 awaitable,原样返回
// - 对于 executor,返回关联 executor 类型(好神奇的用法)
// - 对于函数 f,这就稍微复杂了,看下面注释
template <typename T>
auto await_transform(awaitable<T, Executor> a) const
{
return a;
}
// This await transformation obtains the associated executor of the thread of
// execution.
auto await_transform(this_coro::executor_t) noexcept
{
struct result
{
awaitable_frame_base* this_;
bool await_ready() const noexcept
{
return true;
}
void await_suspend(coroutine_handle<void>) noexcept
{
}
auto await_resume() const noexcept
{
return this_->attached_thread_->get_executor();
}
};
return result{this};
}
// This await transformation is used to run an async operation's initiation
// function object after the coroutine has been suspended. This ensures that
// immediate resumption of the coroutine in another thread does not cause a
// race condition.
// 注释提到,这就是一个异步函数实现转换的重载版本
// 在这里实现跨线程并不会引起竞争
template <typename Function>
auto await_transform(Function f,
// 既然注释都说了在这里转换,我就不看这一坨萃取了……
typename enable_if<
is_convertible<
typename result_of<Function(awaitable_frame_base*)>::type,
awaitable_thread<Executor>*
>::value
>::type* = 0)
{
struct result
{
Function function_;
awaitable_frame_base* this_;
bool await_ready() const noexcept
{
return false;
}
void await_suspend(coroutine_handle<void>) noexcept
{
// 异步函数与模拟栈实现关联
// 并且在这里开始执行
// 划重点,总算动起来了
function_(this_);
}
void await_resume() const noexcept
{
}
};
return result{std::move(f), this};
}
帧管理
后续是 frame 和 thread 的管理:
void attach_thread(awaitable_thread<Executor>* handler) noexcept
{
attached_thread_ = handler;
}
awaitable_thread<Executor>* detach_thread() noexcept
{
return std::exchange(attached_thread_, nullptr);
}
// 从作者注释(下面)画的关系图,可以得知模拟的等待帧是在一个 thread 内管理的
// 一个“thread”内的帧都使用同一 executor,并且循环出栈就能跑起来
void push_frame(awaitable_frame_base<Executor>* caller) noexcept
{
caller_ = caller;
attached_thread_ = caller_->attached_thread_;
attached_thread_->top_of_stack_ = this;
caller_->attached_thread_ = nullptr;
}
void pop_frame() noexcept
{
if (caller_)
caller_->attached_thread_ = attached_thread_;
attached_thread_->top_of_stack_ = caller_;
attached_thread_ = nullptr;
caller_ = nullptr;
}
void resume()
{
coro_.resume();
}
void destroy()
{
coro_.destroy();
}
protected:
coroutine_handle<void> coro_ = nullptr;
awaitable_thread<Executor>* attached_thread_ = nullptr;
awaitable_frame_base<Executor>* caller_ = nullptr;
std::exception_ptr pending_exception_ = nullptr;
整体设计
在类的注释中,Asio 作者给出一张图来说明:
// An awaitable_thread represents a thread-of-execution that is composed of one
// or more "stack frames", with each frame represented by an awaitable_frame.
// All execution occurs in the context of the awaitable_thread's executor. An
// awaitable_thread continues to "pump" the stack frames by repeatedly resuming
// the top stack frame until the stack is empty, or until ownership of the
// stack is transferred to another awaitable_thread object.
//
// +------------------------------------+
// | top_of_stack_ |
// | V
// +--------------+---+ +-----------------+
// | | | |
// | awaitable_thread |<---------------------------+ awaitable_frame |
// | | attached_thread_ | |
// +--------------+---+ (Set only when +---+-------------+
// | frames are being |
// | actively pumped | caller_
// | by a thread, and |
// | then only for V
// | the top frame.) +-----------------+
// | | |
// | | awaitable_frame |
// | | |
// | +---+-------------+
// | |
// | | caller_
// | :
// | :
// | |
// | V
// | +-----------------+
// | bottom_of_stack_ | |
// +------------------------------->| awaitable_frame |
// | |
// +-----------------+
这里描述了 awaitable_thread
和 awaitable_frame
的关联。我的理解是作者模拟了线程和栈。awaitable_thread
“线程”管理一组协程,而协程都以“栈”的形式压到同一“线程”内。可以想象得到,用户调用 co_await
时则会入栈,而整个“线程”的运作机制就是不断出栈,当然具体运行的上下文仍是 executor
对应的 context,这就与 Asio 原有的机制挂上钩了。
NOTE: 我认为这里需要“线程”的概念是因为 asio::co_spawn()
模拟了 fork()/execve()
原语。
回到 await_frame
的实现上,作为一个承诺类型,可以看到它的 initial_suspend()
是暂停的,需要由管理线程唤醒。final_suspend()
则维护栈结构的出帧操作。
另外一个关键点是 await_transform
转换。有三种重载:
- 如果是
awaitable
,原地返回。 - 如果
executor_t
,将(不暂停并)直接恢复得到关联的executor
。 - 如果是异步函数
f
,则在暂停后执行f(this_await_frame)
,传递帧指针后执行异步函数。
co_spawn()
说到执行,前面提到的异步函数本身是协程(结论见async_result),而且 do_stuff()
也是协程,其流程都是“懒惰”而尚未实际执行的。我们需要一个 kick off 的契机,但目前还缺少 executor 信息。一般来说,上下文运行于 asio::io_context.run()
对应的线程,而要让协程跑起来还需要 asio::co_spawn(executor, awaitable{}, completion_token)
,可以说这是一个协程开始孵化的地方。现在从头来梳理一遍调用过程。按照文中给的例子,这里 executor
传入 io_context
对象(实际是 context
而非 executor
,没关系,Asio 帮你搞定),awaitable{}
传入 do_stuff
协程对应的可等待体 do_stuff(socket)
,completion_token
设为空置的 asio::detached
表示不关心完成结果。
现在来看 co_spawn()
实现。
// 这里是实现部分,最后一个参数是默认 `=0`(接口分离在另一个文件了,所以函数签名信息不全)
template <typename Executor, typename AwaitableExecutor,
ASIO_COMPLETION_TOKEN_FOR(
void(std::exception_ptr)) CompletionToken>
inline ASIO_INITFN_AUTO_RESULT_TYPE(
CompletionToken, void(std::exception_ptr))
co_spawn(const Executor& ex,
awaitable<void, AwaitableExecutor> a, CompletionToken&& token,
typename enable_if<
(is_executor<Executor>::value || execution::is_executor<Executor>::value)
&& is_convertible<Executor, AwaitableExecutor>::value
>::type*)
{
return async_initiate<CompletionToken, void(std::exception_ptr)>(
detail::initiate_co_spawn<AwaitableExecutor>(AwaitableExecutor(ex)),
token, detail::awaitable_as_function<
void, AwaitableExecutor>(std::move(a)));
}
co_spawn()
的实现再次用到 async_initiate()
。除了传入 initiate_co_spawn
对象以外,还使用 awaitable_as_function
将 awaitable
封装为函数。
async_result
前面提到,async_initiate()
具有 2 个重载版本,使用哪个版本取决于 async_result
是否有 initiate()
函数。我们需要找 token
为 asio::detached_t
的特化版本。
template <typename Signature>
struct async_result<detached_t, Signature>
{
// detached_handler 内部全部是空实现,表示不关心处理结果,仅作为一个 handler 适配器
typedef asio::detail::detached_handler completion_handler_type;
// 返回 void
typedef void return_type;
explicit async_result(completion_handler_type&)
{
}
void get()
{
}
#if defined(ASIO_HAS_VARIADIC_TEMPLATES)
template <typename Initiation, typename RawCompletionToken, typename... Args>
static return_type initiate(
ASIO_MOVE_ARG(Initiation) initiation, // 传入 initiate_co_spawn{ex}
ASIO_MOVE_ARG(RawCompletionToken), // 传入 asio::detached
ASIO_MOVE_ARG(Args)... args) // 传入 awaitable_as_function{a}
{
ASIO_MOVE_CAST(Initiation)(initiation)(
detail::detached_handler(detached_t()),
ASIO_MOVE_CAST(Args)(args)...);
}
#else
// ...略
#endif
};
由于具有 initate()
函数,因此依然是版本 1。将执行 async_result::initate()
,展开为 initiate_co_spawn{ex}.operator()(detached, awaitable_as_function{a})
。
注意和
use_awaitable
版本的异步函数对应的initiate()
有所不同,它并非协程:跳回上面看看。
awaitable_as_function
顺便补充一下 awaitable_as_function
,是一个将可等待体伪造成仿函数的类。
template <typename T, typename Executor>
class awaitable_as_function
{
public:
explicit awaitable_as_function(awaitable<T, Executor>&& a)
: awaitable_(std::move(a))
{
}
// 执行 () 就能还原回 awaitable
awaitable<T, Executor> operator()()
{
return std::move(awaitable_);
}
private:
awaitable<T, Executor> awaitable_;
};
initiate_co_spawn
template <typename Executor>
class initiate_co_spawn
{
public:
typedef Executor executor_type;
template <typename OtherExecutor>
explicit initiate_co_spawn(const OtherExecutor& ex)
: ex_(ex)
{
}
executor_type get_executor() const ASIO_NOEXCEPT
{
return ex_;
}
template <typename Handler, typename F>
void operator()(Handler&& handler, F&& f) const
{
typedef typename result_of<F()>::type awaitable_type;
auto a = (co_spawn_entry_point)(static_cast<awaitable_type*>(nullptr),
ex_, std::forward<F>(f), std::forward<Handler>(handler));
awaitable_handler<executor_type, void>(std::move(a), ex_).launch();
}
private:
Executor ex_;
};
如上述 initiate_co_spawn
代码,operator()(detached, awaitable_as_function{a2})
展开后:
- 先执行
a = co_spawn_entry_point(nullptr, awaitable_as_function{a2}, detached)
得到awaitable a
。 - 再执行
awaitable_handler(a, ex).launch()
。
NOTE: 这里 a2
指的是 co_spawn()
传递的用户协程生成的可等待体。
co_spawn_entry_point()
template <typename Executor, typename F, typename Handler>
awaitable<void, Executor> co_spawn_entry_point(
awaitable<void, Executor>*, Executor ex, F f, Handler handler)
{
// 构造 work guard
// make_co_spawn_work_guard() 这部分代码和 unified executors 关联性较大
// 为 ex 尝试添加 execution::outstanding_work_t::tracked_t 的 property
// 表示将来会有任务提交,避免此时 executor(比如 io context)关闭
auto spawn_work = make_co_spawn_work_guard(ex);
auto handler_work = make_co_spawn_work_guard(
asio::get_associated_executor(handler, ex));
// asio::post(ex, token) 是 execution 接口
// 差不多就是添加一些 property 然后 execute
// 总之,用目前的示例就是转发到 io_context.execute(ex, use_awaitable)
//
// 那么问题来了,这里调用 asio::post() 将返回什么类型?co_await ???
// 我简单剧透,还是 awaitable
(void) co_await (post)(spawn_work.get_executor(),
use_awaitable_t<Executor>{__FILE__, __LINE__, "co_spawn_entry_point"});
std::exception_ptr e = nullptr;
try
{
// 实际执行点
co_await f();
}
catch (...)
{
e = std::current_exception();
}
(dispatch)(handler_work.get_executor(),
[handler = std::move(handler), e]() mutable
{
handler(e);
});
}
co_spawn_entry_point()
是运行时第一个调用到的协程,它的实现有点复杂:
- 先是
co_await post(ex, use_awaitable)
。 - 然后是
co_await f()
。 - 最后是 work guard handler 收尾。
好吧,虽然我们在聊
co_spawn()
,但严格点说do_stuff()
函数才是第一个调用到的协程。
由于 initial_suspend()
的存在,co_spawn_entry_point()
并没有执行,而是直接暂停而返回给上层 initiate_co_spawn::operator()
。
launch()
回到上层,接下来就执行 awaitable_handler(a, ex).launch()
。
在 launch()
前,可以先回顾一下 awaitable_handler
的构造。它是经由 awaitable_handler -> awaitable_handler_base -> awaitable_thread
的委托构造而得到的对象。此时 a
作为 bottom_of_stack_
成员而被 awaitable_handler
持有;top_of_stack_
成员指向 a.frame_
。
这里有个细节是 awaitable.frame_
是怎么得到的问题,这玩意默认构造可是空指针啊。解答见 promise_type 小节。作为承诺类型的 awaitable_frame
在协程调用前即被隐式 operator new
分配;当你在一个协程返回 awaitable
前,awaitable_frame
调用 get_return_object()
保证传递 this
给 awaitable
。而 awaitable
的实际对象 a
是一个已暂停(协程返回)过的可等待体,因此必然有等待帧。
// Launch a new thread of execution.
// awaitable_handler 执行
void launch()
{
top_of_stack_->attach_thread(this);
pump();
}
// awaitable_thread<Executor>* top_of_stack_执行
void attach_thread(awaitable_thread<Executor>* handler) noexcept
{
attached_thread_ = handler;
}
// Repeatedly resume the top stack frame until the stack is empty or until it
// has been transferred to another resumable_thread object.
// awaitable_handler 执行
void pump()
{
// 唤醒
do top_of_stack_->resume(); while (top_of_stack_);
if (bottom_of_stack_.valid())
{
awaitable<void, Executor> a(std::move(bottom_of_stack_));
a.frame_->rethrow_exception();
}
}
显然,这是整体设计里提到的出栈处理。在上一节,co_spawn_entry_point()
因被暂停而压帧(push_frame()
)。现在位于 top_of_stack_
,因此通过 resume()
唤醒,恢复执行。
恢复后的 co_spawn_entry_point()
先构造了 spawn_work
和 handler_work
,然后实际执行 co_await post(use_awaitable)
表达式。
post()
co_await post(ex, use_awaitable)
先执行了 post()
操作,然后对其返回值进行 co_await
操作。
// 这是 post 接口
template <typename Executor,
ASIO_COMPLETION_TOKEN_FOR(void()) CompletionToken>
ASIO_INITFN_AUTO_RESULT_TYPE(CompletionToken, void()) post(
const Executor& ex, ASIO_MOVE_ARG(CompletionToken) token,
typename enable_if<
execution::is_executor<Executor>::value || is_executor<Executor>::value
>::type*)
{
// 再一次,async_initiate
// token 传入 use_awaitable
// 从前面章节的 async_result<use_awaitable_t>可知,这里返回一个 awaitable
return async_initiate<CompletionToken, void()>(
detail::initiate_post_with_executor<Executor>(ex), token);
}
再一次看到了 async_initiate
,而 token
则传入 use_awaitable
,从前面的 async_result
章节 可知,将返回一个 awaitable
。因此,asio::post(use_awaitable)
重载版本的操作也是协程化的。
我们解决了上面留下来的问题,co_await (post)(..., use_awaitable)
仍是 co_await awaitable{...}
。万幸没有更加复杂的转换……
虽然 post()
作为协程是存在暂停现象,但是前面 launch()
函数的 pump()
操作又会将它唤醒,继续往下看展开后的操作。
暂停是因为
async_result<use_awaitable_t>::initiate()
是协程。
kick off
当尝试调用 async_result<use_awaitable_t>::initiate()
,需要注意这一块会关联到前面 async_result 的行为。为了方便再贴一次代码:
template <typename Initiation, typename... InitArgs>
static return_type initiate(Initiation initiation,
use_awaitable_t<Executor> u, InitArgs... args)
{
(void)u;
co_await [&](auto* frame)
{
ASIO_HANDLER_LOCATION((u.file_name_, u.line_, u.function_name_));
handler_type handler(frame->detach_thread());
// 实际是在这里处理初始化函数
std::move(initiation)(std::move(handler), std::move(args)...);
return static_cast<handler_type*>(nullptr);
};
for (;;) {} // Never reached.
}
async_result::initiate()
函数是一个协程,其内部操作有 co_await lambda
的行为。根据承诺对象的转换操作,这里对应 await_transform(f)
的重载:
- 先暂停(
await_ready()
返回false
)。 - 暂停后执行
await_suspend()
操作,传递this_
(等待帧)到lambda(frame = this_)
。 - 构造
awaitable_handler
类型的handler
。 - 执行
initiate_post_with_executor{}.operator()(handler)
。
initiate_post_with_executor
现在看下最后一步,即 initiate_post_with_executor{}.operator()(handler)
的流程:
template <typename Executor>
class initiate_post_with_executor
{
public:
typedef Executor executor_type;
explicit initiate_post_with_executor(const Executor& ex)
: ex_(ex)
{
}
executor_type get_executor() const ASIO_NOEXCEPT
{
return ex_;
}
// 传入 awaitable_handler
template <typename CompletionHandler>
void operator()(ASIO_MOVE_ARG(CompletionHandler) handler,
typename enable_if<
execution::is_executor<
typename conditional<true, executor_type, CompletionHandler>::type
>::value
&&
// 没心思看萃取,重载版本太多,看这个实现吧,大同小异
detail::is_work_dispatcher_required<
typename decay<CompletionHandler>::type,
Executor
>::value
>::type* = 0) const
{
typedef typename decay<CompletionHandler>::type handler_t;
typedef typename associated_executor<
handler_t, Executor>::type handler_ex_t;
handler_ex_t handler_ex((get_associated_executor)(handler, ex_));
typename associated_allocator<handler_t>::type alloc(
(get_associated_allocator)(handler));
// 简单来说,就是 ex.execute(handler);
// 这里使得上下文切换到 ex(比如 io_context)的上下文去执行 handler()
// 因此需要看实际的 awaitable_handler 的 operator() 要做什么
execution::execute(
asio::prefer(
asio::require(ex_, execution::blocking.never),
execution::relationship.fork,
execution::allocator(alloc)),
detail::work_dispatcher<handler_t, handler_ex_t>(
ASIO_MOVE_CAST(CompletionHandler)(handler), handler_ex));
}
// ...省略大量 operator() 重载
private:
Executor ex_;
};
现在可以得知,post(io_context)
操作将 awaitable_handler
投递到 io_context
上下文,投递完成后 co_await
暂停,实现了上下文的切换。
那么接下来呢?接下来 co_spawn()
已经结束流程了,因为返回 launch()
上层后 pump()
也不会再次唤醒任何协程。要问为什么还得看回 co_await lambda
倒数第二步。
context switch
co_await lambda
过程需要注意 handler
的构造函数以及 detach_thread
行为。(跳回 awaitable_handler 小节)
// 为了方便再贴一次代码
// Transfer ownership from another awaitable_thread.
explicit awaitable_handler_base(awaitable_thread<Executor>* h)
: awaitable_thread<Executor>(std::move(*h))
{
}
这里 handler
的构造使用了 std::move()
,引起了“线程”所有权的转移。被偷家的“线程”pump()
过程所用到的“栈”已经置空。
void pump()
{
// 唤醒?不存在的
do top_of_stack_->resume(); while (top_of_stack_);
if (bottom_of_stack_.valid())
{
awaitable<void, Executor> a(std::move(bottom_of_stack_));
a.frame_->rethrow_exception();
}
}
当执行完 post()
后,co_await post()
将使得协程 co_spawn_entry_point()
暂停。此后本地上下文再也不会恢复该协程。
// 其中某一特化 awaitable_handler::operator() 如下
// 注意从前面的 caller 来看,这是一个异步的过程
// 因为我们跟踪的流程是本地上下文执行的 co_spawn(io_context, a, detached)
// 而这个是在 io_context 上下文执行的,现在只是投递一个事件,还没有实际运行
// 等到 io_context.run() 才调用到这个 handler
template <typename... Args>
void operator()(Args&&... args)
{
this->frame()->attach_thread(this);
this->frame()->return_values(std::forward<Args>(args)...);
this->frame()->pop_frame();
this->pump();
}
而上下文将被切换到 executor
所指定的上下文。比如 io_context.run()
,使得 awaitable_handler{}()
被执行,此前暂停的协程可以再次恢复,并且运行所在的上下文已经切换到 io_context
。
再接下来,恢复的 co_spawn_entry_point()
则执行 co_await f()
,这里就是对应的 do_stuff()
用户层接口,真正执行异步函数。
总结
简单的做点总结吧:
- Asio 通过
async_result
萃取和use_awaitable
令牌来完成异步函数的协程化重载。 - Asio 划分协程组为模拟线程(awaitable thread),组内协程的调用关系模拟为栈结构。
- Asio 使用
co_spawn()
创建模拟线程和投递任务,并通过模拟线程迁移完成上下文切换。 - Asio 很抽象,我还远远跟不上作者的思路。
图文无关
按照惯例,文末随机附有内容无关的图。
Cocoa is cute!
References
Coroutines – cppreference
C++ Coroutines: Understanding operator co_await – Asymmetric Transfer
C++20 Coroutines Support – asio C++ library