Home 实现 lockfree 容器:freelist,stack 和 queue
Post
Cancel

实现 lockfree 容器:freelist,stack 和 queue

实现一个 xyz 系列算是本博客的月指活了,这次写的是 lockfree 容器(硬核程度越来越高,以后咋低成本水文章啊)

声明

不建议自己从零造 lockfree 轮子,至少需要有 paper 支撑,或者从已有的项目中改进,不然代码的正确性基本没有任何保证。为此本文参考了模板库 boost::lockfree 的实现以及 MS Queue 的 paper(Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms)。

另外本文实现的容器只关注 lockfree 部分。因此,作为类设计就是不及格的,比如完全无视异常安全、不管是否 const 等等。

无锁 freelist

万事开头难,freelist 虽然实现上较为朴素,但是需要讨论基本的无锁实现注意事项还是挺多的。

freelist 的关键在于:使用过程中获取到的内存资源绝不返还给 OS/libc(这不是内存泄漏,会在析构时再全部返还)。

为什么要这么做呢?这是因为无锁条件下的处理内存回收是个棘手问题(不要相信你的直觉,我翻过车!),能用的手段可以是:

  • Hazard pointer.
  • Lockfree reference counting.
  • Epoch based reclamation.

每一个算法都让人头痛,因此反过来的思路就是:绝不内存回收,而是地址复用。这也是为啥造轮子先造 freelist 的原因。

但是地址(对象)复用的也有问题:那就是著名的 ABA。因为这里判断一个对象是否相同的依据是地址,而前者 A 和后者 A 本质不是相同对象。

为了解决这个 ABA 问题,可以从“地址”的角度去动手,那就是经典的 tagged pointer。它的实现是基于 x86-64 在四级分页下只使用低 48 位有效地址的事实前提来完成的,虽然不具有可移植性,但在这里可以轻易解决问题:地址的高位用了不同的标签(版本号)去区分,因此算法的过程中并不会把复用的对象视为相同,从而避开 ABA。

永久链接:Snippets/Tagged_ptr.hpp at 55d403f52b6cbd6 · Caturra000/Snippets (github.com)

// 48bit pointer
// 只适用于 x86_64 四级分页的情况
// 使用高 16bit 作为 tag 避免 ABA 问题
template <typename T>
class Tagged_ptr {
public:
    Tagged_ptr() = default;
    Tagged_ptr(T *p, uint16_t t = 0): _ptr_tag(make(p, t)) {}
    uint16_t get_tag() { return _ptr_tag >> 48; }
    uint16_t next_tag() { return (get_tag() + 1) & TAG_MASK;}
    T* get_ptr() { return reinterpret_cast<T*>(_ptr_tag & PTR_MASK); }
    void set_ptr(T *p) { _ptr_tag = make(p, get_tag()); }
    void set_tag(uint16_t t) { _ptr_tag = make(get_ptr(), t); }
    T* operator->() { return get_ptr(); }
    T& operator*() { return *get_ptr(); }
    operator bool() { return !!get_ptr(); }
    bool operator==(T *p) { return make(get_ptr(), 0) == make(p, 0); }
    bool operator!=(T *p) { return !operator==(p); }
    bool operator==(std::nullptr_t) { return !operator bool(); }
    bool operator!=(std::nullptr_t) { return operator bool(); }
private:
    constexpr static size_t PTR_MASK = (1ull<<48)-1;
    constexpr static size_t TAG_MASK = (1ull<<16)-1;
    static uint64_t make(T *p, uint16_t t) {
        union {uint64_t x; uint16_t y[4];};
        x = uint64_t(p);
        y[3] = t;
        return x;
    }
private:
    uint64_t _ptr_tag;
};

还有放入 freelist 的元素将直接通过类型双关的技巧来直接作为链表节点使用,以减少内存分配的压力。因此,freelist 容器的内部类 Node 节点类型只有一个 next 成员:

struct Node { Tagged_ptr<Node> next; };

但是需要注意的是,类模板 freelist<T, Alloc> 中的元素类型 T 需要本身大小大于等于 sizeof(Node),因此要么是在 allocator 上动手脚,要么是 T 在内部做封装:

template <typename T, typename Alloc_of_T = std::allocator<T>>
struct Wrapped_elem {
    using Padding = std::byte[64];
    union { T data; Padding _; };
    Wrapped_elem(): data() {}
    template <typename ...Args>
    Wrapped_elem(Args &&...args): data(std::forward<Args>(args)...) {}
    ~Wrapped_elem() {data.~T();}
    // allocator for wrapped_elem
    using Internel_alloc = typename std::allocator_traits<Alloc_of_T>
        ::template rebind_alloc<Wrapped_elem<T, Alloc_of_T>>;
    struct Alloc: private Internel_alloc {
        T* allocate(size_t n, const void* h = 0) {
            [](...){}(h); // unused
            auto ptr = Internel_alloc::allocate(n);
            return &ptr->data;
        }
        void deallocate(T *p, size_t n) {
            Internel_alloc::deallocate(reinterpret_cast<Wrapped_elem*>(p), n);
        }
    };
};

这里通过 union 的技巧保证类型大小,但是 union 本身阻止了类型 T 的 RAII 机制,因此需要手动触发。

其次,封装一个 Alloc 代理类型给上层 freelist 使用,实现了基本的 allocatedeallocate 接口,并保持是与 T 类型交互,因此这个中间层基本不需要过多的改动 freelist 本身的实现。(这个 allocator 写得挺半桶水的,launder 就当不存在好了)

剩下的就是一个基本的 EBO 套上 Alloc,并且 freelist 仿造类似 allocator 的接口。整体接口如下:

template <typename T, typename Alloc = std::allocator<T>>
class Freelist: private Wrapped_elem<T, Alloc>::Alloc {
    struct Node { Tagged_ptr<Node> next; };
    // 分配 T 类型时实际使用的 allocator
    using Custom_alloc = typename Wrapped_elem<T, Alloc>::Alloc;
public:
    Freelist();
    ~Freelist();
// 公开接口全部为线程安全实现
// 仿造 std::allocator 的基本接口
public:
    T* allocate();
    void deallocate(T* p);
    template <typename ...Args>
    void construct(T *p, Args &&...);
    void destroy(T *p);
private:
    std::atomic<Tagged_ptr<Node>> _pool;
};

完整代码如下,除了上述讨论过的细节,其余的算法部分就是 CAS 的应用:

永久链接:Snippets/Freelist.hpp at 55d403f52b · Caturra000/Snippets (github.com)

template <typename T, typename Alloc = std::allocator<T>>
class Freelist: private Wrapped_elem<T, Alloc>::Alloc {
    struct Node { Tagged_ptr<Node> next; };
    // 分配 T 类型时实际使用的 allocator
    using Custom_alloc = typename Wrapped_elem<T, Alloc>::Alloc;
public:
    Freelist();
    ~Freelist();
// 公开接口全部为线程安全实现
// 仿造 std::allocator 的基本接口
public:
    T* allocate();
    void deallocate(T* p);
    template <typename ...Args>
    void construct(T *p, Args &&...);
    void destroy(T *p);
private:
    std::atomic<Tagged_ptr<Node>> _pool;
};
template <typename T, typename Alloc>
Freelist<T, Alloc>::Freelist()
    : _pool{nullptr}
{}
template <typename T, typename Alloc>
Freelist<T, Alloc>::~Freelist() {
    Tagged_ptr<Node> cur = _pool.load();
    while(cur) {
        auto ptr = cur.get_ptr();
        if(ptr) cur = ptr->next;
        Custom_alloc::deallocate(reinterpret_cast<T*>(ptr), 1);
    }
}
template <typename T, typename Alloc>
T* Freelist<T, Alloc>::allocate() {
    Tagged_ptr<Node> old_head = _pool.load(std::memory_order_acquire);
    for(;;) {
        if(old_head == nullptr) {
            return Custom_alloc::allocate(1);
        }
        Tagged_ptr<Node> new_head = old_head->next;
        new_head.set_tag(old_head.next_tag());
        if(_pool.compare_exchange_weak(old_head, new_head,
                std::memory_order_release, std::memory_order_relaxed)) {
            void *ptr = old_head.get_ptr();
            return reinterpret_cast<T*>(ptr);
        }
    }
}
template <typename T, typename Alloc>
void Freelist<T, Alloc>::deallocate(T *p) {
    Tagged_ptr<Node> old_head = _pool.load(std::memory_order_acquire);
    auto new_head_ptr = reinterpret_cast<Node*>(p);
    for(;;) {
        Tagged_ptr new_head {new_head_ptr, old_head.get_tag()};
        new_head->next = old_head;
        if(_pool.compare_exchange_weak(old_head, new_head,
                std::memory_order_release, std::memory_order_relaxed)) {
            return;
        }
    }
}
template <typename T, typename Alloc>
template <typename ...Args>
void Freelist<T, Alloc>::construct(T* p, Args &&...args) {
    new (p) T(std::forward<Args>(args)...);
}
template <typename T, typename Alloc>
void Freelist<T, Alloc>::destroy(T *p) {
    p->~T();
}

无锁 stack

stack 部分能讨论的地方不多,需要注意每个数据成员独占一个 cache line 大小(x84-64 的话一般为 64 字节,可以看 cpuinfo 确认),C++ 提供了 alignas 特性直接支持对齐操作。

接口上为了简化,只提供线程安全的版本。为了性能,理应实现非线程安全版本并应用在合适的地方。

永久链接:Snippets/Stack_lockfree.cpp at 55d403f52b6cbd · Caturra000/Snippets (github.com)

template <typename T, typename Alloc_of_T = std::allocator<T>>
class Stack {
public:
    struct Node {
        T data;
        Tagged_ptr<Node> next;
        template <typename ...Args> Node(Args&&...args)
            : data(std::forward<Args>(args)...), next(nullptr){}
    };
    // Stack 中实际分配使用的 allocator
    using Node_Alloc = typename std::allocator_traits<Alloc_of_T>
                        ::template rebind_alloc<Node>;
    using Node_ptr = Tagged_ptr<Node>;
public:
    Stack();
    ~Stack();
public:
    template <typename ...Args>
    bool push(Args &&...);
    std::optional<T> pop();
    bool empty();
private:
    // cacheline == 64 bytes
    // 将跨越 2 个 cacheline
    alignas(64)
    std::atomic<Node_ptr> _head;
    alignas(64)
    Freelist<Node, Node_Alloc> _pool;
};
template <typename T, typename Alloc_of_T>
Stack<T, Alloc_of_T>::Stack()
    : _head(nullptr)
{}
template <typename T, typename Alloc_of_T>
Stack<T, Alloc_of_T>::~Stack() {
    while(pop());
}
template <typename T, typename Alloc_of_T>
template <typename ...Args>
bool Stack<T, Alloc_of_T>::push(Args &&...args) {
    Node *ptr = _pool.allocate();
    // 为了简化,这里并不处理异常安全
    if(!ptr) return false;
    _pool.construct(ptr, std::forward<Args>(args)...);
    Tagged_ptr<Node> old_head = _head.load(std::memory_order_acquire);
    for(;;) {
        Node_ptr new_head {ptr, old_head.get_tag()};
        new_head->next = old_head;
        if(_head.compare_exchange_weak(old_head, new_head,
                std::memory_order_release, std::memory_order_relaxed)) {
            return true;
        }
    }
}
template <typename T, typename Alloc_of_T>
std::optional<T> Stack<T, Alloc_of_T>::pop() {
    Tagged_ptr<Node> old_head = _head.load(std::memory_order_acquire);
    for(;;) {
        if(!old_head) return std::nullopt;
        Tagged_ptr<Node> new_head = old_head->next;
        new_head.set_tag(old_head.next_tag());
        if(_head.compare_exchange_weak(old_head, new_head,
                std::memory_order_release, std::memory_order_relaxed)) {
            auto opt = std::make_optional<T>(old_head->data);
            auto ptr = old_head.get_ptr();
            _pool.destroy(ptr);
            _pool.deallocate(ptr);
            return opt;
        }
    }
}
template <typename T, typename Alloc_of_T>
bool Stack<T, Alloc_of_T>::empty() {
    return !_head.load(std::memory_order_relaxed);
}

[WIP] 无锁 Queue

无锁 Queue 部分目前仍在进行中,代码有较多的地方不严谨,仅作记录。

Queue 实现不同于前两个容器,它需要的算法不够直观(因为涉及到 headtail 两部分),细节已写入到注释中。

永久链接:Snippets/Queue_lockfree.cpp at 55d403f52b6cbd · Caturra000/Snippets (github.com)

template <typename T, typename Alloc_of_T = std::allocator<T>>
class Queue {
public:
    struct Node;
    using Node_ptr = Tagged_ptr<Node>;
    using Requirements = std::enable_if_t<std::is_trivial_v<T> /* && TODO */>;
    struct alignas(64) Node {
        T data;
        std::atomic<Node_ptr> next;
        template <typename ...Args> Node(Args &&...args)
            : data(std::forward<Args>(args)...)
        {
            next.store(Node_ptr{nullptr}, std::memory_order_relaxed);
        }
    };
    using Node_alloc = typename std::allocator_traits<Alloc_of_T>
                        :: template rebind_alloc<Node>;
public:
    Queue();
    ~Queue();
public:
    // 在 MS queue 原来的算法中,push 必然成功
    // 但是 freelist 可能会失败,因此仍保留 bool 接口
    template <typename ...Args>
    bool push(Args &&...);
    std::optional<T> pop();
private:
    Freelist<Node, Node_alloc> _pool;
    // 各自独占 cacheline
    alignas(64) std::atomic<Node_ptr> _head;
    alignas(64) std::atomic<Node_ptr> _tail;
};
template <typename T, typename A>
Queue<T, A>::Queue() {
    Node *dummy = _pool.allocate();
    if(!dummy) throw std::bad_alloc();
    _pool.construct(dummy);
    Node_ptr tagged_dummy {dummy, 0x5a5a};
    _head.store(tagged_dummy);
    _tail.store(tagged_dummy);
}
template <typename T, typename A>
Queue<T, A>::~Queue() {
    while(pop());
}
template <typename T, typename A>
template <typename ...Args>
bool Queue<T, A>::push(Args &&...args) {
    auto node_ptr = _pool.allocate();
    if(!node_ptr) return false;
    _pool.construct(node_ptr, std::forward<Args>(args)...);
    Tagged_ptr<Node> node {node_ptr};
    // 假设 Tx 是当前线程,Ty 是第二个线程
    // (虽然是 lockfree,但算法中有实际进展的是任意 2 条线程)
    for(;;) {
        auto tail = _tail.load(std::memory_order_acquire);
        auto next = tail->next.load(std::memory_order_acquire);
        auto tail2 = _tail.load(std::memory_order_acquire);
        // 需要 tag 比较,确保 tail 和 next 是一致的
        if(tail != tail2) continue;
        // L1 能确保原子性的 append(需要后续 L2 CAS 的保证)
        // 因为 MS Queue 存在不断链的性质
        if(next == nullptr) {                                                   // L1
            node.set_tag(next.next_tag());
            // failed 意味着有其他线程至少提前完成了 L2,导致当前 L1 条件违反
            if(tail->next.compare_exchange_weak(next, node,
                    std::memory_order_release, std::memory_order_relaxed)) {    // L2
                node.set_tag(tail.next_tag());
                // 我认为这一步不可能 failed
                // Tx 执行 L2 成功,意味着 Ty 即使能执行到 L1,也不能 CAS 到 L2,
                // 因为 L1 的条件是整个链表中唯一存在的,
                // 而此时符合 next==nullptr 的 node 并没有发布出去(L3)
                //
                // Note: 不应使用 timed lock(避免 spurious failure),因此不是 weak
                //
                // Note: 实际 L3 上可能返回 false,见 L4,其实是在不同线程上完成相同的工作
                _tail.compare_exchange_strong(tail, node,
                    std::memory_order_release, std::memory_order_relaxed);      // L3
                return true;
            }
        // Ty 完成了 L2 甚至 L3
        } else {
            next.set_tag(tail.next_tag());
            // _tail 在 MS Queue 中是指向最后一个或者倒数第二个结点
            // - 指向最后一个结点就不必多说了,常规数据结构的形态
            // - 指向倒数第二个是因为存在 Ty 完成了 L2,但是 L3 尚未完成
            // 
            // 如果 Ty L3 未完成,失败方会“推波助澜”,帮助 Tx 完成 tail 向前移动到最后一个结点的操作
            // (next 如果可见了,那也是唯一确定的)
            // 这样可以提高并发吞吐
            _tail.compare_exchange_strong(tail, next,
                std::memory_order_release, std::memory_order_relaxed);          // L4
        }
    }
}
template <typename T, typename A>
std::optional<T> Queue<T, A>::pop() {
    for(;;) {
        auto head = _head.load(std::memory_order_acquire);
        auto tail = _tail.load(std::memory_order_acquire);
        auto next = head->next.load(std::memory_order_acquire);
        // 保证 head tail next 一致性
        auto head2 = _head.load(std::memory_order_acquire);
        if(head != head2) continue;
        if(head.get_ptr() == tail.get_ptr()) {
            // is dummy
            if(!next) return std::nullopt;
            next.set_tag(tail.next_tag());
            // Ty push 进行中,Tx 推波助澜
            _tail.compare_exchange_strong(tail, next,
                std::memory_order_release, std::memory_order_relaxed);
        } else {
            if(!next) continue;
            // copy
            auto opt = std::make_optional<T>(next->data);
            next.set_tag(head.next_tag());
            if(_head.compare_exchange_weak(head, next,
                    std::memory_order_release, std::memory_order_relaxed)) {
                auto old_head_ptr = head.get_ptr();
                _pool.destroy(old_head_ptr);
                _pool.deallocate(old_head_ptr);
                return opt;
            }
        }
    }
}

References

Chapter 20. Boost.Lockfree - 1.81.0

Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms (rochester.edu)

c++ - Non-POD types in lock-free data structures - Stack Overflow

data structures - Lock-free queue algorithm, repeated reads for consistency - Stack Overflow

This post is licensed under CC BY 4.0 by the author.
Contents