背景

这是 perfbook 笔记 关于形式化验证的补充说明,鉴于本人也不会这些(因菜废学!),就只聊工具。本文并非教程,GenMC 的使用很简单,只交代我为什么会推荐这个工具。

内存模型

内存模型很难是明确的事实。对于并非每天接触并行编程的人来说,给定一个 acquire-release 的关系,应该能判断出正确性;但要是来一个 IRIW 示例(独立读-独立写),不一定会意识到 seq-cst 的必要性;再来一大块的数据结构?很难说了。

那么怎么快速确认一个并发程序是正确的呢?

  • 方法一是纸面证明(perfbook 12.1.4.3),但是难以避免工程上 memory order 用错的可能性。
  • 方法二是 TSAN。它确实性价比很高,但是必须运行时触发场景,而且算法没有覆盖语言标准。
  • 方法三是跑测试。但是众所周知实际体系结构和语言抽象机器有差距,并且有编译器优化恰好让你能跑通的例子。这并非实践不可取,事实上测好 x86 和 ARM 已经覆盖正常人的需求。
  • 方法四就是使用模型检验工具(model checker),GenMC 是其中一款试过觉得不错的。

其实方法四的工作量是很低的。内存模型文件是既定的(最难的部分已经帮你写好了),市面上的工具又有多种选择,只需挑一个适合自己的就 OK。

工具对比

perfbook 推荐使用 herd7/litmus 测试,它更多关注于硬件体系结构的内存模型和 LKMM(Linux 内核的内存模型)。虽然 C/C++ 内存模型也是支持的(使用 cpp11.cfg),但是不能直接告诉你是否存在 data race,并且语法上存在一定的限制:比如支持非常有限的 loop-while,以及本身难以表达复杂的数据结构。

最常见的 C++ 内存模型检验工具是 CppMem,语法和上述类似。不过问题在于 CAS 支持似乎并不完善,我仿写了一个 cppreference 上的正确示例,它报出 data race(求指教?)。另外一个问题是搜索遍历状态极慢,一个测试用例跑了半小时,再加两行代码,我永远等不到结果了。

还有对于应用程序开发更加友好的 CDSChecker。基本支持原生 C++11 语法是一大亮点,这意味着要验证正确性,只需复制粘贴稍作修改即可。很可惜连官方提供的 rwlock 示例都无法遍历完整,因为也使用了复杂的循环。虽然作者已经跑路了(git 服务器连不上,只能下载快照),但还是可以作为备选工具。

GenMC 同样支持 C/C++ 语法,由于前端基于 Clang-16(目前不能使用更高版本),甚至支持 C++20 标准。另一个亮点是极快,文档提到对于循环它是能做足够聪明的转换,因此我的 spinlock 测试不到一秒就能得出结果。注意 GenMC 实际检验的是 RC11 内存模型,与应用程序中的实际内存模型有差距,比如 relaxed ordering 中的代码无法复现(r1 == r2 == 42),文档第 3.1 节也特意提到这种现象。不过这对于一个工具来说有参考结果很足够了。

测试用例

// spinlock.cpp
#include <atomic>
#include <stdlib.h>
#include <pthread.h>

struct Spinlock {
    enum State {LOCKED, UNLOCKED};
    std::atomic<State> _state {UNLOCKED};
    void lock() {
        if(_state.exchange(LOCKED, std::memory_order_acquire) == LOCKED) {
            while(_state.exchange(LOCKED, std::memory_order_relaxed) == LOCKED);
            // 移除下面这一行会有不同的结果
            std::atomic_thread_fence(std::memory_order_acquire);
        }
    }
    void unlock() { _state.store(UNLOCKED, std::memory_order_release); }
};

static int x;
static Spinlock spinlock;

void* test(void*) {
    spinlock.lock();
    // 提供 data race 检测的机会
    x++;
    spinlock.unlock();
    return nullptr;
}

// GenMC 不支持 std::thread 和 std::jthread
auto make_thread(auto func) {
    pthread_t tid;
    pthread_create(&tid, {}, func, {}) && (abort(), 1);
    return tid;
}

int main() {
    auto t1 = make_thread(test);
    auto t2 = make_thread(test);
    // ...
}

参数:./genmc -disable-sr -- -std=c++20 spinlock.cpp

有些 warning 是面向内存模型自身的实现,对于应用程序开发可以忽略(运行过程会提示关闭方法)。我们只需关注是否有 hard error 即可。(No errors were detected.)

附录

另一个 SPSC 示例
#include <array>
#include <atomic>
#include <bit>
#include <utility>
#include <stdlib.h>
#include <pthread.h>

// Linux 内核的 kfifo 移植实现
template <typename T, std::size_t SIZE /* Must be a power of 2. */>
    requires requires { requires std::has_single_bit(SIZE); }
struct Kfifo {
    std::array<T, SIZE> _buffer {};
    alignas(64) std::atomic<std::size_t> _in {};
    alignas(64) std::atomic<std::size_t> _out {};

    constexpr auto mod(auto value) {
        return value & (SIZE - 1);
    }

    bool push(T elem) {
        // Only the producer can modify `_in`.
        std::size_t in = _in.load(std::memory_order_relaxed);
        std::size_t next_in = mod(in+1);
        if(next_in == _out.load(std::memory_order_acquire)) {
            return false;
        }
        _buffer[in] = std::move(elem);
        // Let the consumer know your change.
        _in.store(next_in, std::memory_order_release);
        return true;
    }

    bool pop(T &val) {
        std::size_t out = _out.load(std::memory_order_relaxed);
        std::size_t in = _in.load(std::memory_order_acquire);
        if(in == out) {
            return false;
        }
        val = std::move(_buffer[out]);
        _out.store(mod(out+1), std::memory_order_release);
        return true;
    }
};

static std::array data {998, 244, 353};
static Kfifo<int, 64> fifo;

void* producer(void*) {
    for(auto i : data) {
        fifo.push(i);
    }
    return nullptr;
}

void* consumer(void*) {
    int i {};
    fifo.pop(i);
    return nullptr;
}

// GenMC 不支持 std::thread 和 std::jthread
auto make_thread(auto func) {
    pthread_t tid;
    pthread_create(&tid, {}, func, {}) && (abort(), 1);
    return tid;
}

int main() {
    // 单生产者-单消费者的场合
    // 只存在 t1 和 t2 是合法的
    auto t1 = make_thread(producer);
    auto t2 = make_thread(consumer);

    // 添加 t3 使得示例成为错误的 MPSC
    // 添加 t4 使得示例进一步成为错误的 MPMC
    // auto t3 = make_thread(producer);
    // auto t4 = make_thread(consumer);

    // 要想保留 t3 和 t4 并且保持程序合法
    // 可以使用上面的 spinlock 实现
    // 为 t1/t3 和 t2/t4 单独上锁

    // ...
}
RC11 禁止的 relaxed ordering 行为
// relaxed.cpp
#include <atomic>
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>

constexpr auto link [[maybe_unused]] =
    "https://en.cppreference.com/w/cpp/atomic/memory_order#Relaxed_ordering";

////////////////////////////////////////////////// pthread wrappers

auto make_thread(auto func) {
    pthread_t tid;
    pthread_create(&tid, {}, func, {}) && (abort(), 1);
    return tid;
}

auto join_threads(auto ...threads) {
    (pthread_join(threads, {}), ...);
}

auto pthread_routine(auto f) /* requires lambda */
        requires requires { f.operator()(); } {
    return +[](void*) -> void* {
        return decltype(f)()(), nullptr;
    };
}

////////////////////////////////////////////////// pthread wrappers (END)

std::atomic<int> x {}, y {};
int r1 {}, r2 {};

auto thread1 = pthread_routine([] {
    r1 = y.load(std::memory_order_relaxed); // A
    x.store(r1, std::memory_order_relaxed); // B
});

auto thread2 = pthread_routine([] {
    r2 = x.load(std::memory_order_relaxed); // C
    y.store(42, std::memory_order_relaxed); // D
});

int main() {
    auto t1 = make_thread(thread1);
    auto t2 = make_thread(thread2);
    join_threads(t1, t2);

    // 执行 ./genmc -- -std=c++20 relaxed.cpp | grep -E "^\(.*\)$"
    // 没有 (42 42) 的可能性
    printf("(%d %d)\n", r1, r2);
}