本文是 Talking Async Ep2: Cancellation in depth 的个人观片笔记,演讲者为 Asio 作者 Christopher Kohlhoff,主要内容是介绍 Asio 对异步操作的取消设计。
前提说明
我真的很想结束这个话题,之前的笔记零零散散讨论过很多次取消点了,总是不满意,所以还是翻了点库存资料再调研一遍。但是这种抠细节的事情抠多了没意思,我暂时也没有改进 uring_exec 的计划,这回简单写点草稿算了。如有错误,还请指正。
另外一提,EP1 讨论了 Asio 的异步接口演变历史,话题相对轻松很多。我也做了简单的文本转译。
Man-in-the-middle proxy
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
tcp::socket server(client.get_executor());
steady_clock::time_point deadline {};
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
if (!e) {
// 结合了 C++20 协程和 operator || 重载的取消设计
co_await (
transfer(client, server, deadline) ||
transfer(server, client, deadline) ||
watchdog(deadline)
);
}
}
EP1 展示了 Asio 使用 C++20 协程 的取消设计,非常的直观:任意一个异步操作成功,就取消其余的异步操作。但这只是一种高层抽象的设计,作者专门录制了 EP2 演讲来说明这种设计的低层设施是怎样的,以及想要定制行为的话又该怎么做。
注:不会提到 Asio 内部复杂的源码实现,主要是看个思路。
讨论用的示例依然是和 EP1 相同的流程图,也就是做一个 proxy server,实现
这里每个红色的有向边都代表一个异步操作,流程从最上方的 async_accept()
开始执行。
Step 0: basic server
// https://github.com/chriskohlhoff/talking-async/blob/master/episode2/step_0.cpp
class proxy
: public std::enable_shared_from_this<proxy>
{
// ...省略一些实现,
// 程序开始时通过 accept 获得 client socket,
// 然后构造 proxy{client_socket}
// 用户主动 proxy.connect_to_server(target) 来启动上述流程图
// 共有 2 条异步链路:self->read_from_client() 和 self->read_from_server()
// 分别对应于上述流程图的两个循环,下面只展示 read_from_server
private:
void read_from_server()
{
auto self = shared_from_this();
server_.async_read_some(
buffer(data_from_server_),
[self](std::error_code error, std::size_t n)
{
if (!error)
{
self->write_to_client(n);
}
else
{
self->stop();
}
}
);
}
void write_to_client(std::size_t n)
{
auto self = shared_from_this();
async_write(
client_,
buffer(data_from_server_, n),
[self](std::error_code error, std::size_t /*n*/)
{
if (!error)
{
self->read_from_server();
}
else
{
self->stop();
}
}
);
}
void stop()
{
client_.close();
server_.close();
}
tcp::socket client_;
tcp::socket server_;
std::array<char, 1024> data_from_client_;
std::array<char, 1024> data_from_server_;
};
暂且不提协程,先回到传统的 handler 回调形式。流程图其中的一个 read from server 到 write to client 循环如上所示。现在的需求是,我们要基于这份代码增加一个超时取消的实现。
Step 1: race condition
class proxy
: public std::enable_shared_from_this<proxy>
{
// 只需在 proxy.connect_to_server(target) 中新增一条 self->watchdog() 异步链路
void watchdog()
{
auto self = shared_from_this();
watchdog_timer_.expires_at(deadline_);
watchdog_timer_.async_wait(
[self](std::error_code /*error*/)
{
if (!self->is_stopped())
{
auto now = steady_clock::now();
if (self->deadline_ > now)
{
// watchdog 也是一个异步循环
self->watchdog();
}
else
{
self->stop();
}
}
}
);
}
// 修改 stop,需要对 watchdog 自己的状态也做处理
void stop()
{
client_.close();
server_.close();
// 因为该接口不只是 watchdog 主动调用,
// 因此要处理 watchdog 关联的任何有状态成员
watchdog_timer_.cancel();
}
// 表示 watchdog 当中的 self->stop() 已经执行过
// (或者因其他因素,client 和 server 可能主动关闭)
bool is_stopped() const
{
// 使用 socket 状态表示,省掉一个单独的状态变量
return !client_.is_open() && !server_.is_open();
}
void read_from_client()
{
deadline_ = std::max(deadline_, steady_clock::now() + 5s);
// ...剩下操作保持不变
}
// 新增的数据结构
steady_clock::time_point deadline_;
asio::steady_timer watchdog_timer_;
};
思路在 EP1 也见识过,就是使用单独的 watchdog 异步链路来完成超时取消。所以这里只要静置超过五秒就会关掉连接。
// Klemens 的意见
void read_from_client()
{
watchdog_timer_.expires_at(steady_clock::now() + 5s);
watchdog_timer_.async_wait(/* 判断正常触发还是 aborted */);
// ...剩下操作保持不变
}
Step 2: completion condition
没有取消需求的情况,非常简单
取消操作,意味着存在竞态条件
前面的代码是不太可取的,在深入前需要先解释一下取消所带来的竞态条件(race condition)。取消操作是必须异步实现的(这方面没有讨论的余地),这意味着用户可能在发出取消请求后,仍然会收到成功而非被取消的 completion handler(理解为回调就好了)。这里看似影响不大,但是潜在更深层次的问题:如果你在 handler 当中也尝试发出异步操作,那么整个异步操作链可能永远无法结束。
// step 1 用到的代码
void stop()
{
client_.close();
server_.close();
watchdog_timer_.cancel();
}
但是在前面的 step 1 当中,却不存在这种问题。这是因为 stop()
依赖了 socket 的状态,这是直接暴力「关闭」了连接,并非「取消」对应的操作。
void stop()
{
client_.cancel();
server_.cancel();
watchdog_timer_.cancel();
}
如果使用了实际上的 cancel 取消操作,那么将引发上述的竞态条件问题。而且恰好是 read-write-loop 模式,整个异步操作链可能永远无法结束。
void stop()
{
client_.cancel();
server_.cancel();
watchdog_timer_.cancel();
// 新增!
stopped_ = true;
}
bool is_stopped() const
{
// 不能依赖 socket 状态
// 注:作者的原文件应该是忘记改了,个人手动修复
return stopped_;
}
void read_from_server()
{
deadline_ = std::max(deadline_, steady_clock::now() + 5s);
auto self = shared_from_this();
server_.async_read_some(
buffer(data_from_server_),
[self](std::error_code error, std::size_t n)
{
// 新增判断,注意前面讨论过,竞态条件不一定返回错误
if (!error && !self->is_stopped())
{
self->write_to_client(n);
}
else
{
self->stop();
}
}
);
}
void write_to_client(std::size_t n)
{
auto self = shared_from_this();
async_write(
client_,
buffer(data_from_server_, n),
[self](std::error_code error, std::size_t /*n*/)
{
// 新增判断
if (!error && !self->is_stopped())
{
self->read_from_server();
}
else
{
self->stop();
}
}
);
}
但是可以观察到,前面 step 1 没有问题,是恰好利用状态来解决的竞态条件。那么简单点我们单独在 handler 里面设置一个外部的、不再依赖于 socket 本身的状态检查是否可行?
事实上,竞态条件依然存在。因此异步写操作(async_write)实际上是由多个异步部分写操作(async_write_some)组成的,这是面向内核提供性能优化的设计。这意味着我们的状态无法在完整的异步写操作完成前检测出来。
NOTE: 性能优化指的是内核 buffer 一般是页对齐分配的,用户可以主动避免读/写跨页导致额外开销。
void write_to_client(std::size_t n)
{
auto self = shared_from_this();
async_write(
client_,
buffer(data_from_server_, n),
// 新增 completion condition!
[self](std::error_code error, std::size_t n) -> std::size_t
{
auto completion_condition = asio::transfer_all();
// 我们现在有机会进行状态判断
if (!self->is_stopped())
{
// 表示下一次尽最大努力交付(默认行为),
// n 为迄今已经传输的字节数,而函数对象返回尽可能大的值
return completion_condition(error, n);
}
else
{
// 没有剩余字节数需要写,是时候提交 completion handler 了
return 0;
}
},
// 这是之前的 completion handler
[self](std::error_code error, std::size_t /*n*/)
{
if (!error && !self->is_stopped())
{
self->read_from_server();
}
else
{
self->stop();
}
}
);
}
要解决这个问题,Asio 引入了 completion condition。这是一个函数对象,将会在每一个 async_write_some 之间调用。其基本作用就是用于检测下一次尝试写操作的最大字节数(就是上面提到的优化机会,应该也有 QoS 之类的用途吧),但是我们可以在里面添加用户定义的行为,比如我们现在需要的状态判断,此时通过返回 0 字节来提示写操作应该结束。
至少这个程序总算是解决了竞态条件问题。作者建议如果是简单的需求,shutdown/close 其实也是容许的。
Step 3: cancellation slot
basic_stream_socket::cancel
Cancel all asynchronous operations associated with the socket.
但是我们还没有达到想要的粒度。如 cancel 注释所言,它取消的是该 IO 对象(socket)关联的所有异步操作。我们需要更加精细的 per-operation cancellation。
class proxy
: public std::enable_shared_from_this<proxy>
{
// 现在 proxy 有 4 条异步链路,
// 除了 step 1 已有的 3 条,还新增 heartbeat
void heartbeat()
{
auto self = shared_from_this();
heartbeat_timer_.expires_after(1s);
heartbeat_timer_.async_wait(
[self](std::error_code /*error*/)
{
if (!self->is_stopped())
{
// 先不管 asio::cancellation_type
self->heartbeat_signal_.emit(asio::cancellation_type::total);
self->heartbeat();
}
}
);
}
void read_from_server()
{
auto self = shared_from_this();
server_.async_read_some(
buffer(data_from_server_),
// 通过 bind_cancellation_slot 组成新的 completion token
asio::bind_cancellation_slot(
// 使得该操作关联到 heartbeat_signal_ 的信号槽
heartbeat_signal_.slot(),
// 这里依然是 completion handler 的内容
[self](std::error_code error, std::size_t n)
{
if (!error)
{
self->num_heartbeats_ = 0;
self->write_to_client(n);
}
// 当 heartbeat 发出 emit 时,会走到该路径
else if (error == asio::error::operation_aborted)
{
++self->num_heartbeats_;
// 随便写点内容到 client
self->write_heartbeat_to_client();
}
else
{
self->stop();
}
}
)
);
}
void write_to_client(std::size_t n)
{
auto self = shared_from_this();
async_write(
client_,
buffer(data_from_server_, n),
[self](std::error_code error, std::size_t /*n*/)
{
if (!error)
{
self->read_from_server();
}
else
{
self->stop();
}
}
);
}
void write_heartbeat_to_client()
{
std::size_t n = asio::buffer_copy(
buffer(data_from_server_),
std::array<asio::const_buffer, 3>{
buffer("<heartbeat "),
buffer(std::to_string(num_heartbeats_)),
buffer(">\r\n")
}
);
write_to_client(n);
}
asio::steady_timer heartbeat_timer_;
asio::cancellation_signal heartbeat_signal_;
std::size_t num_heartbeats_ = 0;
};
Asio 通过引入 cancellation slot 来达成 per-operation cancellation。需要注意 cancellation slot 并不能直接解决前面的竞态条件问题(很明显,异步操作完成后 emit 是没用的)。作者提到此前在 EP1 展示的重载操作符其实也是内部使用 cancellation slot 来实现,但是会有额外标记(类似前面的 stopped_)来处理竞态条件。也就是说,cancellation slot 只是解决了 cancel 的粒度问题。
个人粗看了一眼 async_write 实现,对于前面提到的 async_write_some 组合问题,cancellation slot 是可以解决的(见链接,每一步都有检查)。但是这种机制作为单个异步操作粒度的设计,应该没办法解决 step 2 图中关于两个异步操作(分为 2 个不同的 slot)的竞态条件。
cancellation slot 是符合零成本抽象的,不使用 bind_cancellation_slot 将不会有任何运行时开销。作者还提到这种机制除了编译时检测以外还有运行时检测,以及使用后的 slot 是可回收的设计。总之这是一个比用户手写流程更好的选择。
cancellation slot 是可回收复用的
作者建议不要直接使用 cancellation slot 这种为高精度取消而实现的低层构建块,应该使用面向用户的高层设计:EP1 当中的协程就是一个好例子。
Step 4: parallel group
void read_from_server()
{
heartbeat_timer_.expires_after(1s);
auto self = shared_from_this();
// 直接省掉了单独的 heartbeat(),
// 通过 make_parallel_group 描述两个并行的异步链路
asio::experimental::make_parallel_group(
[this](auto token)
{
return server_.async_read_some(
buffer(data_from_server_),
token
);
},
[this](auto token)
{
return heartbeat_timer_.async_wait(token);
}
).async_wait(
// 异步等待的行为是可定制的(Asio 目前提供 4 种选项,也可以自己定制)
// 包括 wait_for_all、wait_for_one、wait_for_one_error 和 wait_for_one_success
// 这里表示只等待任意一方的完成,然后取消其他剩下的异步操作
asio::experimental::wait_for_one(),
[self](
std::array<std::size_t, 2> order,
std::error_code read_error, std::size_t n,
std::error_code /*timer_error*/)
{
// 看谁完成
switch (order[0])
{
case 0: // read
if (!read_error)
{
self->num_heartbeats_ = 0;
self->write_to_client(n);
}
else
{
self->stop();
}
break;
case 1: // timer
++self->num_heartbeats_;
self->write_heartbeat_to_client();
break;
}
}
);
}
除了协程以外,parallel group 也是一种面向用户的选择。它不再需要用户声明 asio::cancellation_signal,并且能帮你内部处理好事务。
Step 5: async initiate
在视频演示时,Asio 还没有集成好对异步信号等待的取消支持,接下来的 step 5 和 step 6 会展示如何将 cancellation slot 集成到用户定义的异步操作当中,从而实现满血版 async_wait for signal。
template <typename CompletionToken>
auto async_wait_for_signal(
asio::signal_set& sigset,
CompletionToken&& token)
{
return asio::async_initiate<CompletionToken,
void(std::error_code, std::string)>(
[&sigset](auto handler)
{
auto executor =
asio::get_associated_executor(
handler,
sigset.get_executor()
);
// 做点简单的定制,比如转换 signo 到 signame
auto intermediate_handler =
[handler = std::move(handler)](
std::error_code error,
int signo
) mutable
{
std::string signame;
switch (signo)
{
case SIGABRT: signame = "SIGABRT"; break;
case SIGFPE: signame = "SIGFPE"; break;
case SIGILL: signame = "SIGILL"; break;
case SIGINT: signame = "SIGINT"; break;
case SIGSEGV: signame = "SIGSEGV"; break;
case SIGTERM: signame = "SIGTERM"; break;
default: signame = "<other>"; break;
}
// handler 可以是 rvalue 形式的,只会调用一次
std::move(handler)(error, signame);
};
sigset.async_wait(
asio::bind_executor(
executor,
std::move(intermediate_handler)
)
);
},
// 别忘了 completion token
token
);
}
awaitable<void> timed_wait_for_signal()
{
asio::signal_set sigset(co_await this_coro::executor, SIGINT, SIGTERM);
asio::steady_timer timer(co_await this_coro::executor, 5s);
auto result = co_await (
async_wait_for_signal(sigset, use_awaitable) ||
timer.async_wait(use_awaitable)
);
switch (result.index())
{
case 0:
std::cout << "signal finished first: " << std::get<0>(result) << "\n";
break;
case 1:
std::cout << "timer finished first\n";
break;
}
}
int main()
{
asio::io_context ctx;
co_spawn(ctx, timed_wait_for_signal(), detached);
ctx.run();
}
step 5 展示的是一个简单的用户定义异步操作,还没有任何取消支持。具体细节可以了解 asio::async_initiate,至少你可以从例子得知,实现一个用户流程后,就可以作为 Asio 协程去使用。
Step 6: cancellation types
template <typename CompletionToken>
auto async_wait_for_signal(
asio::signal_set& sigset,
CompletionToken&& token)
{
return asio::async_initiate<CompletionToken,
void(std::error_code, std::string)>(
[&sigset](auto handler)
{
auto cancellation_slot =
asio::get_associated_cancellation_slot(
handler,
asio::cancellation_slot()
);
// 为了方便,这里直接使用了运行时检测的方式
if (cancellation_slot.is_connected())
{
// 可以理解为注册一个取消回调,当外部注入取消类型对象时,就会触发 cancel
cancellation_slot.assign(
[&sigset](asio::cancellation_type /*type*/)
{
sigset.cancel();
}
);
}
auto executor = // ...
auto intermediate_handler = // ...
sigset.async_wait(
asio::bind_executor(
executor,
std::move(intermediate_handler)
)
);
},
token
);
}
接下来基于 step 5 程序继续增加取消支持。cancellation slot 与 executor 的设计相似,都是通过关联(associate)handler 去获取,接着只需要注册一个取消回调。
最后讨论的是一直被忽略的 cancellation type。我们需要理解取消并不是简单的停止操作,还要考虑取消成功后对应的 IO 对象的状态影响。如果我们的取消是通过 close 做到的,那还意味着该对象以后不得有任何后续操作;所以要想保证取消后仍可以向对象发出新的异步操作,那就要得知取消操作后的对象是否仍然合法。
根据副作用造成的影响,Asio 提供了三种 cancellation type:terminal、partial 和 total。
位 | 取消成功时的保证 | 使用场景 |
---|---|---|
terminal | 该操作具有未明确的副作用,唯一安全的做法是关闭或销毁 I/O 对象 | 有状态的消息分帧协议实现(如异步操作发送/接收完整消息)。如果在消息体传输中途取消操作,无法向完成处理程序报告合理状态 |
partial | 该操作具有明确定义的副作用,完成处理程序会具体说明这些副作用 | 组合操作如 async_read 和 async_write 。如果在所有字节传输完成前取消,完成处理程序会收到已传输的总字节数。调用方可利用此信息发起新操作传输剩余字节
|
total | 该操作没有可通过 API 观察到的副作用 | - 传输零/非零字节的低级系统调用 - 即使成功也没有副作用的等待就绪操作 - 完全缓冲的消息帧协议(可存储部分消息供下次操作重用) |
个人理解 Asio 除了在尝试处理取消场景下碰到的合法状态问题,还顺手解决了部分成功问题。用户使用 partial,就是流式场景允许给你返回成功的部分值(这是主动 emit 的,也是知情的);而 total 是用了不可观测的黑盒,上层得到的只会是完整值,因此尤其适合上表提到的消息帧协议;terminal 没啥好说的,既然不可控,直接结束流程算了。这里的难点在于实现是靠库和用户协作的,下一步的例子可以感受一下。
Step 7: message frame example
// 假设你现在是一个 server,每次监听都能得到一个 client 并创建 session
awaitable<void> session(tcp::socket client)
{
message_reader<tcp::socket> reader(client);
for (;;)
{
std::string message = co_await reader.read_message();
if (!message.empty())
{
std::cout << "received: " << message << "\n";
}
else
{
co_return;
}
}
}
// 这个 session reader 就是不断的读取用户输入,通过 '|' 分帧
template <typename Stream>
class message_reader
{
public:
message_reader(Stream& stream)
: stream_(stream)
{
}
awaitable<std::string> read_message()
{
auto [e, n] =
co_await asio::async_read_until(
stream_,
dynamic_buffer(message_buffer_),
'|',
use_nothrow_awaitable
);
if (e)
{
co_return std::string();
}
std::string message(message_buffer_.substr(0, n));
message_buffer_.erase(0, n);
co_return message;
}
private:
Stream& stream_;
std::string message_buffer_;
};
为了说明 cancellation type 的使用,作者给了一个协程实现的消息帧模拟器。目前没什么特别的。
Step 8: coroutine cancellation state
awaitable<void> timeout(steady_clock::duration duration)
{
asio::steady_timer timer(co_await this_coro::executor);
timer.expires_after(duration);
co_await timer.async_wait(use_nothrow_awaitable);
}
awaitable<void> session(tcp::socket client)
{
message_reader<tcp::socket> reader(client);
for (;;)
{
// session 和此前类似,但是多了超时管理
auto result = co_await (
reader.read_message() ||
timeout(5s)
);
switch (result.index())
{
case 0:
if (!std::get<0>(result).empty())
{
std::cout << "received: " << std::get<0>(result) << "\n";
}
else
{
co_return;
}
break;
case 1:
std::cout << "timed out\n";
break;
}
}
}
template <typename Stream>
class message_reader
{
public:
message_reader(Stream& stream)
: stream_(stream)
{
}
awaitable<std::string> read_message()
{
co_await this_coro::reset_cancellation_state(
// 一个 reset filter,将 total 转换为 partial
[](cancellation_type requested)
{
// 这里的意思是,asio::async_read_until 并不支持 total 取消(详见文档)
// 但是我们要让整个 read_message 协议支持 total 语义
if ((requested & cancellation_type::total) != cancellation_type::none)
{
return cancellation_type::partial;
}
else
{
return requested;
}
}
);
// 转换后的 cancellation state 将影响到下面的操作
auto [e, n] =
co_await asio::async_read_until(
stream_,
dynamic_buffer(message_buffer_),
'|',
use_nothrow_awaitable
);
// 还可以得知是否取消,以及取消的类型
if ((co_await this_coro::cancellation_state).cancelled() != cancellation_type::none)
{
co_return std::string();
}
// 大概是协程中表示 per-operation 的意思
co_await this_coro::reset_cancellation_state(); // Reset to default, which is terminal only.
if (e)
{
co_return std::string();
}
std::string message(message_buffer_.substr(0, n));
message_buffer_.erase(0, n);
co_return message;
}
private:
Stream& stream_;
std::string message_buffer_;
};
这里在 step 7 的基础上尝试为 read message 支持 total 取消类型。asio::async_read_until 本身仅支持 terminal/partial 取消类型,但是我们通过用户协作的内部缓冲管理,依然能够为达成整体 total 取消的支持。也就是说,read message 在能支持异步取消的同时,要么提供完整帧,要么提供空帧,而不会是部分帧。
总结
Asio 的取消设计算是简单过目了,官方示例还是比较少,希望没理解错。其实说简单点,取消设计的一个通用思路就是总是在 IO 操作前去确认取消状态(提点文章无关的,Kotlin 协程就是这样直接而且遥遥领先),但是 Asio 考虑得更深,比如一开始就说明了确认谁的状态这种竞态条件问题,还有更多的细节比如粒度、IO 对象、定制还有不同异步模式的兼容,所有因素组合起来就很夸张了。另外,Asio 的取消定制不管是 cancellation condition 还是 cancellation slot 都缺少不了用户协作。要是没有特殊需求的话,直接使用 Asio 协程或者 parallel group 就足够了。
(所以 Asio 还是一如既往的有 🤏 点抽象啊。)