本文是 Talking Async Ep2: Cancellation in depth 的个人观片笔记,演讲者为 Asio 作者 Christopher Kohlhoff,主要内容是介绍 Asio 对异步操作的取消设计。

前提说明

我真的很想结束这个话题,之前的笔记零零散散讨论过很多次取消点了,总是不满意,所以还是翻了点库存资料再调研一遍。但是这种抠细节的事情抠多了没意思,我暂时也没有改进 uring_exec 的计划,这回简单写点草稿算了。如有错误,还请指正。

另外一提,EP1 讨论了 Asio 的异步接口演变历史,话题相对轻松很多。我也做了简单的文本转译

Man-in-the-middle proxy

awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  tcp::socket server(client.get_executor());
  steady_clock::time_point deadline {};

  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    // 结合了 C++20 协程和 operator || 重载的取消设计
    co_await (
      transfer(client, server, deadline) ||
      transfer(server, client, deadline) ||
      watchdog(deadline)
    );
  }
}

EP1 展示了 Asio 使用 C++20 协程 的取消设计,非常的直观:任意一个异步操作成功,就取消其余的异步操作。但这只是一种高层抽象的设计,作者专门录制了 EP2 演讲来说明这种设计的低层设施是怎样的,以及想要定制行为的话又该怎么做。

注:不会提到 Asio 内部复杂的源码实现,主要是看个思路。

example

讨论用的示例依然是和 EP1 相同的流程图,也就是做一个 proxy server,实现

clientservertarget

这里每个红色的有向边都代表一个异步操作,流程从最上方的 async_accept() 开始执行。

Step 0: basic server

// https://github.com/chriskohlhoff/talking-async/blob/master/episode2/step_0.cpp

class proxy
  : public std::enable_shared_from_this<proxy>
{
  // ...省略一些实现,
  // 程序开始时通过 accept 获得 client socket,
  // 然后构造 proxy{client_socket}
  // 用户主动 proxy.connect_to_server(target) 来启动上述流程图
  // 共有 2 条异步链路:self->read_from_client() 和 self->read_from_server()
  // 分别对应于上述流程图的两个循环,下面只展示 read_from_server

private:
  void read_from_server()
  {
    auto self = shared_from_this();
    server_.async_read_some(
        buffer(data_from_server_),
        [self](std::error_code error, std::size_t n)
        {
          if (!error)
          {
            self->write_to_client(n);
          }
          else
          {
            self->stop();
          }
        }
      );
  }

  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(
        client_,
        buffer(data_from_server_, n),
        [self](std::error_code error, std::size_t /*n*/)
        {
          if (!error)
          {
            self->read_from_server();
          }
          else
          {
            self->stop();
          }
        }
      );
  }

  void stop()
  {
    client_.close();
    server_.close();
  }

  tcp::socket client_;
  tcp::socket server_;
  std::array<char, 1024> data_from_client_;
  std::array<char, 1024> data_from_server_;
};

暂且不提协程,先回到传统的 handler 回调形式。流程图其中的一个 read from server 到 write to client 循环如上所示。现在的需求是,我们要基于这份代码增加一个超时取消的实现。

Step 1: race condition

class proxy
  : public std::enable_shared_from_this<proxy>
{
  // 只需在 proxy.connect_to_server(target) 中新增一条 self->watchdog() 异步链路
  void watchdog()
  {
    auto self = shared_from_this();
    watchdog_timer_.expires_at(deadline_);
    watchdog_timer_.async_wait(
        [self](std::error_code /*error*/)
        {
          if (!self->is_stopped())
          {
            auto now = steady_clock::now();
            if (self->deadline_ > now)
            {
              // watchdog 也是一个异步循环
              self->watchdog();
            }
            else
            {
              self->stop();
            }
          }
        }
      );
  }

  // 修改 stop,需要对 watchdog 自己的状态也做处理
  void stop()
  {
    client_.close();
    server_.close();
    // 因为该接口不只是 watchdog 主动调用,
    // 因此要处理 watchdog 关联的任何有状态成员
    watchdog_timer_.cancel();
  }

  // 表示 watchdog 当中的 self->stop() 已经执行过
  // (或者因其他因素,client 和 server 可能主动关闭)
  bool is_stopped() const
  {
    // 使用 socket 状态表示,省掉一个单独的状态变量
    return !client_.is_open() && !server_.is_open();
  }

  void read_from_client()
  {
    deadline_ = std::max(deadline_, steady_clock::now() + 5s);
    // ...剩下操作保持不变
  }

  // 新增的数据结构
  steady_clock::time_point deadline_;
  asio::steady_timer watchdog_timer_;
};

思路在 EP1 也见识过,就是使用单独的 watchdog 异步链路来完成超时取消。所以这里只要静置超过五秒就会关掉连接。

  // Klemens 的意见
  void read_from_client()
  {
    watchdog_timer_.expires_at(steady_clock::now() + 5s);
    watchdog_timer_.async_wait(/* 判断正常触发还是 aborted */);
    // ...剩下操作保持不变
  }
Klemens
能不能直接点,每次异步 IO 时更新 timer,这样就不用赋值 deadline,并且也不用 watchdog 链路,顺便 async_wait 里面也不用检查 deadline 了?(这位是懂装不懂的高手)
Christopher
可以的,但是这样做会(在每次修改 expires_at 时)有不必要的定时器唤醒,所以……
Klemens
所以说,expires_at 被更新就是内部做一个 .cancel() 然后跑到 async_wait 的操作,然后触发 asio::error::operation_aborted。总之使用 deadline 就最小化唤醒次数对不。大师说得太对了!

Step 2: completion condition

simple-case 没有取消需求的情况,非常简单

race-condition 取消操作,意味着存在竞态条件

前面的代码是不太可取的,在深入前需要先解释一下取消所带来的竞态条件(race condition)。取消操作是必须异步实现的(这方面没有讨论的余地),这意味着用户可能在发出取消请求后,仍然会收到成功而非被取消的 completion handler(理解为回调就好了)。这里看似影响不大,但是潜在更深层次的问题:如果你在 handler 当中也尝试发出异步操作,那么整个异步操作链可能永远无法结束。

  // step 1 用到的代码
  void stop()
  {
    client_.close();
    server_.close();
    watchdog_timer_.cancel();
  }

但是在前面的 step 1 当中,却不存在这种问题。这是因为 stop() 依赖了 socket 的状态,这是直接暴力「关闭」了连接,并非「取消」对应的操作。

  void stop()
  {
    client_.cancel();
    server_.cancel();
    watchdog_timer_.cancel();
  }

如果使用了实际上的 cancel 取消操作,那么将引发上述的竞态条件问题。而且恰好是 read-write-loop 模式,整个异步操作链可能永远无法结束。

Klemens
这不是真的取消,这是关闭。
Christopher
在 Asio 当中,关闭是一种取消,但是它永久地改变了状态。
  void stop()
  {
    client_.cancel();
    server_.cancel();
    watchdog_timer_.cancel();
    // 新增!
    stopped_ = true;
  }

  bool is_stopped() const
  {
    // 不能依赖 socket 状态
    // 注:作者的原文件应该是忘记改了,个人手动修复
    return stopped_;
  }

  void read_from_server()
  {
    deadline_ = std::max(deadline_, steady_clock::now() + 5s);

    auto self = shared_from_this();
    server_.async_read_some(
        buffer(data_from_server_),
        [self](std::error_code error, std::size_t n)
        {
          // 新增判断,注意前面讨论过,竞态条件不一定返回错误
          if (!error && !self->is_stopped())
          {
            self->write_to_client(n);
          }
          else
          {
            self->stop();
          }
        }
      );
  }

  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(
        client_,
        buffer(data_from_server_, n),
        [self](std::error_code error, std::size_t /*n*/)
        {
          // 新增判断
          if (!error && !self->is_stopped())
          {
            self->read_from_server();
          }
          else
          {
            self->stop();
          }
        }
      );
  }

但是可以观察到,前面 step 1 没有问题,是恰好利用状态来解决的竞态条件。那么简单点我们单独在 handler 里面设置一个外部的、不再依赖于 socket 本身的状态检查是否可行?

事实上,竞态条件依然存在。因此异步写操作(async_write)实际上是由多个异步部分写操作(async_write_some)组成的,这是面向内核提供性能优化的设计。这意味着我们的状态无法在完整的异步写操作完成前检测出来。

NOTE: 性能优化指的是内核 buffer 一般是页对齐分配的,用户可以主动避免读/写跨页导致额外开销。

Klemens
你的意思是说,async_write() = multiple async_write_some()s。然后 async_write_some 完成又恰好存在竞态条件时,只会在用户不可见的内部再次异步地发出下一个 async_write_some,所以不会有机会判断 stopped_ 状态。这导致必须完成整个异步写操作,也意味着取消操作实质上不存在。
Christopher
是。
Klemens
这是多线程才有的问题吗?
Christopher
不是。
Klemens
我懂了。这是关于 Asio 并行的异步链路之间的竞态条件,只要异步,即使是单线程也会存在这种问题。同时,Asio 是通过 asio::post 投递的 completion token/handler,因此,异步操作完成时到异步操作完成通知至用户时是存在时间延迟的。并且我的 asio::io_context 越是繁忙,内部队列就会拉得越长,问题就会倾斜得更加严重!不愧是大师,说得太深入了!
  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(
        client_,
        buffer(data_from_server_, n),
        // 新增 completion condition!
        [self](std::error_code error, std::size_t n) -> std::size_t
        {
          auto completion_condition = asio::transfer_all();
          // 我们现在有机会进行状态判断
          if (!self->is_stopped())
          {
            // 表示下一次尽最大努力交付(默认行为),
            // n 为迄今已经传输的字节数,而函数对象返回尽可能大的值
            return completion_condition(error, n);
          }
          else
          {
            // 没有剩余字节数需要写,是时候提交 completion handler 了
            return 0;
          }
        },
        // 这是之前的 completion handler
        [self](std::error_code error, std::size_t /*n*/)
        {
          if (!error && !self->is_stopped())
          {
            self->read_from_server();
          }
          else
          {
            self->stop();
          }
        }
      );
  }

要解决这个问题,Asio 引入了 completion condition。这是一个函数对象,将会在每一个 async_write_some 之间调用。其基本作用就是用于检测下一次尝试写操作的最大字节数(就是上面提到的优化机会,应该也有 QoS 之类的用途吧),但是我们可以在里面添加用户定义的行为,比如我们现在需要的状态判断,此时通过返回 0 字节来提示写操作应该结束。

至少这个程序总算是解决了竞态条件问题。作者建议如果是简单的需求,shutdown/close 其实也是容许的。

Step 3: cancellation slot

basic_stream_socket::cancel

Cancel all asynchronous operations associated with the socket.

但是我们还没有达到想要的粒度。如 cancel 注释所言,它取消的是该 IO 对象(socket)关联的所有异步操作。我们需要更加精细的 per-operation cancellation。

class proxy
  : public std::enable_shared_from_this<proxy>
{
  // 现在 proxy 有 4 条异步链路,
  // 除了 step 1 已有的 3 条,还新增 heartbeat
  void heartbeat()
  {
    auto self = shared_from_this();
    heartbeat_timer_.expires_after(1s);
    heartbeat_timer_.async_wait(
        [self](std::error_code /*error*/)
        {
          if (!self->is_stopped())
          {
            // 先不管 asio::cancellation_type
            self->heartbeat_signal_.emit(asio::cancellation_type::total);
            self->heartbeat();
          }
        }
      );
  }

  void read_from_server()
  {
    auto self = shared_from_this();
    server_.async_read_some(
        buffer(data_from_server_),
        // 通过 bind_cancellation_slot 组成新的 completion token
        asio::bind_cancellation_slot(
          // 使得该操作关联到 heartbeat_signal_ 的信号槽
          heartbeat_signal_.slot(),
          // 这里依然是 completion handler 的内容
          [self](std::error_code error, std::size_t n)
          {
            if (!error)
            {
              self->num_heartbeats_ = 0;
              self->write_to_client(n);
            }
            // 当 heartbeat 发出 emit 时,会走到该路径
            else if (error == asio::error::operation_aborted)
            {
              ++self->num_heartbeats_;
              // 随便写点内容到 client
              self->write_heartbeat_to_client();
            }
            else
            {
              self->stop();
            }
          }
        )
      );
  }

  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(
        client_,
        buffer(data_from_server_, n),
        [self](std::error_code error, std::size_t /*n*/)
        {
          if (!error)
          {
            self->read_from_server();
          }
          else
          {
            self->stop();
          }
        }
      );
  }

  void write_heartbeat_to_client()
  {
    std::size_t n = asio::buffer_copy(
        buffer(data_from_server_),
        std::array<asio::const_buffer, 3>{
          buffer("<heartbeat "),
          buffer(std::to_string(num_heartbeats_)),
          buffer(">\r\n")
        }
      );

    write_to_client(n);
  }

  asio::steady_timer heartbeat_timer_;
  asio::cancellation_signal heartbeat_signal_;
  std::size_t num_heartbeats_ = 0;
};

Asio 通过引入 cancellation slot 来达成 per-operation cancellation。需要注意 cancellation slot 并不能直接解决前面的竞态条件问题(很明显,异步操作完成后 emit 是没用的)。作者提到此前在 EP1 展示的重载操作符其实也是内部使用 cancellation slot 来实现,但是会有额外标记(类似前面的 stopped_)来处理竞态条件。也就是说,cancellation slot 只是解决了 cancel 的粒度问题。

个人粗看了一眼 async_write 实现,对于前面提到的 async_write_some 组合问题,cancellation slot 是可以解决的(见链接,每一步都有检查)。但是这种机制作为单个异步操作粒度的设计,应该没办法解决 step 2 图中关于两个异步操作(分为 2 个不同的 slot)的竞态条件。

cancellation slot 是符合零成本抽象的,不使用 bind_cancellation_slot 将不会有任何运行时开销。作者还提到这种机制除了编译时检测以外还有运行时检测,以及使用后的 slot 是可回收的设计。总之这是一个比用户手写流程更好的选择。

Klemens
除了编译时检测以外还有运行时检测?我懂了,你指的是……
Christopher
指的是协程内部使用了类型擦除的设计,它必须要运行时检测。

cancellation-slot cancellation slot 是可回收复用的

作者建议不要直接使用 cancellation slot 这种为高精度取消而实现的低层构建块,应该使用面向用户的高层设计:EP1 当中的协程就是一个好例子。

Step 4: parallel group

  void read_from_server()
  {
    heartbeat_timer_.expires_after(1s);

    auto self = shared_from_this();
    // 直接省掉了单独的 heartbeat(),
    // 通过 make_parallel_group 描述两个并行的异步链路
    asio::experimental::make_parallel_group(
        [this](auto token)
        {
          return server_.async_read_some(
              buffer(data_from_server_),
              token
            );
        },
        [this](auto token)
        {
          return heartbeat_timer_.async_wait(token);
        }
      ).async_wait(
        // 异步等待的行为是可定制的(Asio 目前提供 4 种选项,也可以自己定制)
        // 包括 wait_for_all、wait_for_one、wait_for_one_error 和 wait_for_one_success
        // 这里表示只等待任意一方的完成,然后取消其他剩下的异步操作
        asio::experimental::wait_for_one(),
        [self](
          std::array<std::size_t, 2> order,
          std::error_code read_error, std::size_t n,
          std::error_code /*timer_error*/)
        {
          // 看谁完成
          switch (order[0])
          {
          case 0: // read
            if (!read_error)
            {
              self->num_heartbeats_ = 0;
              self->write_to_client(n);
            }
            else
            {
              self->stop();
            }
            break;
          case 1: // timer
            ++self->num_heartbeats_;
            self->write_heartbeat_to_client();
            break;
          }
        }
      );
  }

除了协程以外,parallel group 也是一种面向用户的选择。它不再需要用户声明 asio::cancellation_signal,并且能帮你内部处理好事务。

Klemens
所以 Asio 协程的 || 和 && 重载其实是基于 parallel group 实现的,对吧?
Christopher
没错。

Step 5: async initiate

在视频演示时,Asio 还没有集成好对异步信号等待的取消支持,接下来的 step 5 和 step 6 会展示如何将 cancellation slot 集成到用户定义的异步操作当中,从而实现满血版 async_wait for signal。

template <typename CompletionToken>
auto async_wait_for_signal(
    asio::signal_set& sigset,
    CompletionToken&& token)
{
  return asio::async_initiate<CompletionToken,
    void(std::error_code, std::string)>(
      [&sigset](auto handler)
      {
        auto executor =
          asio::get_associated_executor(
              handler,
              sigset.get_executor()
            );

        // 做点简单的定制,比如转换 signo 到 signame
        auto intermediate_handler =
          [handler = std::move(handler)](
              std::error_code error,
              int signo
            ) mutable
          {
            std::string signame;
            switch (signo)
            {
            case SIGABRT: signame = "SIGABRT"; break;
            case SIGFPE: signame = "SIGFPE"; break;
            case SIGILL: signame = "SIGILL"; break;
            case SIGINT: signame = "SIGINT"; break;
            case SIGSEGV: signame = "SIGSEGV"; break;
            case SIGTERM: signame = "SIGTERM"; break;
            default: signame = "<other>"; break;
            }

            // handler 可以是 rvalue 形式的,只会调用一次
            std::move(handler)(error, signame);
          };

        sigset.async_wait(
            asio::bind_executor(
              executor,
              std::move(intermediate_handler)
            )
          );
      },
      // 别忘了 completion token
      token
    );
}

awaitable<void> timed_wait_for_signal()
{
  asio::signal_set sigset(co_await this_coro::executor, SIGINT, SIGTERM);
  asio::steady_timer timer(co_await this_coro::executor, 5s);

  auto result = co_await (
      async_wait_for_signal(sigset, use_awaitable) ||
      timer.async_wait(use_awaitable)
    );

  switch (result.index())
  {
  case 0:
    std::cout << "signal finished first: " << std::get<0>(result) << "\n";
    break;
  case 1:
    std::cout << "timer finished first\n";
    break;
  }
}

int main()
{
  asio::io_context ctx;
  co_spawn(ctx, timed_wait_for_signal(), detached);
  ctx.run();
}

step 5 展示的是一个简单的用户定义异步操作,还没有任何取消支持。具体细节可以了解 asio::async_initiate,至少你可以从例子得知,实现一个用户流程后,就可以作为 Asio 协程去使用。

Klemens
所以 async_initiate 是用于定制单个异步操作,而 async_compose 是用于组合多个已有的异步操作,对吧?
Christopher
没错。
Klemens
所以 handler 在这个例子中是一个定制点吗?比如我要是使用了 use_awaitable 的 completion token,那么该 handler 实现将会推导为一个恢复协程的回调,对吧?
Christopher
没错。取决于 token 的选择。
Klemens
所以 associated_executor 在这个例子中也是一个定制点吗?比如 handler 没有特殊要求的话,那么将使用与 sigset 关联的 executor,对吧?
Christopher
没错。取决于 handler 的选择。
Klemens
所以 bind_executor 在这个例子中就能让这个内部实现的回调运行到可能由用户定义的 executor 指向的执行上下文当中,对吧?
Christopher
没错。

Step 6: cancellation types

template <typename CompletionToken>
auto async_wait_for_signal(
    asio::signal_set& sigset,
    CompletionToken&& token)
{
  return asio::async_initiate<CompletionToken,
    void(std::error_code, std::string)>(
      [&sigset](auto handler)
      {
        auto cancellation_slot =
          asio::get_associated_cancellation_slot(
              handler,
              asio::cancellation_slot()
            );

        // 为了方便,这里直接使用了运行时检测的方式
        if (cancellation_slot.is_connected())
        {
          // 可以理解为注册一个取消回调,当外部注入取消类型对象时,就会触发 cancel
          cancellation_slot.assign(
              [&sigset](asio::cancellation_type /*type*/)
              {
                sigset.cancel();
              }
            );
        }

        auto executor = // ...

        auto intermediate_handler = // ...

        sigset.async_wait(
            asio::bind_executor(
              executor,
              std::move(intermediate_handler)
            )
          );
      },
      token
    );
}

接下来基于 step 5 程序继续增加取消支持。cancellation slot 与 executor 的设计相似,都是通过关联(associate)handler 去获取,接着只需要注册一个取消回调。

cancellation-types

最后讨论的是一直被忽略的 cancellation type。我们需要理解取消并不是简单的停止操作,还要考虑取消成功后对应的 IO 对象的状态影响。如果我们的取消是通过 close 做到的,那还意味着该对象以后不得有任何后续操作;所以要想保证取消后仍可以向对象发出新的异步操作,那就要得知取消操作后的对象是否仍然合法。

根据副作用造成的影响,Asio 提供了三种 cancellation type:terminal、partial 和 total。

Klemens
使用场景?
Christopher
官网写了。
取消成功时的保证 使用场景
terminal 该操作具有未明确的副作用,唯一安全的做法是关闭或销毁 I/O 对象 有状态的消息分帧协议实现(如异步操作发送/接收完整消息)。如果在消息体传输中途取消操作,无法向完成处理程序报告合理状态
partial 该操作具有明确定义的副作用,完成处理程序会具体说明这些副作用 组合操作如 async_readasync_write。如果在所有字节传输完成前取消,完成处理程序会收到已传输的总字节数。调用方可利用此信息发起新操作传输剩余字节
total 该操作没有可通过 API 观察到的副作用 - 传输零/非零字节的低级系统调用
- 即使成功也没有副作用的等待就绪操作
- 完全缓冲的消息帧协议(可存储部分消息供下次操作重用)

个人理解 Asio 除了在尝试处理取消场景下碰到的合法状态问题,还顺手解决了部分成功问题。用户使用 partial,就是流式场景允许给你返回成功的部分值(这是主动 emit 的,也是知情的);而 total 是用了不可观测的黑盒,上层得到的只会是完整值,因此尤其适合上表提到的消息帧协议;terminal 没啥好说的,既然不可控,直接结束流程算了。这里的难点在于实现是靠库和用户协作的,下一步的例子可以感受一下。

Step 7: message frame example

// 假设你现在是一个 server,每次监听都能得到一个 client 并创建 session
awaitable<void> session(tcp::socket client)
{
  message_reader<tcp::socket> reader(client);

  for (;;)
  {
    std::string message = co_await reader.read_message();

    if (!message.empty())
    {
      std::cout << "received: " << message << "\n";
    }
    else
    {
      co_return;
    }
  }
}

// 这个 session reader 就是不断的读取用户输入,通过 '|' 分帧
template <typename Stream>
class message_reader
{
public:
  message_reader(Stream& stream)
    : stream_(stream)
  {
  }

  awaitable<std::string> read_message()
  {
    auto [e, n] =
      co_await asio::async_read_until(
        stream_,
        dynamic_buffer(message_buffer_),
        '|',
        use_nothrow_awaitable
      );

    if (e)
    {
      co_return std::string();
    }

    std::string message(message_buffer_.substr(0, n));
    message_buffer_.erase(0, n);
    co_return message;
  }

private:
  Stream& stream_;
  std::string message_buffer_;
};

为了说明 cancellation type 的使用,作者给了一个协程实现的消息帧模拟器。目前没什么特别的。

Step 8: coroutine cancellation state

awaitable<void> timeout(steady_clock::duration duration)
{
  asio::steady_timer timer(co_await this_coro::executor);
  timer.expires_after(duration);
  co_await timer.async_wait(use_nothrow_awaitable);
}

awaitable<void> session(tcp::socket client)
{
  message_reader<tcp::socket> reader(client);

  for (;;)
  {
    // session 和此前类似,但是多了超时管理
    auto result = co_await (
        reader.read_message() ||
        timeout(5s)
      );

    switch (result.index())
    {
    case 0:
      if (!std::get<0>(result).empty())
      {
        std::cout << "received: " << std::get<0>(result) << "\n";
      }
      else
      {
        co_return;
      }
      break;
    case 1:
      std::cout << "timed out\n";
      break;
    }
  }
}

template <typename Stream>
class message_reader
{
public:
  message_reader(Stream& stream)
    : stream_(stream)
  {
  }

  awaitable<std::string> read_message()
  {
    co_await this_coro::reset_cancellation_state(
        // 一个 reset filter,将 total 转换为 partial
        [](cancellation_type requested)
        {
          // 这里的意思是,asio::async_read_until 并不支持 total 取消(详见文档)
          // 但是我们要让整个 read_message 协议支持 total 语义
          if ((requested & cancellation_type::total) != cancellation_type::none)
          {
            return cancellation_type::partial;
          }
          else
          {
            return requested;
          }
        }
      );
    
    // 转换后的 cancellation state 将影响到下面的操作

    auto [e, n] =
      co_await asio::async_read_until(
        stream_,
        dynamic_buffer(message_buffer_),
        '|',
        use_nothrow_awaitable
      );

    // 还可以得知是否取消,以及取消的类型
    if ((co_await this_coro::cancellation_state).cancelled() != cancellation_type::none)
    {
      co_return std::string();
    }

    // 大概是协程中表示 per-operation 的意思
    co_await this_coro::reset_cancellation_state(); // Reset to default, which is terminal only.

    if (e)
    {
      co_return std::string();
    }

    std::string message(message_buffer_.substr(0, n));
    message_buffer_.erase(0, n);
    co_return message;
  }

private:
  Stream& stream_;
  std::string message_buffer_;
};

这里在 step 7 的基础上尝试为 read message 支持 total 取消类型。asio::async_read_until 本身仅支持 terminal/partial 取消类型,但是我们通过用户协作的内部缓冲管理,依然能够为达成整体 total 取消的支持。也就是说,read message 在能支持异步取消的同时,要么提供完整帧,要么提供空帧,而不会是部分帧。

Klemens
Asio 协程 operator || 引起的取消,也就是内部发出了 emit(),那么这个 emit 对应的取消是什么类型的呢?
Christopher
cancellation type 是位图设计,默认是所有的类型(bit)全部开启。
Klemens
那么 reader 作为被取消方,它该响应哪个取消类型?
Christopher
被取消方是响应保证最强的取消类型。比如在这个例子中,reader 通过 reset filter 使得其保证最强的取消类型为 partial。但是需要注意,默认情况下保证最强的取消类型为 terminal。

总结

Asio 的取消设计算是简单过目了,官方示例还是比较少,希望没理解错。其实说简单点,取消设计的一个通用思路就是总是在 IO 操作前去确认取消状态(提点文章无关的,Kotlin 协程就是这样直接而且遥遥领先),但是 Asio 考虑得更深,比如一开始就说明了确认谁的状态这种竞态条件问题,还有更多的细节比如粒度、IO 对象、定制还有不同异步模式的兼容,所有因素组合起来就很夸张了。另外,Asio 的取消定制不管是 cancellation condition 还是 cancellation slot 都缺少不了用户协作。要是没有特殊需求的话,直接使用 Asio 协程或者 parallel group 就足够了。

(所以 Asio 还是一如既往的有 🤏 点抽象啊。)