简单的造点示例级轮子,规格上分为普通版和 pro 版:
- 普通版提供 200 行内的 C++20 协程、io_uring 后端,以及类似 Asio 的使用方式(co_spawn、co_await initiating_function 和 io_context);
- pro 版在此基础上添加了 io_uring 的高级特性使用,以及多线程的协程上下文切换支持。
既然不算完整的库,基本介绍就这样了。下面不再废话,直接看代码。
快速过目
// READ -> WRITE -> [CLOSE]
//
// Per-connection coroutine: reads a chunk from `client_fd`, mirrors it to
// stdout, writes it back to the peer, and repeats. The connection is closed
// when the write reports 0 bytes, or when more than two bytes were echoed and
// the buffer starts with "Zz".
// Any negative read/write result terminates the process via nofail().
Task echo(io_uring *uring, int client_fd) {
    char buf[4096];
    while(true) {
        auto n = co_await async_read(uring, client_fd, buf, std::size(buf)) | nofail("read");

        // Dump the received bytes to stdout.
        std::ranges::copy_n(buf, n, std::ostream_iterator<char>{std::cout});

        n = co_await async_write(uring, client_fd, buf, n) | nofail("write");

        // Nothing was written.
        const bool peer_is_gone = (n == 0);
        // The payload begins with "Zz" and carries at least one more byte.
        const bool asked_to_quit = n > 2 && buf[0] == 'Z' && buf[1] == 'z';
        if(!(peer_is_gone || asked_to_quit)) {
            continue;
        }

        co_await async_close(uring, client_fd);
        co_return;
    }
}
// ACCEPT -> ACCEPT
//        -> ECHO
//
// Accept loop: every accepted connection is handed to its own echo()
// coroutine, which is queued on `io_context`; the loop then accepts again.
// A negative accept result terminates the process via nofail().
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    while(true) {
        auto accepted_fd = co_await async_accept(uring, server_fd) | nofail("accept");
        // Fork a new connection.
        co_spawn(io_context, echo(uring, accepted_fd));
    }
}
int main() {
// CREATE and LISTEN
auto server_fd = make_server({.port=8848});
auto server_fd_cleanup = defer([&](...) { close(server_fd); });
io_uring uring;
constexpr size_t ENTRIES = 256;
io_uring_queue_init(ENTRIES, &uring, 0);
auto uring_cleanup = defer([&](...) { io_uring_queue_exit(&uring); });
Io_context io_context(uring);
// Kick off!
co_spawn(io_context, server(&uring, io_context, server_fd));
io_context.run();
}
这是一个 echo 服务器,是不是有 Asio 内味了。
协程实现
coroutine
struct Task {
struct promise_type;
constexpr Task(std::coroutine_handle<promise_type> handle) noexcept: _handle(handle) {}
~Task() { if(_handle) _handle.destroy(); }
auto detach() noexcept { return std::exchange(_handle, {}); }
// Move ctor only.
Task(Task &&rhs) noexcept: _handle(rhs.detach()) {}
Task(const Task&) = delete;
Task& operator=(const Task&) = delete;
Task& operator=(Task&&) = delete;
auto operator co_await() && noexcept;
private:
std::coroutine_handle<promise_type> _handle;
};
// Promise of Task.
//
// The coroutine starts suspended. When it finishes, it destroys its own frame
// and transfers control to `_caller` (a no-op coroutine unless push() was called).
struct Task::promise_type {
    Task get_return_object() noexcept {
        return Task{std::coroutine_handle<promise_type>::from_promise(*this)};
    }

    // Lazy start: nothing runs until somebody resumes the handle.
    constexpr std::suspend_always initial_suspend() const noexcept { return {}; }

    // Results and exceptions are not propagated in this minimal version.
    constexpr void return_void() const noexcept { /*exception_ptr...*/ }
    void unhandled_exception() { /*exception_ptr...*/ }

    struct Final_suspend {
        constexpr bool await_ready() const noexcept { return false; }

        std::coroutine_handle<> await_suspend(auto finished) const noexcept {
            // Copy the continuation out first: it lives in the frame destroyed below.
            const std::coroutine_handle<> continuation = finished.promise()._caller;
            // Started task (at least once) will kill itself in final_suspend.
            finished.destroy();
            return continuation;
        }

        // Never reached.
        constexpr void await_resume() const noexcept {}
    };
    constexpr Final_suspend final_suspend() const noexcept { return {}; }

    // Records the coroutine to continue once this one has finished.
    void push(std::coroutine_handle<> caller) noexcept { _caller = caller; }

    std::coroutine_handle<> _caller {std::noop_coroutine()};
};
// Multi-task support (rvalue only).
//
// Awaiting a Task detaches its handle, registers the awaiting coroutine as the
// continuation and transfers control to the awaited coroutine.
//
// Examples:
//   GOOD:
//     co_await make_task(...);
//     ////////////////////////////
//     Task task = make_task(...);
//     co_await std::move(task);
//   BAD:
//     Task task = make_task(...); // Compilable but meaningless.
//     co_await task;              // Error. Rejected by compiler.
inline auto Task::operator co_await() && noexcept {
    struct awaiter {
        std::coroutine_handle<Task::promise_type> _handle;

        // Nothing to wait for if there is no coroutine or it has already finished.
        bool await_ready() const noexcept {
            if(!_handle) return true;
            return _handle.done();
        }

        std::coroutine_handle<Task::promise_type>
        await_suspend(std::coroutine_handle<> caller) noexcept {
            _handle.promise().push(caller);
            // Multi-tasks are considered as a single operation in io_contexts.
            return _handle;
        }

        constexpr void await_resume() const noexcept {}
    };
    return awaiter{detach()};
}
io_context
// A quite simple io_context.
class Io_context {
public:
explicit Io_context(io_uring &uring): uring(uring) {}
Io_context(const Io_context &) = delete;
Io_context& operator=(const Io_context &) = delete;
void run() { for(_stop = false; running(); run_once()); }
// Once = submit + reap.
template <bool Exactly_once = false>
void run_once() {
auto some = Exactly_once ? take_once() : take_batch();
namespace views = std::ranges::views;
for(auto _ : views::iota(0) | views::take(some)) {
auto op = _operations.front();
_operations.pop();
op.resume();
// Unused.
[](...){}(_);
}
if((_inflight += io_uring_submit(&uring)) == 0) {
hang();
return;
}
// Some cqes are in-flight,
// even if we currently have no any h.resume().
// Just continue along the path!
io_uring_cqe *cqe;
unsigned head;
unsigned done = 0;
// Reap one operation / multiple operations.
// NOTE: One operation can also generate multiple cqes (awaiters).
io_uring_for_each_cqe(&uring, head, cqe) {
done++;
// For io_uring_prep_cancel().
if(cqe->res == -ECANCELED) [[unlikely]] continue;
auto user_data = std::bit_cast<Async_user_data*>(cqe->user_data);
user_data->cqe = cqe;
user_data->h.resume();
}
done ? io_uring_cq_advance(&uring, done) : hang();
assert(_inflight >= done);
_inflight -= done;
}
// Some observable IO statistics.
// These APIs are not affected by stop flag.
auto pending() const { return _operations.size(); }
auto inflight() const noexcept { return _inflight; }
bool drained() const { return !pending() && !inflight(); }
// Only affect the run() interface.
// The stop flag will be reset upon re-run().
//
// Some in-flight operations will be suspended when calling stop().
// This provides the opportunity to do fire-and-forget tasks.
//
// So it is the responsibility of users to ensure the correctness of this function.
// What users can do if they want to complete all tasks:
// 1. blocking method: re-run() agagin.
// 2. non-blocking method: while(!drained()) run_once();
void stop() noexcept { _stop = true; }
bool stopped() const { return _stop && !pending(); }
bool running() const { return !stopped(); }
friend void co_spawn(Io_context &io_context, Task &&task) {
io_context._operations.emplace(task.detach());
}
private:
void hang() {
// TODO: config option, eventfd.
constexpr bool ENABLE_BUSY_LOOP = false;
if constexpr (!ENABLE_BUSY_LOOP) {
// FIXME: yield() in a single thread makes no sense.
using namespace std::chrono_literals;
std::this_thread::sleep_for(1ns);
}
}
size_t take_batch() const {
constexpr size_t /*same type*/ BATCH_MAX = 32;
return std::min(BATCH_MAX, _operations.size());
}
size_t take_once() const {
return !_operations.empty();
}
io_uring ů
std::queue<std::coroutine_handle<>> _operations;
size_t _inflight {};
bool _stop {false};
};
async_operation
struct Async_user_data {
io_uring *uring;
io_uring_sqe *sqe {};
io_uring_cqe *cqe {};
// io_contexts may submit before setting up `h` in await_suspend().
// Therefore:
// 1. Operations need a check in await_ready().
// 2. `h` should be initially `std::noop-`, which is safe (and no effect) to resume.
std::coroutine_handle<> h {std::noop_coroutine()};
Async_user_data(io_uring *uring) noexcept: uring(uring) {}
};
// Awaitable for a single io_uring request.
//
// The constructor grabs an SQE, lets `uring_prep_fn` fill it in and tags it
// with the address of `user_data`. Awaiting yields the `res` field of the
// completion, or -ENOMEM when no SQE could be obtained.
struct Async_operation {
    Async_operation(io_uring *uring, auto uring_prep_fn, auto &&...args) noexcept: user_data(uring) {
        user_data.sqe = io_uring_get_sqe(uring);
        // If !sqe, return -ENOMEM immediately. (await_ready() => true.)
        if(!user_data.sqe) [[unlikely]] {
            return;
        }
        uring_prep_fn(user_data.sqe, std::forward<decltype(args)>(args)...);
        // https://man7.org/linux/man-pages/man3/io_uring_cqe_get_data.3.html
        // For Linux v5.15, data must be set AFTER prep_fn();
        // otherwise, io_uring will return an inaccessible CQE.
        // This problem does not exist in Linux v6.1.
        // However, according to the man page,
        // set_data() only needs to be called before submit().
        // Fine, it just works...
        io_uring_sqe_set_data(user_data.sqe, &user_data);
    }

    // Suspend only when an SQE was obtained and no completion has arrived yet.
    // (No allocation error and no eager completion.)
    constexpr bool await_ready() const noexcept {
        const bool has_request = user_data.sqe != nullptr;
        const bool has_result = user_data.cqe != nullptr;
        return !has_request || has_result;
    }

    // Remember whom to resume once the completion shows up.
    void await_suspend(std::coroutine_handle<> h) noexcept {
        user_data.h = h;
    }

    auto await_resume() const noexcept {
        return user_data.sqe ? user_data.cqe->res : -ENOMEM;
    }

    Async_user_data user_data;
};
// Generic initiating function: builds an Async_operation that prepares its SQE
// with `uring_prep_fn(sqe, args...)`.
// The result is returned as a prvalue, so it is constructed directly in the
// caller's storage.
inline auto async_operation(io_uring *uring, auto uring_prep_fn, auto &&...args) noexcept {
    return Async_operation{
        uring,
        uring_prep_fn,
        std::forward<decltype(args)>(args)...
    };
}
// Accept on `server_fd`; `addr`, `addrlen` and `flags` are forwarded to
// io_uring_prep_accept(). Awaiting yields the operation's result code.
inline auto async_accept(io_uring *uring, int server_fd,
                         sockaddr *addr, socklen_t *addrlen, int flags = 0) noexcept {
    const auto prepare = io_uring_prep_accept;
    return async_operation(uring, prepare, server_fd, addr, addrlen, flags);
}
// Accept on `server_fd` without asking for the peer address:
// null pointers are passed for both the address and its length.
inline auto async_accept(io_uring *uring, int server_fd, int flags = 0) noexcept {
    const auto prepare = io_uring_prep_accept;
    return async_operation(uring, prepare, server_fd, nullptr, nullptr, flags);
}
// Read up to `n` bytes from `fd` into `buf`, starting at `offset`.
//
// About `offset` == -1 on seekable files: the read then starts at the current
// file offset and advances it by the number of bytes read (see read(2)).
// With an async API, reading and updating the shared file offset may behave
// unpredictably unless access to the file is serialized. Using that feature is
// **not encouraged** when the application or library can supply the desired
// IO offset itself.
inline auto async_read(io_uring *uring, int fd, void *buf, size_t n, uint64_t offset = 0) noexcept {
    const auto prepare = io_uring_prep_read;
    return async_operation(uring, prepare, fd, buf, n, offset);
}
inline auto async_write(io_uring *uring, int fd, const void *buf, size_t n, uint64_t offset = 0) {
return async_operation(uring,
io_uring_prep_write, fd, buf, n, offset);
}
// Close `fd` through the ring. Awaiting yields the operation's result code.
inline auto async_close(io_uring *uring, int fd) noexcept {
    const auto prepare = io_uring_prep_close;
    return async_operation(uring, prepare, fd);
}
去除注释后为 164 行。
辅助实现
utils 主要是方便错误处理和提供免除类设计的 RAII。可以不关心这些。
// C++-style check for syscall.
// Failed on ret < 0 by default, i.e. when `Comp{}(ret, V)` holds.
//
// On failure: prints `reason` followed by the errno description to stderr
// and calls std::terminate(). Otherwise the value is passed through unchanged.
//
// INT_ASYNC_CHECK: helper for liburing (-ERRNO) and other syscalls (-1).
// It may break generic programming (forced to int).
template <typename Comp = std::less<int>, auto V = 0, bool INT_ASYNC_CHECK = true>
struct nofail {
    std::string_view reason;

    // Examples:
    // fstat(...) | nofail("fstat");        // Forget the if-statement and ret!
    // int fd = open(...) | nofail("open"); // If actually need a ret, here you are!
    friend decltype(auto) operator|(auto &&ret, nofail nf) {
        if(Comp{}(ret, V)) [[unlikely]] {
            // Hack errno.
            if constexpr (INT_ASYNC_CHECK) {
                using T = std::decay_t<decltype(ret)>;
                static_assert(std::is_convertible_v<T, int>);
                // -ERRNO
                if(ret != -1) errno = -ret;
            }
            // A string_view is not guaranteed to be NUL-terminated, so it must
            // not be handed to perror() as a C string. Print the prefix with an
            // explicit length instead; perror(nullptr) then prints only the
            // errno description, giving the same "reason: description" line.
            const int saved_errno = errno;
            if(!nf.reason.empty()) {
                fprintf(stderr, "%.*s: ",
                        static_cast<int>(nf.reason.size()), nf.reason.data());
            }
            // fprintf() may clobber errno.
            errno = saved_errno;
            perror(nullptr);
            std::terminate();
        }
        return std::forward<decltype(ret)>(ret);
    }
};

// Make clang happy.
nofail(...) -> nofail<std::less<int>, 0, true>;
// Go-style, move-safe defer.
// Returns a guard that invokes `func` (with an opaque pointer argument) when
// the guard is destroyed; moving the guard moves that duty along with it.
[[nodiscard("defer() is not allowed to be temporary.")]]
inline auto defer(auto func) {
    using Guard = std::unique_ptr<void, decltype(func)>;
    // Make STL happy: the deleter only runs for a non-null pointer.
    // The value is a placeholder and is never dereferenced here.
    void *const placeholder = reinterpret_cast<void*>(0x1);
    return Guard{placeholder, std::move(func)};
}
// Options consumed by make_server().
struct make_server_option_t {
    int port = 8848;         // Port to bind.
    int backlog = 128;       // Passed to listen().
    bool nonblock = false;   // Request SOCK_NONBLOCK.
    bool reuseaddr = true;   // Set SO_REUSEADDR.
    bool reuseport = false;  // Set SO_REUSEPORT.
};
// Do some boring stuff and return a server fd.
//
// Creates an IPv4 TCP socket, applies the requested socket options, binds it
// to INADDR_ANY:`option.port` and starts listening with `option.backlog`.
// Any failing call is reported and terminates the process via nofail().
inline int make_server(make_server_option_t option) {
    // SOCK_NONBLOCK modifies the socket *type* (2nd argument of socket());
    // it is not a protocol number, so it must not be passed as the 3rd argument.
    int socket_type = SOCK_STREAM | (option.nonblock ? SOCK_NONBLOCK : 0);
    int socket_fd = socket(AF_INET, socket_type, 0) | nofail("socket");

    // Enables a boolean SOL_SOCKET option on the new socket.
    auto setsock = [enable = 1, fd = socket_fd](int optname) {
        setsockopt(fd, SOL_SOCKET, optname, &enable, sizeof(int)) | nofail("setsockopt");
    };
    if(option.reuseaddr) setsock(SO_REUSEADDR);
    if(option.reuseport) setsock(SO_REUSEPORT);

    sockaddr_in addr {};
    addr.sin_family = AF_INET;
    addr.sin_port = htons(option.port);
    addr.sin_addr.s_addr = htonl(INADDR_ANY);

    // About the strict aliasing rule:
    // `reinterpret_cast` is OK if we don't dereference it (with the wrong type).
    // But what about the implementation of `bind()`?
    // - `bind()` must dereference the object with a wrong type.
    // Can we memcpy an actual `sockaddr` from `sockaddr_xxx` then pass it to `bind()`?
    // - Normally, memcpy is a good way to make type punning safe.
    // - It is guaranteed by the C (and C++) standard to access through an object.
    // - But answer is NO, because `sizeof(sockaddr_xxx) <= sizeof(sockaddr)` is not necessarily true.
    // - How can you access an object without proper size and alignment?
    // - `launder()` with cast / `start_lifetime_as()` cannot solve the `sizeof()` problem either.
    // Does it violate the strict aliasing rule?
    // - Maybe. It depends on the library side. We can't do any more.
    // - But many people explicitly cast types with UNIX interfaces.
    // - And compilers should not offend users, especially for legacy codes.
    // - So practically, it is OK.
    auto no_alias_addr = reinterpret_cast<const sockaddr*>(&addr);

    bind(socket_fd, no_alias_addr, sizeof(addr)) | nofail("bind");

    listen(socket_fd, option.backlog) | nofail("listen");

    return socket_fd;
}
实现要点
io_uring 作者 Jens Axboe 的演讲
- co_await 既支持 awaiter,也支持协程本身,类似于 asio::awaitable,虽说不如 Asio 精妙。
- 被 co_spawn 或内部 co_await 的协程会放弃协程描述符的所有权,在最后的暂停点再自行销毁。
- 左值的 awaiter 不值得使用,会增加很多不必要的异步处理,但是轮子中并没有完全禁用。
- 比如先构造了一个 awaiter,发起了异步请求。
- 然后 co_await 另一个 awaiter。
- 原 awaiter 可能无法在 await_suspend 中 liburing submit 前插入新的 flag。
- 有可能是因完成而被收割,更有可能错过了 resume 设置,需要 await_ready 弥补判断。
- 而对于右值 awaiter,这些操作是整体原子的,没有这种麻烦事和额外开销。
- 左值当然可以延迟请求,但需要存储转发参数并强制模板类设计,因此没有采用。
- co_spawn 投递协程,run 在当前上下文执行协程,但是 io_uring 在多种特性下是无法得知正确的已提交异步操作数,比如:
- multishot 可以单个 sqe 提交并生成多个 cqe,cqe 自然无法在实际完成前得知。
- SQPOLL 可以交由内核线程去处理 sqe 提交,因此用户侧提交数本身就是不正确的,这点在 man 手册和社区都有提到。
- 协程的定制点使得特性添加变得容易,比如 multishot 和 IO drain 使用基本的继承就能轻松完成。
- 人生苦短,千万别直接硬上 io_uring,多用点 liburing。 (Use it! Don’t be a hero.)
特性使用
这部分算是番外篇。io_uring 的特性非常丰富,可玩性相当的高(可惜的是这方面 man 手册写得真不怎样)。因此这些适配也都尽可能写了,在源码仓库中我尝试一个特性对应一个示例。当然了,用上这些特性就远不只两百行了。
简单的再给出一点使用示例吧。
// Multishot: only one asynchronous operation is started in the kernel,
// but it is allowed to complete many times.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_prep_multishot_accept.3.en.html
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    // A multishot (submit once) awaiter.
    auto awaiter = async_multishot_accept(uring, server_fd);
    while(true) {
        // No need to submit more sqes.
        auto accepted_fd = co_await awaiter | nofail("multishot_accept");
        co_spawn(io_context, echo(uring, accepted_fd));
    }
}
// IO link: chain operations with & or | so that they run in order,
// without needing several co_awaits.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_sqe_set_flags.3.en.html#IOSQE_IO_LINK
Task sayhello(io_uring *uring, int client_fd) {
    constexpr std::string_view hello {"hello "};
    constexpr std::string_view world {"world!\n"};
    // Actually co_await only for the last one. (by proxy)
    // But still keep that orders.
    co_await (
        async_write(uring, client_fd, hello.data(), hello.size())
      & async_write(uring, client_fd, world.data(), world.size())
      & async_close(uring, client_fd)
    );
}
// IO drain: operations started earlier must complete
// before this operation's completion is reported.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_sqe_set_flags.3.en.html#IOSQE_IO_DRAIN
Task server(io_uring *uring, Io_context &io_context, int server_fd) {
    while(true) {
        auto accepted_fd = co_await async_drain_accept(uring, server_fd) | nofail("accept");
        co_spawn(io_context, echo(uring, accepted_fd));
    }
}
// Provided buffers: no buffer has to be prepared before starting the operation.
// https://manpages.debian.org/unstable/liburing-dev/io_uring_setup_buf_ring.3.en.html
Task echo(io_uring *uring, int client_fd, auto provided_buffers_token, auto buffer_helpers) {
    const auto &[buffer_finder, buffer_size, _] = buffer_helpers;
    while(true) {
        // We don't need to prepare a buffer before completing the operation.
        auto [n, bid] = co_await async_read(provided_buffers_token, client_fd, nullptr, buffer_size);
        // Hand buffer `bid` back at the end of this iteration.
        auto give_back = defer([&](...) {
            buffer_rejoin(provided_buffers_token, buffer_helpers, bid);
        });
        const auto buf = buffer_finder(bid);
        co_await async_write(uring, client_fd, buf, n) | nofail("write");
        // ...
    }
}
// (Not an io_uring feature, just a coroutine wrapper.)
// Use switch_to() to hop back and forth between the threads
// on which the different io_contexts are running.
Task just_print(io_uring (&uring)[2], Io_context (&io_context)[2]) {
    size_t i = 0;
    while(true) {
        std::ostringstream oss;
        oss << "current thread is " << std::this_thread::get_id() << '\n';
        const auto message = oss.str();
        // fd 1: standard output.
        co_await async_write(&uring[i], 1, message.data(), message.size());
        // Switch to the thread where io_context[i^1] is running.
        co_await switch_to(io_context[i ^ 1]);
        i ^= 1;
    }
}
Repo
源码存放在 GitHub/Caturra000/io_uring-examples-cpp,示例代码的目的仅用于分析 io_uring 的行为,并不考虑低内核版本的兼容性,更新随缘。