Home [翻译] 从零开始的Sender/Receiver
Post
Cancel

[翻译] 从零开始的Sender/Receiver

cover

这是去年NVIDIA stdexec作者Eric Niebler在油管上的live coding:一小时教你实现低配版std::execution。如果你想了解std::execution标准库的设计但是没耐心翻阅一百多页的P2300,那可以先看下这里。原视频:Working with Asynchrony Generally: From Zero to Sender/Receiver in ~60 Minutes

What is it?

what-is

什么是std::execution?你可以理解为一个标准的异步编程模型,旨在让整个C++社区都使用这套方案来描述异步计算。这种模型可以让你有标准的方式指定一个任务该在何处、如何以及何时执行;另外还提供了一些通用的异步算法供用户直接使用,比如then()算法可用于完成链式操作。当然它是可扩展的,不管是事件循环、线程池等异步方式还是异构计算(CPU+GPU)都没问题,你可以看后面的第一个示例去了解如何在这个模型中使用第三方库

Three big things

high-level-degisn

std::execution中最主要的三个概念就是scheduler、sender和receiver:

  • sender是lazy value:这意味着这个值会延迟到需要时才计算
  • receiver是continuation或者callback:当sender计算完成时,得到的值会传入receiver而不是返回给调用方
  • scheduler是资源的描述符:实际上可能是个简单的指针

Example

example

接下来简单用一个例子来阐述三大件的编写思路。这个例子是在线程池并发运行三个任务:

  1. 声明一个第三方的execution context,然后从中获取scheduler。在这里execution context指的是第三方库libunifex提供的static_thread_pool线程池
  2. 通过schedule()创建空的任务单元。这意味着任务将运行在static_thread_pool的上下文中
  3. schedule()返回一个sender,可以接着调用then()算法。此前的空任务会切换执行某种任务比如compute_intensive()then()同样也会返回一个sender
  4. 这里尝试三个任务并发执行,因此使用when_all()when_all()返回一个sender
  5. 最后,我们执行sync_wait()来同步等待并得到结果。注意在调用这个函数前任何任务都没有实际执行(整个sender/receiver模型是lazy设计)。sync_wait()返回std::optional<std::tuple<T>>,因此你可以继续调用std::optional<>::value()

Big idea

从前面的例子可以看出,整体用法就是:

  • scheduler生成sender
  • 异步算法(比如then())接受和返回sender

总之就是一种组合的思路:把sender丢到一种异步算法里面,然后会得到另一个sender,如此往复直到最后等待结果

Lifecycle

lifecycle

怎样才能完成一整套异步操作?上图展示一个异步操作的生命周期:

  1. 第一步需要一个scheduler,通过schedule()得到一个sender
  2. sender通过connect()连接到receiver,得到一个operation state
  3. operation state需要在整个异步操作中维护生命周期
  4. 此时仍未启动任何任务,直到你在operation调用start(),一般来说任务就会入队直到一段时间过去后,操作完成,就会通知receiver

NOTE: 虚线部分是算法实现的细节(比如then()算法),因此对于一个库用户而非实现者来说只需要关注scheduler和sender

notification

还需补充一点是receiver的接收方式有三种:

  • set_value():任务正常完成
  • set_error():任务失败
  • set_stopped():任务被中途取消

Concepts

concepts

从前面的描述也可以总结出上图的concept,实现sender/receiver就是要实现以上接口

just() sender factory

现在我们来实操,先写一个简单的just sender factory

///////////////////////////////////////////
// Some utility code
///////////////////////////////////////////

// 这些是公用的代码,后面会反复用到

// 不可移动类
struct immovable {
    immovable() = default;
    immovable(immovable&&) = delete;
};

// 简单萃取connect()的返回类型
template <class S, class R>
using connect_result_t = decltype(connect(std::declval<S>(), std::declval<R>()));

// 简单萃取sender<result_t>模板中的result_t
template <class S>
using sender_result_t = typename S::result_t;

///////////////////////////////////////////
// just(T) sender factory
///////////////////////////////////////////

// 3. connect()后得到的产物
template <class R, class T>
struct just_operation : immovable {
    R rec;
    T value;

    // operation需要实现start()
    // 由于just()操作是start()后立刻完成的,只需直接通知receiver
    friend void start(just_operation& self) {
        // 通过调用set_value()实现完成通知
        set_value(self.rec, self.value);
    }
};

// 2. just sender存放value
template <class T>
struct just_sender {
    using result_t = T;
    T value;

    // sender需要实现connect(),返回operation state
    template <class R>
    friend just_operation<R, T> connect(just_sender self, R rec) {
        return { {}, rec, self.value };
    }
};

// 1. 要实现的sender factory接口,接受一个值,传递到sender然后立刻完成
template <class T>
just_sender<T> just(T t) {
    return {t};
}

这样就实现了just sender,后面看下如何运作

//
// start test code
//
struct cout_receiver {
    friend void set_value(cout_receiver self, auto val) {
        // 这个receiver什么都不干,只对收集到的结果输出
        std::cout << "Result: " << val << '\n';
    }

    friend void set_error(cout_receiver self, std::exception_ptr err) {
        std::terminate();
    }

    friend void set_stopped(cout_receiver self) {
        std::terminate();
    }
};

int main() {
    // just() factory生成just sender
    auto s = just(42);
    // just sender连接cout receiver生成operation
    auto op = connect(s, cout_receiver{});
    // 运行operation
    start(op);
}

上面的代码大概是这个意思:生成值42,然后输出

just 画图真累啊

很显然这个测试只是一份输出42的无聊代码,但后面的迭代很快会支持链式操作

then() sender adaptor

现在来实现一个then()算法,then(sender, f)表示sender的任务链接到另一个任务f,这样能支持链式操作

///////////////////////////////////////////
// then(Sender, Function) sender adaptor
///////////////////////////////////////////

// 4.
template <class R, class F>
struct then_receiver {
    R rec;
    F f;

    // 当通知完成时,再将收集的值val转换为f(val)
    friend void set_value(then_receiver self, auto val) {
        set_value(self.rec, self.f(val));
    }

    friend void set_error(then_receiver self, std::exception_ptr err) {
        set_error(self.rec, err);
    }

    friend void set_stopped(then_receiver self) {
        set_stopped(self.rec);
    }
};

// 3.
template <class S, class R, class F>
struct then_operation : immovable {
    // S为内部sender
    // 这里需要实现then_receiver
    connect_result_t<S, then_receiver<R, F>> op;

    friend void start(then_operation& self) {
        // 先执行内部start,就能启动链式操作
        start(self.op);
        // 实际后面也没事情干了
    }
};

// 2. 现在要实现then()返回的then_sender
template <class S, class F>
struct then_sender {
    using result_t = std::invoke_result_t<F, sender_result_t<S>>;
    // 需要暂存then()传递过来的内部sender和function
    S s;
    // function需要内部sender产生的value才能传递下去
    // 并且f需要传递给receiver
    F f;

    template <class R>
    friend then_operation<S, R, F> connect(then_sender self, R rec) {
        // connect()操作先完成内部sender与then_receiver的connect()
        // 这里需要实现一个then_operation和then_receiver
        // NOTE:
        // 虽然operation是immovable,但是C++17有guaranteed copy elision
        // 因此返回是没有问题
        return { {}, connect(self.s, then_receiver<R, F>{rec, self.f}) };
    }
};

// 1. 要实现的then()接口
// 需要返回then_sender
template <class S, class F>
then_sender<S, F> then(S s, F f) {
    // 简单的构造sender
    return {s, f};
}

int main() {
    auto s2 = then(just(42), [](int i) { return i + 1; });
    auto op2 = connect(s2, cout_receiver{});
    start(op2);
}

这样就得到了一个链式操作:先计算得到42,然后链式传递给某个匿名函数得到43并输出。附上一幅帮助理解的流程图(画得有点复杂,尽力了)

then

sync_wait() sender consumer

现在要为异步操作增加一个同步等待的特性,即sync_wait()(尽管这里还没有异步操作……)

//////////////////////////////////////////
// sync_wait() sender consumer
///////////////////////////////////////////

// 2. 考虑到异步操作可以在不同的线程间执行
// 因此需要一个control block
struct sync_wait_data {
    std::mutex mtx;
    std::condition_variable cv;
    std::exception_ptr err;
    bool done = false;
};

// 3. 需要一个receiver来实现value收集和通知
template <class T>
struct sync_wait_receiver {
    sync_wait_data& data;
    std::optional<T>& value;

    friend void set_value(sync_wait_receiver self, auto val) {
        std::unique_lock lk{self.data.mtx};
        self.value.emplace(val);
        self.data.done = true;
        self.data.cv.notify_one();
    }

    friend void set_error(sync_wait_receiver self, std::exception_ptr err) {
        std::unique_lock lk{self.data.mtx};
        self.data.err = err;
        self.data.done = true;
        self.data.cv.notify_one();
    }

    friend void set_stopped(sync_wait_receiver self) {
        std::unique_lock lk{self.data.mtx};
        self.data.done = true;
        self.data.cv.notify_one();
    }
};

// 1. 从前面的例子可看到,sync_wait接收一个任务,即sender
template <class S>
std::optional<sender_result_t<S>> sync_wait(S s) {
    // 通过前面2份代码可以看出,sender一般会实现result_t方便萃取
    using T = sender_result_t<S>;
    // 需要一个control block来保证线程安全(见2.)
    sync_wait_data data;
    std::optional<T> value;

    // 接下来的问题就是如何得到value

    auto op = connect(s, sync_wait_receiver<T>{data, value});
    start(op);

    // 提供一个同步点,在sync_wait_receiver对应的set_value处唤醒(见3.)
    std::unique_lock lk{data.mtx};
    data.cv.wait(lk, [&]{return data.done;});

    if (data.err)
        std::rethrow_exception(data.err);

    return value;
}

int main() {
    auto s3 = then(just(42), [](int i) { return i + 1; });
    int val3 = sync_wait(s3).value();
    std::cout << "Result: " << val3 << '\n';
}

这个例子是比较简单的,就是依靠一个control block还有receiver来完成同步等待操作。从main()函数可以看出,对于库调用者来说并不需要关心sender以外的事情,比如手动connect()还有receiver的实现都只出现在库内部

run_loop execution context

我们现在还缺少一个真正意义的异步操作,那就提供一个执行上下文来完成这件事情,比如来一个FIFO & MPSC的队列

///////////////////////////////////////////
// run_loop execution context
///////////////////////////////////////////
struct run_loop : immovable {
    struct none{};

    // 用于支持operation泛型擦除,从而放到链表上
    struct task : immovable {
        task* next = this;
        virtual void execute() {}
    };

    // 支持绑定不同receiver的operation
    template <class R>
    struct operation : task {
        R rec;
        run_loop& loop;

        operation(R rec, run_loop& loop)
          : rec(rec), loop(loop) {}

        void execute() override final {
            // 为了简化示例,value都用none来代替
            set_value(rec, none{});
        }

        // operation需要支持start()
        // 这里就是插入链表
        friend void start(operation& self) {
            self.loop.push_back(&self);
        }
    };

    // 一个FIFO链表
    // 作者提到sender/receiver的设计是避免了动态分配的可能
    // 因此使用的是侵入式链表
    // 你可以看到实际task/operation是在main()函数栈上分配的
    task head;
    // 使用循环链表,让代码更加简洁
    task* tail = &head;
    bool finishing = false;
    // 保证线程安全
    std::mutex mtx;
    std::condition_variable cv;

    void push_back(task* op) {
        std::unique_lock lk(mtx);
        op->next = &head;
        tail = tail->next = op;
        cv.notify_one();
    }

    task* pop_front() {
        std::unique_lock lk(mtx);
        cv.wait(lk, [this]{return head.next != &head || finishing;});
        if (head.next == &head)
            return nullptr;
        return std::exchange(head.next, head.next->next);
    }

    struct sender {
        using result_t = none;
        run_loop* loop;

        template <class R>
        friend operation<R> connect(sender self, R rec) {
            return {rec, *self.loop};
        }
    };

    // 用于指向一个上下文
    struct scheduler {
        run_loop* loop;
        // schedule()生成一个关联上下文的sender
        friend sender schedule(scheduler self) {
            return {self.loop};
        }
    };

    scheduler get_scheduler() {
        return {this};
    }

    // 执行上下文位于run()调用者所在的线程
    void run() {
        while (auto* op = pop_front())
            op->execute();
    }

    // 表示未来不会再加入任何task
    void finish() {
        std::unique_lock lk(mtx);
        finishing = true;
        cv.notify_all();
    }
};

int main() {
    run_loop loop;
    auto sched4 = loop.get_scheduler();
    auto s4 = then(schedule(sched4), [](auto) { return 42; });
    auto op4 = connect(s4, cout_receiver{});
    // start()仅表示入队
    start(op4);
    auto s5 = then(schedule(sched4), [](auto) { return 43; });
    auto op5 = connect(s5, cout_receiver{});
    start(op5);

    loop.finish();
    // 真正执行任务
    loop.run();
}

这里展示了scheduler(或者说是execution context)和sender/receiver的组合使用。除去context实现以外,基本就是schedule()构造一个关联上下文的sender即可。但是用法过于细节了,实际上可以封装一层简化使用

thread_context execution context

这里基于上面的run_loop实现封装了一个在独立的线程执行任务的上下文

///////////////////////////////////////////
// thread_context execution context
///////////////////////////////////////////
class thread_context : run_loop {
    std::thread th{[this]{ run(); }};
public:
    using run_loop::get_scheduler;
    using run_loop::finish;
    void join() {
        th.join();
    }
};

int main() {
    thread_context th;
    auto sched6 = th.get_scheduler();

    auto s6 = then(schedule(sched6), [](auto) { return 42; });
    auto s7 = then(s6, [](int i){return i+1;});
    auto val7 = sync_wait(s7).value();
    th.finish();
    th.join();

    std::cout << val7 << '\n';
}

这是最后一份代码了,现在已经接近stdexec里面的示例,限于这是一个低配版实现,也算是做得足够简单实用了。你也可以尝试实现when_all()算法和operator|

作者现场写的完整代码可以在godbolt获得

后记

其实也没啥好翻译的,主要还是Read The Friendly Source Code。视频结尾主持人拉家常的部分(聊到Range库的历史去了)我没怎么看,如果你感兴趣也可以看下原视频


本文已转发到知乎:C++ Execution:从零开始的Sender/Receiver [译] - 知乎 (zhihu.com)

This post is licensed under CC BY 4.0 by the author.
Contents