cover

这是早段时间 Asio 作者 Christopher Kohlhoff 在油管上的演讲。如果你对 Asio 的发展历史感兴趣,或者想了解如何使用 modern 的方式来进行 C++20 网络编程,那可以看下这篇文本翻译。

Asio 的接口演变

cpp11-callback-style

先回到前 C++11 时代,如果要实现一个异步回调,那么一般可选三种形式:可以是 operator()std::bind() 或者是 lambda

asio-api-old

此时,Asio 提供的异步函数 API 如上图 foo 所示[1],用户提供的回调称为 completion handler[2]。回调函数签名的设计也易于使用:

  • 参数列表可以提供是否成功或者传输了多少字节等信息。
  • 返回类型为 void,原因是回调函数在后台执行,不需要关注它的返回。

[1] 更准确点说 foo 应该是 initiating function,用户主动调用它来启动 asynchronous operation。后面会把 initiating function 视为异步函数。

[2] asynchronous operation 在完成时调用 completion handler。这方面的概念细节可以看 Asio 官网的异步模型介绍。

asio-api-new

在 2014-2015 年,作者对 API 增加了返回类型推导以提高灵活性。一方面是异步函数接收 completion token(而不是 completion handler),另一方面返回类型是基于 completion token 类型得到的 DEDUCED 类型。

asio-api-use

新的 API 具有灵活性是因为 Asio 在内部提供了一个 async_result 的定制点,它通过以下三个输入来萃取生成异步操作:

  • Asio 异步函数的签名。
  • Asio 异步函数的实现。
  • 用户提供的 completion token。

use-callback

最简单的例子就是传入一个回调(比如 lambda),这个时候因为 async_result 定制点采用了默认的行为,推导出来的 completion token 等同于之前的 completion handler,返回类型也同样是 void,异步操作也依然是在后台运行任务以及通过回调收集得到的结果。因此对于用户来说,以前的代码并不需要作出任何改动。

use-future

那多出来的抽象不是毫无意义?当然不是。为了让你知道 completion token 能做什么,上图展示了一个开箱即用的 use_future(它也是 completion token)。use_future 本身只是一个占位符而非某种回调,当作为 completion token 在内部输入给 async_result 后,生成的异步操作将返回一个 future,因此你可以用它来等待操作的完成。

use-future-2

这个时候我们就多了一种新的异步编程方式。这个例子执行异步函数 async_read_some(),它返回的字节数可以通过 future.get() 的方式去得到(而不是回调的方式)。

use-yield-context

不仅如此,我们可以轻松的支持更多异步编程方式。在这个例子中,我们支持了有栈协程的编程方式,这里的 completion token 并不是一个占位符,而是一个有栈协程的上下文 yield,因此异步操作的实现实际是通过挂起/恢复来完成,直到异步操作完成才返回给你传输的字节数。这样做可以让你编写形如同步风格的异步代码。

use-boost-fiber

还有一个类似的是支持 boost.fiber,意思相近,这里就不多介绍了。现在你可以知道,通过 completion token 不仅能轻易支持多种异步编程方式,还可以使得第三方库的定制变得更加简单。

use-awaitable

最后当然少不了 C++20 coroutine。后面将展示如何用 Asio with C++20 写出 awesome 的代码。

Man-in-the-middle proxy

example

在后面的多个版本的代码中我们都使用同一个流程图,也就是做一个 proxy server,实现

\[client \leftrightarrows server \leftrightarrows target\]

这里每个红色的有向边都代表一个异步操作,流程从最上方的 async_accept() 开始执行。

Step 0: C++11

在踏出第一步前,先品鉴一下 C++11 时代用到的代码风格。代码注释中的顺序编号可以帮助你更好理解异步代码:

#include <array>
#include <asio.hpp>
#include <iostream>
#include <memory>

using asio::buffer;
using asio::ip::tcp;

// proxy 类记录 client 和 server
class proxy : public std::enable_shared_from_this<proxy> {
  public:
  proxy(tcp::socket client)
      : client_(std::move(client))
      // server 和 client 将运行在同一个 IO context
      , server_(client_.get_executor())
  {
  }

  // 2. 这是构造的 proxy 首先调用的异步函数(async_connect)
  // server 将作为一个客户端连接到远程的 target(作者在示例中使用 www.boost.org:80)
  void connect_to_server(tcp::endpoint target)
  {
    auto self = shared_from_this();
    // 当连接到 target 完成后,执行 read_from_client() 和 read_from_server()
    // 实际上 read_from_client() 和 read_from_server() 内部只调用 async_read_some()
    // 因此在 io_context 中是并发的执行 2 条 async_read_some() 异步操作链
    server_.async_connect(target, [self](std::error_code error) {
      if (!error) {
        self->read_from_client();
        self->read_from_server();
      }
    });
  }

  private:
  void stop()
  {
    client_.close();
    server_.close();
  }

  // 3. 当你使用 client 写入数据,比如用 telnet 写入"GET / HTTP/1.0"
  // Asio 就会实际处理 read 操作,读到的数据放入 data_from_client_缓冲中
  // 后续将通过 write_to_server() 将读到的 n 个字节写到远程 target
  void read_from_client()
  {
    auto self = shared_from_this();
    client_.async_read_some(buffer(data_from_client_),
        [self](std::error_code error, std::size_t n) {
          if (!error) {
            self->write_to_server(n);
          } else {
            self->stop();
          }
        });
  }

  // 4. 通过 async_write() 将 data_from_client_缓冲中的数据写到远程 target
  // 写完后重新从 client 读取数据,形成了流程图中的循环
  void write_to_server(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(server_, buffer(data_from_client_, n),
        [self](std::error_code ec, std::size_t /*n*/) {
          if (!ec) {
            self->read_from_client();
          } else {
            self->stop();
          }
        });
  }

  // 5. 这个分叉的另一条链路
  // 当 proxy server 执行第 4 步时,远程 target 也会给出 response
  // 因此会读数据到 data_from_server_缓冲
  void read_from_server()
  {
    auto self = shared_from_this();
    server_.async_read_some(asio::buffer(data_from_server_),
        [self](std::error_code error, std::size_t n) {
          if (!error) {
            self->write_to_client(n);
          } else {
            self->stop();
          }
        });
  }

  // 6. proxy 转发 data_from_server_内容给 client
  void write_to_client(std::size_t n)
  {
    auto self = shared_from_this();
    async_write(client_, buffer(data_from_server_, n),
        [self](std::error_code ec, std::size_t /*n*/) {
          if (!ec) {
            self->read_from_server();
          } else {
            self->stop();
          }
        });
  }

  tcp::socket client_;
  tcp::socket server_;
  std::array<char, 1024> data_from_client_;
  std::array<char, 1024> data_from_server_;
};

void listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  // 1. 第一个异步操作,accept 得到的 client 在异步操作完成时传入回调
  // 然后构造 proxy server,并尝试连接到 target
  acceptor.async_accept(
      [&acceptor, target](std::error_code error, tcp::socket client) {
        if (!error) {
          std::make_shared<proxy>(std::move(client))->connect_to_server(target);
        }

        // 这是 Asio 常用的一种“递归”的写法,可以再次 accept 另一个 client
        // 因此可以处理多个客户端
        listen(acceptor, target);
      });
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    listen(acceptor, target_endpoint);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

代码稍微有点长,但这个代码只需把流程图白框的内容视为 completion handler,那就是一个照抄的流程。

NOTE: 如果有些同学不熟悉 Asio,我在这里简单总结一下:

  • 如何描述异步:使用 Asio 的异步函数来启动异步操作,用户传入完成异步操作时需要的回调。
  • 如何描述回调:使用 lambda
  • 如何描述错误:使用 std::error_code
  • 异步操作在哪里执行:io_context.run() 对应的线程。
  • 异步操作链之间有何关联:存在共享的 proxy 对象。

Step 1: coroutine

现在实现一个基本的 Asio with awesome C++20 代码:将回调改成协程。

#include <array>
#include <asio.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::use_awaitable;
using asio::ip::tcp;

// 现在 proxy 不需要共享的读写缓冲
struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  try {
    std::array<char, 1024> data;

    for (;;) {
      auto n
          = co_await state->client.async_read_some(buffer(data), use_awaitable);

      co_await async_write(state->server, buffer(data, n), use_awaitable);
    }
  } catch (const std::exception& e) {
    state->client.close();
    state->server.close();
  }
}

// 3. 使用协程处理转发
awaitable<void> server_to_client(proxy_state_ptr state)
{
  // 协程默认使用 exception 来处理错误
  try {
    // 由于不需要拆分函数,因此缓冲可以是局部变量
    std::array<char, 1024> data;

    // 描述循环的异步操作链
    for (;;) {
      // 读后写的操作
      // 整个函数看起来是不是非常的“同步”?
      auto n
          = co_await state->server.async_read_some(buffer(data), use_awaitable);

      co_await async_write(state->client, buffer(data, n), use_awaitable);
    }
  } catch (const std::exception& e) {
    state->client.close();
    state->server.close();
  }
}

// 2. server 主动连接到远程 target
// 建立连接后并发执行 client_to_server() 和 server_to_client()
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  co_await state->server.async_connect(target, use_awaitable);

  auto ex = state->client.get_executor();
  // 作者提示这里不应使用 co_await client_to_server(state)
  // 因为并不是等到 client_to_server() 执行完才执行 server_to_client()
  // 所以要使用 co_spawn 生成独立的异步操作链
  co_spawn(ex, client_to_server(state), detached);

  co_await server_to_client(state);
}

// 1. listen() 需要返回 awaitable<void>
awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  // 现在我们不需要“递归”
  for (;;) {
    // 使用 use_awaitable 即 C++20 协程来替代回调
    // co_await 会让出协程,当返回时,已经 accept 得到 client
    // 默认 client 和 acceptor 有相同的 executor
    auto client = co_await acceptor.async_accept(use_awaitable);

    auto ex = client.get_executor();
    // 使用 co_spawn 生成新的异步操作链,现在一条链路处理 accept,另一条处理 proxy
    // 首个参数接受 executor,因此指定了异步操作链的执行上下文
    // 使用 spawn 出来的 proxy() 协程处理连接
    // detached 表示不关心协程的结果,本质是 completion token 生成了一个空的 completion handler
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    // 将在 context 执行首个 listen() coroutine
    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

现在这个版本的 proxy 使用了 use_awaitable 的 completion token,代码风格接近同步编程。

相比之下,可以看下有什么不同:

  • 如何描述异步:使用 co_spawn 创建协程,使用 co_await 收集异步结果。
  • 如何描述回调:没有回调!
  • 如何描述错误:使用 try-catch
  • 异步操作链之间有何关联:存在共享的 proxy 对象,但要求的状态变少了。

当然这只是第一步,我们后面会继续打磨。

Step 2: completion token adapter

前面看到 Step 1 处理错误需要使用异常,这在 Asio 当中并非必要。你可以利用 completion token 的扩展性,通过封装简单的 adapter 来改变这种行为。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;

// 1. 新的 completion token
constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    // 2. 现在返回 tuple,第一个 element 就是 error code
    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    auto ex = state->client.get_executor();
    co_spawn(ex, client_to_server(state), detached);

    co_await server_to_client(state);
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

这段代码改动很少,但利用 completion token 的扩展性可以轻松改变错误处理的行为。如今可以搭配结构化绑定来返回得到 error code,而不是必须处理异常。

Step 3: timeout

超时管理也是必要的,有没有办法让客户端在一定时间内没操作就关掉连接呢?当然可以,这里给出了基本的操作。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
  // 1. 引入新的共享时间戳
  steady_clock::time_point deadline;
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    // 2. 在任何操作前,先更新 deadline
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      break;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      break;
  }

  state->client.close();
  state->server.close();
}

// 3. 添加新的协程 watchdog,超时则关闭连接
awaitable<void> watchdog(proxy_state_ptr state)
{
  asio::steady_timer timer(state->client.get_executor());

  auto now = steady_clock::now();
  while (state->deadline > now) {
    timer.expires_at(state->deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }

  state->client.close();
  state->server.close();
}

// 4. 现在 proxy() 里面有 3 条异步操操作链,相比之前的版本新增一个 watchdog 超时管理
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    auto ex = state->client.get_executor();
    co_spawn(ex, client_to_server(state), detached);
    co_spawn(ex, server_to_client(state), detached);

    // 当发生超时现象,这里执行 close
    co_await watchdog(state);
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

思路很简单,插入一个 watchdog 协程即可。但这仅是一个低配版的代码,称不上 awesome。作者提到如果希望更加精细的控制取消行为(per-operation cancellation)而不是直接全部关掉,也可以使用 Asio 的 cancellation slot 机制,但大部分人并不会直接使用这种偏底层的方法。

Step 4: awaitable operators

作者用了一个更加高层的抽象来执行取消操作,具体的办法就是使用逻辑或来组合异步操作。

awaitable operator

这里仍然是并行的执行,但任意一个协程完成(逻辑或短路),另外两个协程将被取消。

完整代码如下:

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

struct proxy_state {
  proxy_state(tcp::socket client)
      : client(std::move(client))
  {
  }

  tcp::socket client;
  tcp::socket server { client.get_executor() };
  steady_clock::time_point deadline;
};

using proxy_state_ptr = std::shared_ptr<proxy_state>;

awaitable<void> client_to_server(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->client.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2] = co_await async_write(
        state->server, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

awaitable<void> server_to_client(proxy_state_ptr state)
{
  std::array<char, 1024> data;

  for (;;) {
    state->deadline = std::max(state->deadline, steady_clock::now() + 5s);

    auto [e1, n1] = co_await state->server.async_read_some(
        buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2] = co_await async_write(
        state->client, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

// 这里 watchdog 的逻辑也简化了,只检查 timer
awaitable<void> watchdog(proxy_state_ptr state)
{
  asio::steady_timer timer(state->client.get_executor());

  auto now = steady_clock::now();
  while (state->deadline > now) {
    timer.expires_at(state->deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }
}

awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  auto state = std::make_shared<proxy_state>(std::move(client));

  auto [e]
      = co_await state->server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    // 这里有 3 个并行且循环执行的异步操作链
    co_await (
        client_to_server(state) || server_to_client(state) || watchdog(state));

    // 只有任意一个协程完成才能往下走
    // 关闭连接的逻辑放在这里
    state->client.close();
    state->server.close();
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

那有没有办法是等待所有协程执行完?当然没问题,使用逻辑与即可。

Step 5: no shared state

前面我们的代码一直使用 shared state 的方式来描述一个 proxy 对象,但是显然前面版本的 proxy() 能通过一个 co_await 来保证三个异步操作链中对象的生命周期,因此我们能干掉 shared state。

#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

// 1. 不需要区分 client_to_server() 和 server_to_client()
// 合并为一个函数
awaitable<void> transfer(
    tcp::socket& from, tcp::socket& to, steady_clock::time_point& deadline)
{
  std::array<char, 1024> data;

  for (;;) {
    deadline = std::max(deadline, steady_clock::now() + 5s);

    auto [e1, n1]
        = co_await from.async_read_some(buffer(data), use_nothrow_awaitable);
    if (e1)
      co_return;

    auto [e2, n2]
        = co_await async_write(to, buffer(data, n1), use_nothrow_awaitable);
    if (e2)
      co_return;
  }
}

awaitable<void> watchdog(steady_clock::time_point& deadline)
{
  // 使用 `this_coro::executor` 来获取当前上下文对应的 executor
  // 进一步干掉 shared state
  asio::steady_timer timer(co_await this_coro::executor);

  auto now = steady_clock::now();
  while (deadline > now) {
    timer.expires_at(deadline);
    co_await timer.async_wait(use_nothrow_awaitable);
    now = steady_clock::now();
  }
}

// 2. 使用局部变量即可
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  tcp::socket server(client.get_executor());
  steady_clock::time_point deadline {};

  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    co_await (
      transfer(client, server, deadline) ||
      transfer(server, client, deadline) ||
      watchdog(deadline)
    );
  }
  // 使用 RAII 管理资源,不需要手动 close()
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

因为消灭了 shared state,我们的代码其实是更易于维护,你甚至不需要手动执行 close()

Step 6: more awaitable operators

前面说到我们希望更精细的控制每个连接的行为而不是共用同一个超时管理,这通过 awesome 的 awaitable operator 其实是轻而易举的。

step-6

现在只需每一个连接用内部的 watchdog 来管理即可。

Step 7: return value

刚才的示例都回避了 co_await 的返回值,多个 awaitable 允许不同逻辑的组合,那该如何返回?

一个简单的结论是:

  • 如果使用逻辑与,那就是 std::tuple
  • 如果使用逻辑或,那就是 std::variant
#include <array>
#include <asio.hpp>
#include <asio/experimental/as_tuple.hpp>
#include <asio/experimental/awaitable_operators.hpp>
#include <iostream>
#include <memory>

using asio::awaitable;
using asio::buffer;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
namespace this_coro = asio::this_coro;
using namespace asio::experimental::awaitable_operators;
using std::chrono::steady_clock;
using namespace std::literals::chrono_literals;

constexpr auto use_nothrow_awaitable
    = asio::experimental::as_tuple(asio::use_awaitable);

awaitable<void> timeout(steady_clock::duration duration)
{
  asio::steady_timer timer(co_await this_coro::executor);
  timer.expires_after(duration);
  co_await timer.async_wait(use_nothrow_awaitable);
}

awaitable<void> transfer(tcp::socket& from, tcp::socket& to)
{
  std::array<char, 1024> data;

  for (;;) {
    // 超时管理精细到 per-operation
    auto result1 = co_await (
        from.async_read_some(buffer(data), use_nothrow_awaitable) ||
        timeout(5s)
      );

    // 处理 variant
    if (result1.index() == 1)
      co_return; // timed out

    auto [e1, n1] = std::get<0>(result1);
    if (e1)
      break;

    auto result2 = co_await (
        async_write(to, buffer(data, n1), use_nothrow_awaitable) ||
        timeout(1s)
      );

    if (result2.index() == 1)
      co_return; // timed out

    auto [e2, n2] = std::get<0>(result2);
    if (e2)
      break;
  }
}

// proxy 实现也变得更加简单
awaitable<void> proxy(tcp::socket client, tcp::endpoint target)
{
  tcp::socket server(client.get_executor());

  auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);
  if (!e) {
    co_await (
        transfer(client, server) ||
        transfer(server, client)
      );
  }
}

awaitable<void> listen(tcp::acceptor& acceptor, tcp::endpoint target)
{
  for (;;) {
    auto [e, client] = co_await acceptor.async_accept(use_nothrow_awaitable);
    if (e)
      break;

    auto ex = client.get_executor();
    co_spawn(ex, proxy(std::move(client), target), detached);
  }
}

int main(int argc, char* argv[])
{
  try {
    if (argc != 5) {
      std::cerr << "Usage: proxy";
      std::cerr << " <listen_address> <listen_port>";
      std::cerr << " <target_address> <target_port>\n";
      return 1;
    }

    asio::io_context ctx;

    auto listen_endpoint
        = *tcp::resolver(ctx).resolve(argv[1], argv[2], tcp::resolver::passive);

    auto target_endpoint = *tcp::resolver(ctx).resolve(argv[3], argv[4]);

    tcp::acceptor acceptor(ctx, listen_endpoint);

    co_spawn(ctx, listen(acceptor, target_endpoint), detached);

    ctx.run();
  } catch (std::exception& e) {
    std::cerr << "Exception: " << e.what() << "\n";
  }
}

通过 awaitable operator 与推导的返回值组合,现在我们不再需要多余的 watchdog,并且在超时管理上能做到更加细粒度,也更易于使用。

而这就是我们最终版本的 awesome proxy server。

THE END

如果你对 awesome 的 Asio 感兴趣,那就赶紧下载吧!

官方网站:think-async.com