这是去年 NVIDIA stdexec 作者 Eric Niebler 在油管上的 live coding:一小时教你实现低配版的 std::execution
。如果你想了解 std::execution
标准库的设计但是没耐心翻阅一百多页的 P2300,那可以先看下这里。
原视频:Working with Asynchrony Generally: From Zero to Sender/Receiver in ~60 Minutes。
What is it?
什么是 std::execution
?你可以理解为一个标准的异步编程模型,旨在让整个 C++ 社区都使用这套方案来描述异步计算。这种模型可以让你有标准的方式指定一个任务该在何处、如何以及何时执行;另外还提供了一些通用的异步算法供用户直接使用,比如 then()
算法可用于完成链式操作。当然它是可扩展的,不管是事件循环、线程池等异步方式还是异构计算(CPU+GPU)都没问题,你可以看后面的第一个示例去了解如何在这个模型中使用第三方库。
Three big things
std::execution
中最主要的三个概念就是 scheduler、sender 和 receiver:
- sender 是 lazy value:这意味着这个值会延迟到需要时才计算。
- receiver 是 continuation 或者 callback:当 sender 计算完成时,得到的值会传入 receiver 而不是返回给调用方。
- scheduler 是资源的描述符:实际上可能是个简单的指针。
Example
接下来简单用一个例子来阐述三大件的编写思路。这个例子是在线程池并发运行三个任务:
- 声明一个第三方的 execution context,然后从中获取 scheduler。在这里 execution context 指的是第三方库
libunifex
提供的static_thread_pool
线程池。 - 通过
schedule()
创建空的任务单元。这意味着任务将运行在static_thread_pool
的上下文中。 schedule()
返回一个 sender,可以接着调用then()
算法。此前的空任务会切换执行某种任务比如compute_intensive()
。then()
同样也会返回一个 sender。- 这里尝试三个任务并发执行,因此使用
when_all()
。when_all()
返回一个 sender。 - 最后,我们执行
sync_wait()
来同步等待并得到结果。注意在调用这个函数前任何任务都没有实际执行(整个 sender/receiver 模型是 lazy 设计)。sync_wait()
返回std::optional<std::tuple<T>>
,因此你可以继续调用std::optional<>::value()
。
Big idea
从前面的例子可以看出,整体用法就是:
- scheduler 生成 sender。
- 异步算法(比如
then()
)接受和返回 sender。
总之就是一种组合的思路:把 sender 丢到一种异步算法里面,然后会得到另一个 sender,如此往复直到最后等待结果。
Lifecycle
怎样才能完成一整套异步操作?上图展示一个异步操作的生命周期:
- 第一步需要一个 scheduler,通过
schedule()
得到一个 sender。 - sender 通过
connect()
连接到 receiver,得到一个 operation state。 - operation state 需要在整个异步操作中维护生命周期。
- 此时仍未启动任何任务,直到你在 operation 调用
start()
,一般来说任务就会入队直到一段时间过去后,操作完成,就会通知 receiver。
NOTE: 虚线部分是算法实现的细节(比如 then()
算法),因此对于一个库用户而非实现者来说只需要关注 scheduler 和 sender。
还需补充一点是 receiver 的接收方式有三种:
set_value()
:任务正常完成。set_error()
:任务失败。set_stopped()
:任务被中途取消。
Concepts
从前面的描述也可以总结出上图的 concept,实现 sender/receiver 就是要实现以上接口。
just() sender factory
现在我们来实操,先写一个简单的 just sender factory。
///////////////////////////////////////////
// Some utility code
///////////////////////////////////////////
// 这些是公用的代码,后面会反复用到
// 不可移动类
struct immovable {
immovable() = default;
immovable(immovable&&) = delete;
};
// 简单萃取 connect() 的返回类型
template <class S, class R>
using connect_result_t = decltype(connect(std::declval<S>(), std::declval<R>()));
// 简单萃取 sender<result_t>模板中的 result_t
template <class S>
using sender_result_t = typename S::result_t;
///////////////////////////////////////////
// just(T) sender factory
///////////////////////////////////////////
// 3. connect() 后得到的产物
template <class R, class T>
struct just_operation : immovable {
R rec;
T value;
// operation 需要实现 start()
// 由于 just() 操作是 start() 后立刻完成的,只需直接通知 receiver
friend void start(just_operation& self) {
// 通过调用 set_value() 实现完成通知
set_value(self.rec, self.value);
}
};
// 2. just sender 存放 value
template <class T>
struct just_sender {
using result_t = T;
T value;
// sender 需要实现 connect(),返回 operation state
template <class R>
friend just_operation<R, T> connect(just_sender self, R rec) {
return { {}, rec, self.value };
}
};
// 1. 要实现的 sender factory 接口,接受一个值,传递到 sender 然后立刻完成
template <class T>
just_sender<T> just(T t) {
return {t};
}
这样就实现了 just sender,后面看下如何运作。
//
// start test code
//
struct cout_receiver {
friend void set_value(cout_receiver self, auto val) {
// 这个 receiver 什么都不干,只对收集到的结果输出
std::cout << "Result: " << val << '\n';
}
friend void set_error(cout_receiver self, std::exception_ptr err) {
std::terminate();
}
friend void set_stopped(cout_receiver self) {
std::terminate();
}
};
int main() {
// just() factory 生成 just sender
auto s = just(42);
// just sender 连接 cout receiver 生成 operation
auto op = connect(s, cout_receiver{});
// 运行 operation
start(op);
}
上面的代码大概是这个意思:生成值 42,然后输出。
流程图
很显然这个测试只是一份输出 42 的无聊代码,但后面的迭代很快会支持链式操作。
then() sender adaptor
现在来实现一个 then()
算法,then(sender, f)
表示 sender
的任务链接到另一个任务 f
,这样能支持链式操作。
///////////////////////////////////////////
// then(Sender, Function) sender adaptor
///////////////////////////////////////////
// 4.
template <class R, class F>
struct then_receiver {
R rec;
F f;
// 当通知完成时,再将收集的值 val 转换为 f(val)
friend void set_value(then_receiver self, auto val) {
set_value(self.rec, self.f(val));
}
friend void set_error(then_receiver self, std::exception_ptr err) {
set_error(self.rec, err);
}
friend void set_stopped(then_receiver self) {
set_stopped(self.rec);
}
};
// 3.
template <class S, class R, class F>
struct then_operation : immovable {
// S 为内部 sender
// 这里需要实现 then_receiver
connect_result_t<S, then_receiver<R, F>> op;
friend void start(then_operation& self) {
// 先执行内部 start,就能启动链式操作
start(self.op);
// 实际后面也没事情干了
}
};
// 2. 现在要实现 then() 返回的 then_sender
template <class S, class F>
struct then_sender {
using result_t = std::invoke_result_t<F, sender_result_t<S>>;
// 需要暂存 then() 传递过来的内部 sender 和 function
S s;
// function 需要内部 sender 产生的 value 才能传递下去
// 并且 f 需要传递给 receiver
F f;
template <class R>
friend then_operation<S, R, F> connect(then_sender self, R rec) {
// connect() 操作先完成内部 sender 与 then_receiver 的 connect()
// 这里需要实现一个 then_operation 和 then_receiver
// NOTE:
// 虽然 operation 是 immovable,但是 C++17 有 guaranteed copy elision
// 因此返回是没有问题
return { {}, connect(self.s, then_receiver<R, F>{rec, self.f}) };
}
};
// 1. 要实现的 then() 接口
// 需要返回 then_sender
template <class S, class F>
then_sender<S, F> then(S s, F f) {
// 简单的构造 sender
return {s, f};
}
int main() {
auto s2 = then(just(42), [](int i) { return i + 1; });
auto op2 = connect(s2, cout_receiver{});
start(op2);
}
这样就得到了一个链式操作:先计算得到 42,然后链式传递给某个匿名函数得到 43 并输出。附上一幅帮助理解的流程图。
画得有点复杂,尽力了
sync_wait() sender consumer
现在要为异步操作增加一个同步等待的特性,即 sync_wait()
(尽管这里还没有异步操作……)。
//////////////////////////////////////////
// sync_wait() sender consumer
///////////////////////////////////////////
// 2. 考虑到异步操作可以在不同的线程间执行
// 因此需要一个 control block
struct sync_wait_data {
std::mutex mtx;
std::condition_variable cv;
std::exception_ptr err;
bool done = false;
};
// 3. 需要一个 receiver 来实现 value 收集和通知
template <class T>
struct sync_wait_receiver {
sync_wait_data& data;
std::optional<T>& value;
friend void set_value(sync_wait_receiver self, auto val) {
std::unique_lock lk{self.data.mtx};
self.value.emplace(val);
self.data.done = true;
self.data.cv.notify_one();
}
friend void set_error(sync_wait_receiver self, std::exception_ptr err) {
std::unique_lock lk{self.data.mtx};
self.data.err = err;
self.data.done = true;
self.data.cv.notify_one();
}
friend void set_stopped(sync_wait_receiver self) {
std::unique_lock lk{self.data.mtx};
self.data.done = true;
self.data.cv.notify_one();
}
};
// 1. 从前面的例子可看到,sync_wait 接收一个任务,即 sender
template <class S>
std::optional<sender_result_t<S>> sync_wait(S s) {
// 通过前面 2 份代码可以看出,sender 一般会实现 result_t 方便萃取
using T = sender_result_t<S>;
// 需要一个 control block 来保证线程安全(见 2.)
sync_wait_data data;
std::optional<T> value;
// 接下来的问题就是如何得到 value
auto op = connect(s, sync_wait_receiver<T>{data, value});
start(op);
// 提供一个同步点,在 sync_wait_receiver 对应的 set_value 处唤醒(见 3.)
std::unique_lock lk{data.mtx};
data.cv.wait(lk, [&]{return data.done;});
if (data.err)
std::rethrow_exception(data.err);
return value;
}
int main() {
auto s3 = then(just(42), [](int i) { return i + 1; });
int val3 = sync_wait(s3).value();
std::cout << "Result: " << val3 << '\n';
}
这个例子是比较简单的,就是依靠一个 control block 还有 receiver 来完成同步等待操作。从 main()
函数可以看出,对于库调用者来说并不需要关心 sender 以外的事情,比如手动 connect()
还有 receiver 的实现都只出现在库内部。
run_loop execution context
我们现在还缺少一个真正意义的异步操作,那就提供一个执行上下文来完成这件事情,比如来一个 FIFO & MPSC 的队列。
///////////////////////////////////////////
// run_loop execution context
///////////////////////////////////////////
struct run_loop : immovable {
struct none{};
// 用于支持 operation 泛型擦除,从而放到链表上
struct task : immovable {
task* next = this;
virtual void execute() {}
};
// 支持绑定不同 receiver 的 operation
template <class R>
struct operation : task {
R rec;
run_loop& loop;
operation(R rec, run_loop& loop)
: rec(rec), loop(loop) {}
void execute() override final {
// 为了简化示例,value 都用 none 来代替
set_value(rec, none{});
}
// operation 需要支持 start()
// 这里就是插入链表
friend void start(operation& self) {
self.loop.push_back(&self);
}
};
// 一个 FIFO 链表
// 作者提到 sender/receiver 的设计是避免了动态分配的可能
// 因此使用的是侵入式链表
// 你可以看到实际 task/operation 是在 main() 函数栈上分配的
task head;
// 使用循环链表,让代码更加简洁
task* tail = &head;
bool finishing = false;
// 保证线程安全
std::mutex mtx;
std::condition_variable cv;
void push_back(task* op) {
std::unique_lock lk(mtx);
op->next = &head;
tail = tail->next = op;
cv.notify_one();
}
task* pop_front() {
std::unique_lock lk(mtx);
cv.wait(lk, [this]{return head.next != &head || finishing;});
if (head.next == &head)
return nullptr;
return std::exchange(head.next, head.next->next);
}
struct sender {
using result_t = none;
run_loop* loop;
template <class R>
friend operation<R> connect(sender self, R rec) {
return {rec, *self.loop};
}
};
// 用于指向一个上下文
struct scheduler {
run_loop* loop;
// schedule() 生成一个关联上下文的 sender
friend sender schedule(scheduler self) {
return {self.loop};
}
};
scheduler get_scheduler() {
return {this};
}
// 执行上下文位于 run() 调用者所在的线程
void run() {
while (auto* op = pop_front())
op->execute();
}
// 表示未来不会再加入任何 task
void finish() {
std::unique_lock lk(mtx);
finishing = true;
cv.notify_all();
}
};
int main() {
run_loop loop;
auto sched4 = loop.get_scheduler();
auto s4 = then(schedule(sched4), [](auto) { return 42; });
auto op4 = connect(s4, cout_receiver{});
// start() 仅表示入队
start(op4);
auto s5 = then(schedule(sched4), [](auto) { return 43; });
auto op5 = connect(s5, cout_receiver{});
start(op5);
loop.finish();
// 真正执行任务
loop.run();
}
这里展示了 scheduler(或者说是 execution context)和 sender/receiver 的组合使用。除去 context 实现以外,基本就是 schedule()
构造一个关联上下文的 sender 即可。但是用法过于细节了,实际上可以封装一层简化使用。
thread_context execution context
这里基于上面的 run_loop
实现封装了一个在独立的线程执行任务的上下文。
///////////////////////////////////////////
// thread_context execution context
///////////////////////////////////////////
class thread_context : run_loop {
std::thread th{[this]{ run(); }};
public:
using run_loop::get_scheduler;
using run_loop::finish;
void join() {
th.join();
}
};
int main() {
thread_context th;
auto sched6 = th.get_scheduler();
auto s6 = then(schedule(sched6), [](auto) { return 42; });
auto s7 = then(s6, [](int i){return i+1;});
auto val7 = sync_wait(s7).value();
th.finish();
th.join();
std::cout << val7 << '\n';
}
这是最后一份代码了,现在已经接近 stdexec
里面的示例,限于这是一个低配版实现,也算是做得足够简单实用了。你也可以尝试实现 when_all()
算法和 operator|
。
作者现场写的完整代码可以在 godbolt 获得。
后记
其实也没啥好翻译的,主要还是 Read The Friendly Source Code。视频结尾主持人拉家常的部分(聊到 Range 库的历史去了)我没怎么看,如果你感兴趣也可以看下原视频。
本文已转发到知乎:C++ Execution:从零开始的 Sender/Receiver [译] - 知乎 (zhihu.com)。