背景一

dispatchpostdefer 是 Asio 提供的任务提交接口。它们的注释都相同(Submits a completion token or function object for execution),但是意图是相当的不同。考虑到这可能是一个关于任务执行的通用问题,本文标题就不特别标明 Asio 了。

// 可以更换为 dispatch 或者 defer
asio::post(executor, [] { /* do stuff. */ });

对于没了解过 Asio 的读者,你可以简单理解任务提交为上述接口形式。executor 可能是某个指定线程、线程池或者其他类似的概念,任务(lambda)并不局限于 I/O 函数。至于任务参数类型、函数内是否保证线程安全、返回确定方式是 one way 还是 two way 等细节由 executor 决定,本文不讨论。

背景二

众所周知 Asio 的文档只能用简洁抽象来形容,就别指望从里面寻找答案了。其实 Asio 的任务执行模型(executor)有相当一部分的设计说明存放在多年前的 C++ Networking TS 提案(非常零散)里面,你可以从中找到答案。本文简单整理 N4242 给出的说明。

dispatch

dispatch 表示采用 eager 模式:任务在不影响执行流的正确性的前提下尽可能立刻执行。

int main() {
    // ...

    auto thread_pool_ex = /* ... */;
    asio::dispatch(thread_pool_ex, [] { /*...*/ })

    // ...
}

// 某个已经在 thread pool 运行的任务
void manager::stuff() {
    asio::dispatch(thread_pool_ex_, stuff_imm);
}

这里「正确性」指的是调用者(caller)所在的上下文是否与 executor 相同。比如,你在 main 函数的上下文当中企图提交任务到线程池,但是线程池本身还没有开启,那肯定不会立刻执行;如果线程池已经启动,你也不能确定任务是否立刻执行,线程池的资源可能全部繁忙。而对于已经处于线程池上下文的任务来说,它也可以进行任务提交,此时的 dispatch 等价于直接执行 manager::stuff_imm

class manager {
    // 表明用户保证,该 manager 的流程肯定只在线程池的上下文中执行
    manager(thread_pool_t &thread_pool): thread_pool_ex_{thread_pool.get_executor(inline_tag)} {}
    // 该 executor 由模板或者函数重载等手段推导出具体类型,
    // thread_pool_t::inline_executor::dispatch(auto f) 实现可以是 f();
    decltype(/*...*/) thread_pool_ex_;
};

这种立刻执行的 eager 模式,使得编译器能够提供优化机会:整个 dispatch(ex, stuff) 接口可以直接内联为 stuff() 任务调用 [1]。比如,上面 thread_pool_ex_ 是数据成员,实际可以通过简单的函数重载使得编译时地推导为一个内联执行器。

[1] 这一块可能需要提前理解 executor 模型的 execution context 与 lightweight executor 的区别。如果说 thread_pool 是一个执行上下文实例,那么 thread_pool_executor 可以只是一个指涉到 thread_pool 的描述符。前者通常是不可复制的;后者是可复制的且复制成本低廉(以当作值来使用),并且一致地通过 get_executor 接口获取。更多信息还请自行谷歌,比如这里 inline_tag 不算标准设计,但是 Asio 确实提供了描述 property 的方式帮你更聪明且一致地推断出来等等。(关键字 require/prefer)

post

与 dispatch 相反,post 表示采用 lazy 模式:绝不可能在 post 提交内部流程中执行该任务。

很简单的概念,就是任务必须接受调度。具体的实现方式可以只是把任务丢到某个队列当中。

这种接口至少给用户提供一个公平调度的选择(取决于具体的调度器),而 dispatch 允许没有调度,也可近似认为后者就是非公平调度。

NOTE: 这里的 lazy 并不符合某种语境中的 lazy,比如上一篇文章提到的结构化并发规则,后者是延迟到用户等待完成结果时才会提交任务,在这种语境中前者也是 eager 模式。

defer

defer 继承了 post 的性质,但是进一步表明了 caller/callee 之间的续体关系(continuation)。

// 某个已经在 thread pool 运行的任务
// 但是 post 并__不__保证任务调度是先做 do_stuff(),然后是 do_other_stuff()
void manager::stuff() {
    // 在这里,caller 为 manager::stuff,callee 为 do_other_stuff
    asio::post(thread_pool_ex_, do_other_stuff);
    do_stuff();
}

前面的 post 设计并不保证 caller/callee 之间的续体关系,因此 do_other_stuff 和 do_stuff 完全可以是并发执行的。

void manager::stuff() {
    asio::defer(thread_pool_ex_, do_other_stuff);
    do_stuff();
}

而 defer 能明确保证 do_other_stuff() 的执行在 do_stuff() 执行结束之后 [2]

[2] 实际执行流为: do_stuff → caller 退出 → … → 执行流回到调度器 → callee(do_other_stuff),所以说这里 do_stuff 只是 caller 的一部分,而 defer 保证的是 caller 和 callee 的完整先后顺序。有部分提案也指出了其先后顺序还要给出 synchronizes-with 保证,这些内存模型的细节我们就不讨论了。

void read_loop(Socket socket, Buffer buffer)
{
  async_read(socket, buffer,
    [&](error_code, size_t n) {
      process_data(buffer, n);
      read_loop(socket, buffer);
    });
}

// 一个简化的 async_read 实现
template <class Handler>
void async_read(Socket socket, Buffer buffer, Handler handler)
{
  // 先做一个试探性
  error_code ec;
  size_t n = non_blocking_read(socket, buffer, ec);
  // 可能早已在内核缓冲区里了
  if (ec != would_block)
  {
    // 此读操作可以立刻完成,
    ex = get_associated_executor(handler);
    // 并且使用 post 来做完成通知
    post(ex, [=]{ handler(ec, n); });
  }
  else
  {
    // Wait for socket to become readable.
    // ...
  }
}

// 结合上面的 async_read 实现,该场景的 read_loop 可以简化为
void read_loop(socket, buffer)
{
  // 也可以是 defer,后面有对比两种实现的不同
  ex.post([&]{ // #1 
      read_loop(socket, buffer);
    });
}

不仅如此,该接口可以潜在提供并发性能优化。Asio 作者使用了一个异步读循环操作的场景来说明 post 和 defer 的性能差异。示例 read_loop 的递归回调(handler)是一种常见的异步循环模式,这种场景不管是 post 还是 defer 都是符合语义的。

post 语义和对应线程池实现
class my_thread_pool
{
public:
  class executor_type
  {
  public:
    // ...

    template <class Func, class Alloc>
    void post(Func f, const Alloc& a)
    {
      auto p(std::allocate_shared<item<Func>>(a, std::move(f)));
      std::lock_guard<std::mutex> lock(pool_.mutex_); // #2 
      pool_.queue_.push_back(std::move(p)); // #3 
      pool_.condition_.notify_one(); // #4 
      // #5 
    }

    // ...
  };

  // ...

  void run()
  {
    for (;;)
    {
      std::unique_lock<std::mutex> lock(mutex_); // #6 
      condition_.wait(lock, [&]{ !queue_.empty(); });
      auto p(std::move(queue_.front())); // #7 
      queue_.pop_front();
      lock.unlock(); // #8 
      p->execute_(p); // #9 
    }
  }

private:
  std::mutex mutex_;
  std::condition_variable condition_;
  std::deque<std::shared_ptr<item_base>> queue_;
};
#6 — lock 第一对锁操作
#7 — dequeue read_loop
#8 — unlock
#9 — call read_loop
    #1 — call post
        #2 — lock 第二对锁操作
        #3 — enqueue read_loop
        #4 — notify 唤醒操作
        #5 — unlock
(start of next cycle)
#6 — lock
#7 — dequeue read_loop
#8 — unlock
#9 — call read_loop
...

假设执行上下文是一个基于锁实现的线程池,那么 post 会使得每一次循环都要经历两次(对)锁操作和一次唤醒操作。

defer 语义和对应线程池实现
class my_thread_pool
{
public:
  class executor_type
  {
  public:
    // ...

    template <class Func, class Alloc>
    void defer(Func f, const Alloc& a)
    {
      if (pool_.thread_local_queue_)
      {
        auto p(std::allocate_shared<item<Func>>(a, std::move(f)));
        pool_.thread_local_queue_->push_back(std::move(p)); // #2 
      }
      else
        post(std::move(f), a);
    }

    // ...
  };

  // ...

  void run()
  {
    std::deque<std::shared_ptr<item_base>> local_queue;
    thread_local_queue_ = &local_queue;
    for (;;)
    {
      std::unique_lock<std::mutex> lock(mutex_); // #3 
      while (!local_queue.empty()) // #4 
      {
        queue_.push(std::move(local_queue.front()));
        local_queue.pop_front();
      }
      condition_.wait(lock, [&]{ !queue_.empty(); });
      auto p(std::move(queue_.front())); // #5 
      queue_.pop_front();
      lock.unlock(); // #6 
      p->execute_(p); // #7 
    }
  }

private:
  std::mutex mutex_;
  std::condition_variable condition_;
  std::deque<std::shared_ptr<item_base>> queue_;
  static thread_local std::deque<std::shared_ptr<item_base>>* thread_local_queue_;
};
#3 — lock 只有这一对锁操作
#4 — copy contents of thread-local queue to main queue
#5 — dequeue read_loop
#6 — unlock
#7 — call read_loop
    #1 — call defer
        #2 — enqueue read_loop to thread-local queue 不需要锁和唤醒
(start of next cycle)
#3 — lock
#4 — copy contents of thread-local queue to main queue
#5 — dequeue read_loop
#6 — unlock
#7 — call read_loop
...

而使用 defer 来描述任务的延续性,则可以通过线程私有的方式将 callee 延续到 worker thread 重新调度时再执行,其提交流程可以减少一对锁开销和一次唤醒开销。

void start_leader() {
    auto socket = longrunning_accept();
    // Promote a follower thread
    // to become the new leader.
    asio::post(executor, [] { start_leader() });
    longrunning_process(std::move(socket));
}

需要注意 defer 是要看场景优化的。如果你是写一个 leader/follower 模式,callee 提交是要求 fork/spawn 的执行方式,那自然是不能将 callee 视为续体了;另外一点是 post 其实并发程度可以比 defer 高,毕竟后者是存在控制依赖的。

总结

任务提交 场景选择
dispatch 最小化延迟(接受阻塞)
post 默认选择;不阻塞当前进展;最大化并发
defer 链式操作;post 优化(如果可以)

如果你觉得精挑细选很麻烦,那么选择 post 通常是最省事的。

后记

atelier-exprience 有经验的玩家都知道,海胆长在树上

其实这些概念都是 Asio 作者十几年前玩剩下的,所以说人与人之间的经验差距还是太大了……