cover

这是去年 NVIDIA stdexec 作者 Eric Niebler 在油管上的 live coding:一小时教你实现低配版的 std::execution。如果你想了解 std::execution 标准库的设计但是没耐心翻阅一百多页的 P2300,那可以先看下这里。

原视频:Working with Asynchrony Generally: From Zero to Sender/Receiver in ~60 Minutes

What is it?

what-is

什么是 std::execution?你可以理解为一个标准的异步编程模型,旨在让整个 C++ 社区都使用这套方案来描述异步计算。这种模型可以让你有标准的方式指定一个任务该在何处、如何以及何时执行;另外还提供了一些通用的异步算法供用户直接使用,比如 then() 算法可用于完成链式操作。当然它是可扩展的,不管是事件循环、线程池等异步方式还是异构计算(CPU+GPU)都没问题,你可以看后面的第一个示例去了解如何在这个模型中使用第三方库。

Three big things

high-level-degisn

std::execution 中最主要的三个概念就是 scheduler、sender 和 receiver:

  • sender 是 lazy value:这意味着这个值会延迟到需要时才计算。
  • receiver 是 continuation 或者 callback:当 sender 计算完成时,得到的值会传入 receiver 而不是返回给调用方。
  • scheduler 是资源的描述符:实际上可能是个简单的指针。

Example

example

接下来简单用一个例子来阐述三大件的编写思路。这个例子是在线程池并发运行三个任务:

  1. 声明一个第三方的 execution context,然后从中获取 scheduler。在这里 execution context 指的是第三方库 libunifex 提供的 static_thread_pool 线程池。
  2. 通过 schedule() 创建空的任务单元。这意味着任务将运行在 static_thread_pool 的上下文中。
  3. schedule() 返回一个 sender,可以接着调用 then() 算法。此前的空任务会切换执行某种任务比如 compute_intensive()then() 同样也会返回一个 sender。
  4. 这里尝试三个任务并发执行,因此使用 when_all()when_all() 返回一个 sender。
  5. 最后,我们执行 sync_wait() 来同步等待并得到结果。注意在调用这个函数前任何任务都没有实际执行(整个 sender/receiver 模型是 lazy 设计)。sync_wait() 返回 std::optional<std::tuple<T>>,因此你可以继续调用 std::optional<>::value()

Big idea

从前面的例子可以看出,整体用法就是:

  • scheduler 生成 sender。
  • 异步算法(比如 then())接受和返回 sender。

总之就是一种组合的思路:把 sender 丢到一种异步算法里面,然后会得到另一个 sender,如此往复直到最后等待结果。

Lifecycle

lifecycle

怎样才能完成一整套异步操作?上图展示一个异步操作的生命周期:

  1. 第一步需要一个 scheduler,通过 schedule() 得到一个 sender。
  2. sender 通过 connect() 连接到 receiver,得到一个 operation state。
  3. operation state 需要在整个异步操作中维护生命周期。
  4. 此时仍未启动任何任务,直到你在 operation 调用 start(),一般来说任务就会入队直到一段时间过去后,操作完成,就会通知 receiver。

NOTE: 虚线部分是算法实现的细节(比如 then() 算法),因此对于一个库用户而非实现者来说只需要关注 scheduler 和 sender。

notification

还需补充一点是 receiver 的接收方式有三种:

  • set_value():任务正常完成。
  • set_error():任务失败。
  • set_stopped():任务被中途取消。

Concepts

concepts

从前面的描述也可以总结出上图的 concept,实现 sender/receiver 就是要实现以上接口。

just() sender factory

现在我们来实操,先写一个简单的 just sender factory。

///////////////////////////////////////////
// Some utility code
///////////////////////////////////////////

// 这些是公用的代码,后面会反复用到

// 不可移动类
struct immovable {
    immovable() = default;
    immovable(immovable&&) = delete;
};

// 简单萃取 connect() 的返回类型
template <class S, class R>
using connect_result_t = decltype(connect(std::declval<S>(), std::declval<R>()));

// 简单萃取 sender<result_t>模板中的 result_t
template <class S>
using sender_result_t = typename S::result_t;

///////////////////////////////////////////
// just(T) sender factory
///////////////////////////////////////////

// 3. connect() 后得到的产物
template <class R, class T>
struct just_operation : immovable {
    R rec;
    T value;

    // operation 需要实现 start()
    // 由于 just() 操作是 start() 后立刻完成的,只需直接通知 receiver
    friend void start(just_operation& self) {
        // 通过调用 set_value() 实现完成通知
        set_value(self.rec, self.value);
    }
};

// 2. just sender 存放 value
template <class T>
struct just_sender {
    using result_t = T;
    T value;

    // sender 需要实现 connect(),返回 operation state
    template <class R>
    friend just_operation<R, T> connect(just_sender self, R rec) {
        return { {}, rec, self.value };
    }
};

// 1. 要实现的 sender factory 接口,接受一个值,传递到 sender 然后立刻完成
template <class T>
just_sender<T> just(T t) {
    return {t};
}

这样就实现了 just sender,后面看下如何运作。

//
// start test code
//
struct cout_receiver {
    friend void set_value(cout_receiver self, auto val) {
        // 这个 receiver 什么都不干,只对收集到的结果输出
        std::cout << "Result: " << val << '\n';
    }

    friend void set_error(cout_receiver self, std::exception_ptr err) {
        std::terminate();
    }

    friend void set_stopped(cout_receiver self) {
        std::terminate();
    }
};

int main() {
    // just() factory 生成 just sender
    auto s = just(42);
    // just sender 连接 cout receiver 生成 operation
    auto op = connect(s, cout_receiver{});
    // 运行 operation
    start(op);
}

上面的代码大概是这个意思:生成值 42,然后输出。

just 流程图

很显然这个测试只是一份输出 42 的无聊代码,但后面的迭代很快会支持链式操作。

then() sender adaptor

现在来实现一个 then() 算法,then(sender, f) 表示 sender 的任务链接到另一个任务 f,这样能支持链式操作。

///////////////////////////////////////////
// then(Sender, Function) sender adaptor
///////////////////////////////////////////

// 4.
template <class R, class F>
struct then_receiver {
    R rec;
    F f;

    // 当通知完成时,再将收集的值 val 转换为 f(val)
    friend void set_value(then_receiver self, auto val) {
        set_value(self.rec, self.f(val));
    }

    friend void set_error(then_receiver self, std::exception_ptr err) {
        set_error(self.rec, err);
    }

    friend void set_stopped(then_receiver self) {
        set_stopped(self.rec);
    }
};

// 3.
template <class S, class R, class F>
struct then_operation : immovable {
    // S 为内部 sender
    // 这里需要实现 then_receiver
    connect_result_t<S, then_receiver<R, F>> op;

    friend void start(then_operation& self) {
        // 先执行内部 start,就能启动链式操作
        start(self.op);
        // 实际后面也没事情干了
    }
};

// 2. 现在要实现 then() 返回的 then_sender
template <class S, class F>
struct then_sender {
    using result_t = std::invoke_result_t<F, sender_result_t<S>>;
    // 需要暂存 then() 传递过来的内部 sender 和 function
    S s;
    // function 需要内部 sender 产生的 value 才能传递下去
    // 并且 f 需要传递给 receiver
    F f;

    template <class R>
    friend then_operation<S, R, F> connect(then_sender self, R rec) {
        // connect() 操作先完成内部 sender 与 then_receiver 的 connect()
        // 这里需要实现一个 then_operation 和 then_receiver
        // NOTE:
        // 虽然 operation 是 immovable,但是 C++17 有 guaranteed copy elision
        // 因此返回是没有问题
        return { {}, connect(self.s, then_receiver<R, F>{rec, self.f}) };
    }
};

// 1. 要实现的 then() 接口
// 需要返回 then_sender
template <class S, class F>
then_sender<S, F> then(S s, F f) {
    // 简单的构造 sender
    return {s, f};
}

int main() {
    auto s2 = then(just(42), [](int i) { return i + 1; });
    auto op2 = connect(s2, cout_receiver{});
    start(op2);
}

这样就得到了一个链式操作:先计算得到 42,然后链式传递给某个匿名函数得到 43 并输出。附上一幅帮助理解的流程图。

then 画得有点复杂,尽力了

sync_wait() sender consumer

现在要为异步操作增加一个同步等待的特性,即 sync_wait()(尽管这里还没有异步操作……)。

//////////////////////////////////////////
// sync_wait() sender consumer
///////////////////////////////////////////

// 2. 考虑到异步操作可以在不同的线程间执行
// 因此需要一个 control block
struct sync_wait_data {
    std::mutex mtx;
    std::condition_variable cv;
    std::exception_ptr err;
    bool done = false;
};

// 3. 需要一个 receiver 来实现 value 收集和通知
template <class T>
struct sync_wait_receiver {
    sync_wait_data& data;
    std::optional<T>& value;

    friend void set_value(sync_wait_receiver self, auto val) {
        std::unique_lock lk{self.data.mtx};
        self.value.emplace(val);
        self.data.done = true;
        self.data.cv.notify_one();
    }

    friend void set_error(sync_wait_receiver self, std::exception_ptr err) {
        std::unique_lock lk{self.data.mtx};
        self.data.err = err;
        self.data.done = true;
        self.data.cv.notify_one();
    }

    friend void set_stopped(sync_wait_receiver self) {
        std::unique_lock lk{self.data.mtx};
        self.data.done = true;
        self.data.cv.notify_one();
    }
};

// 1. 从前面的例子可看到,sync_wait 接收一个任务,即 sender
template <class S>
std::optional<sender_result_t<S>> sync_wait(S s) {
    // 通过前面 2 份代码可以看出,sender 一般会实现 result_t 方便萃取
    using T = sender_result_t<S>;
    // 需要一个 control block 来保证线程安全(见 2.)
    sync_wait_data data;
    std::optional<T> value;

    // 接下来的问题就是如何得到 value

    auto op = connect(s, sync_wait_receiver<T>{data, value});
    start(op);

    // 提供一个同步点,在 sync_wait_receiver 对应的 set_value 处唤醒(见 3.)
    std::unique_lock lk{data.mtx};
    data.cv.wait(lk, [&]{return data.done;});

    if (data.err)
        std::rethrow_exception(data.err);

    return value;
}

int main() {
    auto s3 = then(just(42), [](int i) { return i + 1; });
    int val3 = sync_wait(s3).value();
    std::cout << "Result: " << val3 << '\n';
}

这个例子是比较简单的,就是依靠一个 control block 还有 receiver 来完成同步等待操作。从 main() 函数可以看出,对于库调用者来说并不需要关心 sender 以外的事情,比如手动 connect() 还有 receiver 的实现都只出现在库内部。

run_loop execution context

我们现在还缺少一个真正意义的异步操作,那就提供一个执行上下文来完成这件事情,比如来一个 FIFO & MPSC 的队列。

///////////////////////////////////////////
// run_loop execution context
///////////////////////////////////////////
struct run_loop : immovable {
    struct none{};

    // 用于支持 operation 泛型擦除,从而放到链表上
    struct task : immovable {
        task* next = this;
        virtual void execute() {}
    };

    // 支持绑定不同 receiver 的 operation
    template <class R>
    struct operation : task {
        R rec;
        run_loop& loop;

        operation(R rec, run_loop& loop)
          : rec(rec), loop(loop) {}

        void execute() override final {
            // 为了简化示例,value 都用 none 来代替
            set_value(rec, none{});
        }

        // operation 需要支持 start()
        // 这里就是插入链表
        friend void start(operation& self) {
            self.loop.push_back(&self);
        }
    };

    // 一个 FIFO 链表
    // 作者提到 sender/receiver 的设计是避免了动态分配的可能
    // 因此使用的是侵入式链表
    // 你可以看到实际 task/operation 是在 main() 函数栈上分配的
    task head;
    // 使用循环链表,让代码更加简洁
    task* tail = &head;
    bool finishing = false;
    // 保证线程安全
    std::mutex mtx;
    std::condition_variable cv;

    void push_back(task* op) {
        std::unique_lock lk(mtx);
        op->next = &head;
        tail = tail->next = op;
        cv.notify_one();
    }

    task* pop_front() {
        std::unique_lock lk(mtx);
        cv.wait(lk, [this]{return head.next != &head || finishing;});
        if (head.next == &head)
            return nullptr;
        return std::exchange(head.next, head.next->next);
    }

    struct sender {
        using result_t = none;
        run_loop* loop;

        template <class R>
        friend operation<R> connect(sender self, R rec) {
            return {rec, *self.loop};
        }
    };

    // 用于指向一个上下文
    struct scheduler {
        run_loop* loop;
        // schedule() 生成一个关联上下文的 sender
        friend sender schedule(scheduler self) {
            return {self.loop};
        }
    };

    scheduler get_scheduler() {
        return {this};
    }

    // 执行上下文位于 run() 调用者所在的线程
    void run() {
        while (auto* op = pop_front())
            op->execute();
    }

    // 表示未来不会再加入任何 task
    void finish() {
        std::unique_lock lk(mtx);
        finishing = true;
        cv.notify_all();
    }
};

int main() {
    run_loop loop;
    auto sched4 = loop.get_scheduler();
    auto s4 = then(schedule(sched4), [](auto) { return 42; });
    auto op4 = connect(s4, cout_receiver{});
    // start() 仅表示入队
    start(op4);
    auto s5 = then(schedule(sched4), [](auto) { return 43; });
    auto op5 = connect(s5, cout_receiver{});
    start(op5);

    loop.finish();
    // 真正执行任务
    loop.run();
}

这里展示了 scheduler(或者说是 execution context)和 sender/receiver 的组合使用。除去 context 实现以外,基本就是 schedule() 构造一个关联上下文的 sender 即可。但是用法过于细节了,实际上可以封装一层简化使用。

thread_context execution context

这里基于上面的 run_loop 实现封装了一个在独立的线程执行任务的上下文。

///////////////////////////////////////////
// thread_context execution context
///////////////////////////////////////////
class thread_context : run_loop {
    std::thread th{[this]{ run(); }};
public:
    using run_loop::get_scheduler;
    using run_loop::finish;
    void join() {
        th.join();
    }
};

int main() {
    thread_context th;
    auto sched6 = th.get_scheduler();

    auto s6 = then(schedule(sched6), [](auto) { return 42; });
    auto s7 = then(s6, [](int i){return i+1;});
    auto val7 = sync_wait(s7).value();
    th.finish();
    th.join();

    std::cout << val7 << '\n';
}

这是最后一份代码了,现在已经接近 stdexec 里面的示例,限于这是一个低配版实现,也算是做得足够简单实用了。你也可以尝试实现 when_all() 算法和 operator|

作者现场写的完整代码可以在 godbolt 获得。

后记

其实也没啥好翻译的,主要还是 Read The Friendly Source Code。视频结尾主持人拉家常的部分(聊到 Range 库的历史去了)我没怎么看,如果你感兴趣也可以看下原视频。


本文已转发到知乎:C++ Execution:从零开始的 Sender/Receiver [译] - 知乎 (zhihu.com)