实现一个 xyz 系列算是本博客的月指活了,这次写的是 lockfree 容器(硬核程度越来越高,以后咋低成本水文章啊)。
声明
不建议自己从零造 lockfree 轮子,至少需要有论文支撑,或者从已有的项目中改进,不然代码的正确性基本没有任何保证。为此本文参考了模板库 boost::lockfree
的实现以及 MS Queue 的论文(Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms)。
另外本文实现的容器只关注 lockfree 部分。因此,作为类设计就是不及格的,比如完全无视异常安全、const
语义和严格别名等等。
无锁 freelist
万事开头难,freelist
虽然实现上较为朴素,但是需要讨论基本的无锁实现注意事项还是挺多的。
freelist 的关键在于:使用过程中获取到的内存资源绝不返还给 OS/libc(这不是内存泄漏,会在析构时再全部返还)。
为什么要这么做呢?这是因为无锁条件下的处理内存回收是个棘手问题(不要相信你的直觉,我翻过车!),能用的手段可以是:
- Hazard pointer.
- Lockfree reference counting.
- Epoch based reclamation.
每一个算法都让人头痛,因此反过来的思路就是:绝不内存回收,而是地址复用。这也是为啥造轮子先造 freelist 的原因。
但是地址(对象)复用的也有问题:那就是著名的 ABA。因为这里判断一个对象是否相同的依据是地址,而前者 A 和后者 A 本质不是相同对象。
为了解决这个 ABA 问题,可以从“地址”的角度去动手,那就是经典的 tagged pointer。它的实现是基于 x86-64 在四级分页下只使用低 48 位有效地址的事实前提来完成的,虽然不具有可移植性,但在这里可以轻易解决问题:地址的高位用了不同的标签(版本号)去区分,因此算法的过程中并不会把复用的对象视为相同,从而避开 ABA。
// 48bit pointer
// 只适用于 x86_64 四级分页的情况
// 使用高 16bit 作为 tag 避免 ABA 问题
template <typename T>
class Tagged_ptr {
public:
Tagged_ptr() = default;
Tagged_ptr(T *p, uint16_t t = 0): _ptr_tag(make(p, t)) {}
uint16_t get_tag() { return _ptr_tag >> 48; }
uint16_t next_tag() { return (get_tag() + 1) & TAG_MASK;}
T* get_ptr() { return reinterpret_cast<T*>(_ptr_tag & PTR_MASK); }
void set_ptr(T *p) { _ptr_tag = make(p, get_tag()); }
void set_tag(uint16_t t) { _ptr_tag = make(get_ptr(), t); }
T* operator->() { return get_ptr(); }
T& operator*() { return *get_ptr(); }
operator bool() { return !!get_ptr(); }
bool operator==(T *p) { return make(get_ptr(), 0) == make(p, 0); }
bool operator!=(T *p) { return !operator==(p); }
bool operator==(std::nullptr_t) { return !operator bool(); }
bool operator!=(std::nullptr_t) { return operator bool(); }
private:
constexpr static size_t PTR_MASK = (1ull<<48)-1;
constexpr static size_t TAG_MASK = (1ull<<16)-1;
static uint64_t make(T *p, uint16_t t) {
union {uint64_t x; uint16_t y[4];};
x = uint64_t(p);
y[3] = t;
return x;
}
private:
uint64_t _ptr_tag;
};
还有放入 freelist
的元素将直接通过类型双关的技巧来直接作为链表节点使用,以减少内存分配的压力。因此,freelist
容器的内部类 Node
节点类型只有一个 next
成员:
struct Node { Tagged_ptr<Node> next; };
但是需要注意的是,类模板 freelist<T, Alloc>
中的元素类型 T
需要本身大小大于等于 sizeof(Node)
,因此要么是在 allocator
上动手脚,要么是 T
在内部做封装:
template <typename T, typename Alloc_of_T = std::allocator<T>>
struct Wrapped_elem {
using Padding = std::byte[64];
union { T data; Padding _; };
Wrapped_elem(): data() {}
template <typename ...Args>
Wrapped_elem(Args &&...args): data(std::forward<Args>(args)...) {}
~Wrapped_elem() {data.~T();}
// allocator for wrapped_elem
using Internel_alloc = typename std::allocator_traits<Alloc_of_T>
::template rebind_alloc<Wrapped_elem<T, Alloc_of_T>>;
struct Alloc: private Internel_alloc {
T* allocate(size_t n, const void* h = 0) {
[](...){}(h); // unused
auto ptr = Internel_alloc::allocate(n);
return &ptr->data;
}
void deallocate(T *p, size_t n) {
Internel_alloc::deallocate(reinterpret_cast<Wrapped_elem*>(p), n);
}
};
};
这里通过 union
的技巧保证类型大小,但是 union
本身阻止了类型 T
的 RAII 机制,因此需要手动触发。
其次,封装一个 Alloc
代理类型给上层 freelist
使用,实现了基本的 allocate
和 deallocate
接口,并保持是与 T
类型交互,因此这个中间层基本不需要过多的改动 freelist
本身的实现。(这个 allocator
写得挺半桶水的,launder
就当不存在好了)
剩下的就是一个基本的 EBO 套上 Alloc
,并且 freelist
仿造类似 allocator
的接口。整体接口如下:
template <typename T, typename Alloc = std::allocator<T>>
class Freelist: private Wrapped_elem<T, Alloc>::Alloc {
struct Node { Tagged_ptr<Node> next; };
// 分配 T 类型时实际使用的 allocator
using Custom_alloc = typename Wrapped_elem<T, Alloc>::Alloc;
public:
Freelist();
~Freelist();
// 公开接口全部为线程安全实现
// 仿造 std::allocator 的基本接口
public:
T* allocate();
void deallocate(T* p);
template <typename ...Args>
void construct(T *p, Args &&...);
void destroy(T *p);
private:
std::atomic<Tagged_ptr<Node>> _pool;
};
完整代码如下,除了上述讨论过的细节,其余的算法部分就是 CAS 的应用:
template <typename T, typename Alloc = std::allocator<T>>
class Freelist: private Wrapped_elem<T, Alloc>::Alloc {
struct Node { Tagged_ptr<Node> next; };
// 分配 T 类型时实际使用的 allocator
using Custom_alloc = typename Wrapped_elem<T, Alloc>::Alloc;
public:
Freelist();
~Freelist();
// 公开接口全部为线程安全实现
// 仿造 std::allocator 的基本接口
public:
T* allocate();
void deallocate(T* p);
template <typename ...Args>
void construct(T *p, Args &&...);
void destroy(T *p);
private:
std::atomic<Tagged_ptr<Node>> _pool;
};
template <typename T, typename Alloc>
Freelist<T, Alloc>::Freelist()
: _pool{nullptr}
{}
template <typename T, typename Alloc>
Freelist<T, Alloc>::~Freelist() {
Tagged_ptr<Node> cur = _pool.load();
while(cur) {
auto ptr = cur.get_ptr();
if(ptr) cur = ptr->next;
Custom_alloc::deallocate(reinterpret_cast<T*>(ptr), 1);
}
}
template <typename T, typename Alloc>
T* Freelist<T, Alloc>::allocate() {
Tagged_ptr<Node> old_head = _pool.load(std::memory_order_acquire);
for(;;) {
if(old_head == nullptr) {
return Custom_alloc::allocate(1);
}
Tagged_ptr<Node> new_head = old_head->next;
new_head.set_tag(old_head.next_tag());
if(_pool.compare_exchange_weak(old_head, new_head,
std::memory_order_release, std::memory_order_relaxed)) {
void *ptr = old_head.get_ptr();
return reinterpret_cast<T*>(ptr);
}
}
}
template <typename T, typename Alloc>
void Freelist<T, Alloc>::deallocate(T *p) {
Tagged_ptr<Node> old_head = _pool.load(std::memory_order_acquire);
auto new_head_ptr = reinterpret_cast<Node*>(p);
for(;;) {
Tagged_ptr new_head {new_head_ptr, old_head.get_tag()};
new_head->next = old_head;
if(_pool.compare_exchange_weak(old_head, new_head,
std::memory_order_release, std::memory_order_relaxed)) {
return;
}
}
}
template <typename T, typename Alloc>
template <typename ...Args>
void Freelist<T, Alloc>::construct(T* p, Args &&...args) {
new (p) T(std::forward<Args>(args)...);
}
template <typename T, typename Alloc>
void Freelist<T, Alloc>::destroy(T *p) {
p->~T();
}
无锁 stack
stack
部分能讨论的地方不多,需要注意每个数据成员独占一个 cache line 大小(x84-64 的话一般为 64 字节,可以看 cpuinfo 确认),C++
提供了 alignas
特性直接支持对齐操作。
接口上为了简化,只提供线程安全的版本。为了性能,理应实现非线程安全版本并应用在合适的地方。
template <typename T, typename Alloc_of_T = std::allocator<T>>
class Stack {
public:
struct Node {
T data;
Tagged_ptr<Node> next;
template <typename ...Args> Node(Args&&...args)
: data(std::forward<Args>(args)...), next(nullptr){}
};
// Stack 中实际分配使用的 allocator
using Node_Alloc = typename std::allocator_traits<Alloc_of_T>
::template rebind_alloc<Node>;
using Node_ptr = Tagged_ptr<Node>;
public:
Stack();
~Stack();
public:
template <typename ...Args>
bool push(Args &&...);
std::optional<T> pop();
bool empty();
private:
// cacheline == 64 bytes
// 将跨越 2 个 cacheline
alignas(64)
std::atomic<Node_ptr> _head;
alignas(64)
Freelist<Node, Node_Alloc> _pool;
};
template <typename T, typename Alloc_of_T>
Stack<T, Alloc_of_T>::Stack()
: _head(nullptr)
{}
template <typename T, typename Alloc_of_T>
Stack<T, Alloc_of_T>::~Stack() {
while(pop());
}
template <typename T, typename Alloc_of_T>
template <typename ...Args>
bool Stack<T, Alloc_of_T>::push(Args &&...args) {
Node *ptr = _pool.allocate();
// 为了简化,这里并不处理异常安全
if(!ptr) return false;
_pool.construct(ptr, std::forward<Args>(args)...);
Tagged_ptr<Node> old_head = _head.load(std::memory_order_acquire);
for(;;) {
Node_ptr new_head {ptr, old_head.get_tag()};
new_head->next = old_head;
if(_head.compare_exchange_weak(old_head, new_head,
std::memory_order_release, std::memory_order_relaxed)) {
return true;
}
}
}
template <typename T, typename Alloc_of_T>
std::optional<T> Stack<T, Alloc_of_T>::pop() {
Tagged_ptr<Node> old_head = _head.load(std::memory_order_acquire);
for(;;) {
if(!old_head) return std::nullopt;
Tagged_ptr<Node> new_head = old_head->next;
new_head.set_tag(old_head.next_tag());
if(_head.compare_exchange_weak(old_head, new_head,
std::memory_order_release, std::memory_order_relaxed)) {
auto opt = std::make_optional<T>(old_head->data);
auto ptr = old_head.get_ptr();
_pool.destroy(ptr);
_pool.deallocate(ptr);
return opt;
}
}
}
template <typename T, typename Alloc_of_T>
bool Stack<T, Alloc_of_T>::empty() {
return !_head.load(std::memory_order_relaxed);
}
[WIP] 无锁 Queue
无锁 Queue
部分目前仍在进行中,代码有较多的地方不严谨,仅作记录。
Queue
实现不同于前两个容器,它需要的算法不够直观(因为涉及到 head
和 tail
两部分),细节已写入到注释中。
template <typename T, typename Alloc_of_T = std::allocator<T>>
class Queue {
public:
struct Node;
using Node_ptr = Tagged_ptr<Node>;
using Requirements = std::enable_if_t<std::is_trivial_v<T> /* && TODO */>;
struct alignas(64) Node {
T data;
std::atomic<Node_ptr> next;
template <typename ...Args> Node(Args &&...args)
: data(std::forward<Args>(args)...)
{
next.store(Node_ptr{nullptr}, std::memory_order_relaxed);
}
};
using Node_alloc = typename std::allocator_traits<Alloc_of_T>
:: template rebind_alloc<Node>;
public:
Queue();
~Queue();
public:
// 在 MS queue 原来的算法中,push 必然成功
// 但是 freelist 可能会失败,因此仍保留 bool 接口
template <typename ...Args>
bool push(Args &&...);
std::optional<T> pop();
private:
Freelist<Node, Node_alloc> _pool;
// 各自独占 cacheline
alignas(64) std::atomic<Node_ptr> _head;
alignas(64) std::atomic<Node_ptr> _tail;
};
template <typename T, typename A>
Queue<T, A>::Queue() {
Node *dummy = _pool.allocate();
if(!dummy) throw std::bad_alloc();
_pool.construct(dummy);
Node_ptr tagged_dummy {dummy, 0x5a5a};
_head.store(tagged_dummy);
_tail.store(tagged_dummy);
}
template <typename T, typename A>
Queue<T, A>::~Queue() {
while(pop());
}
template <typename T, typename A>
template <typename ...Args>
bool Queue<T, A>::push(Args &&...args) {
auto node_ptr = _pool.allocate();
if(!node_ptr) return false;
_pool.construct(node_ptr, std::forward<Args>(args)...);
Tagged_ptr<Node> node {node_ptr};
// 假设 Tx 是当前线程,Ty 是第二个线程
// (虽然是 lockfree,但算法中有实际进展的是任意 2 条线程)
for(;;) {
auto tail = _tail.load(std::memory_order_acquire);
auto next = tail->next.load(std::memory_order_acquire);
auto tail2 = _tail.load(std::memory_order_acquire);
// 需要 tag 比较,确保 tail 和 next 是一致的
if(tail != tail2) continue;
// L1 能确保原子性的 append(需要后续 L2 CAS 的保证)
// 因为 MS Queue 存在不断链的性质
if(next == nullptr) { // L1
node.set_tag(next.next_tag());
// failed 意味着有其他线程至少提前完成了 L2,导致当前 L1 条件违反
if(tail->next.compare_exchange_weak(next, node,
std::memory_order_release, std::memory_order_relaxed)) { // L2
node.set_tag(tail.next_tag());
// 我认为这一步不可能 failed
// Tx 执行 L2 成功,意味着 Ty 即使能执行到 L1,也不能 CAS 到 L2,
// 因为 L1 的条件是整个链表中唯一存在的,
// 而此时符合 next==nullptr 的 node 并没有发布出去(L3)
//
// Note: 不应使用 timed lock(避免 spurious failure),因此不是 weak
//
// Note: 实际 L3 上可能返回 false,见 L4,其实是在不同线程上完成相同的工作
_tail.compare_exchange_strong(tail, node,
std::memory_order_release, std::memory_order_relaxed); // L3
return true;
}
// Ty 完成了 L2 甚至 L3
} else {
next.set_tag(tail.next_tag());
// _tail 在 MS Queue 中是指向最后一个或者倒数第二个结点
// - 指向最后一个结点就不必多说了,常规数据结构的形态
// - 指向倒数第二个是因为存在 Ty 完成了 L2,但是 L3 尚未完成
//
// 如果 Ty L3 未完成,失败方会“推波助澜”,帮助 Tx 完成 tail 向前移动到最后一个结点的操作
// (next 如果可见了,那也是唯一确定的)
// 这样可以提高并发吞吐
_tail.compare_exchange_strong(tail, next,
std::memory_order_release, std::memory_order_relaxed); // L4
}
}
}
template <typename T, typename A>
std::optional<T> Queue<T, A>::pop() {
for(;;) {
auto head = _head.load(std::memory_order_acquire);
auto tail = _tail.load(std::memory_order_acquire);
auto next = head->next.load(std::memory_order_acquire);
// 保证 head tail next 一致性
auto head2 = _head.load(std::memory_order_acquire);
if(head != head2) continue;
if(head.get_ptr() == tail.get_ptr()) {
// is dummy
if(!next) return std::nullopt;
next.set_tag(tail.next_tag());
// Ty push 进行中,Tx 推波助澜
_tail.compare_exchange_strong(tail, next,
std::memory_order_release, std::memory_order_relaxed);
} else {
if(!next) continue;
// copy
auto opt = std::make_optional<T>(next->data);
next.set_tag(head.next_tag());
if(_head.compare_exchange_weak(head, next,
std::memory_order_release, std::memory_order_relaxed)) {
auto old_head_ptr = head.get_ptr();
_pool.destroy(old_head_ptr);
_pool.deallocate(old_head_ptr);
return opt;
}
}
}
}
References
Chapter 20. Boost.Lockfree - 1.81.0
Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms
c++ - Non-POD types in lock-free data structures - Stack Overflow
data structures - Lock-free queue algorithm, repeated reads for consistency - Stack Overflow