虽然 std::simd 还没有合入 C++26(属于 Parallelism TS 2),但是这个库的封装确实合我心意,所以本文先用来当作 SIMD 练手工具了。需要声明一下我不会向量算法,就随便写写。

查找最值

#include <algorithm>
#include <array>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <tuple>
#include <cassert>
#include <experimental/simd>
#include <immintrin.h>

namespace stdx = std::experimental;
namespace stdv = std::views;
namespace stdr = std::ranges;

template <typename T, auto N>
constexpr auto simd_max_element(const std::array<T, N> &arr) {
    // std::native_simd 类型会根据 -march 挑选具体实现(native ABI)
    //
    // 注意 std::simd 类型默认使用不同的 ABI(不会跟随 -march 调整最佳实现)
    // 虽然 std::simd 同样是 std::native_simd 的别名
    // 但是 std::simd_abi 的挑选准则是 compatible ABI
    using simd_t = stdx::native_simd<T>;
    // SIMD 向量的构造函数有很多。如果使用标量 T 初始化,那会使得向量内所有的标量都广播为相同数值
    simd_t max_value = std::numeric_limits<T>::min();

    constexpr auto step = simd_t::size();
    constexpr auto tile = N / step;
    constexpr auto left = N % step;

    for(auto &batch : arr | stdv::stride(step) | stdv::take(tile)) {
        // 另一种构造函数,从地址载入连续多个标量的数值
        // 对齐方式也可以按照向量对齐
        simd_t temp(std::addressof(batch), stdx::element_aligned);
        // select 操作:where 表达式会根据 std::simd_mask(由 operator<(...) 生成)处理赋值操作符
        where(max_value < temp, max_value) = temp;
    }

    if constexpr (left) {
        auto left_view = arr | stdv::drop(tile * step);
        auto v = *stdr::max_element(left_view);
        if(stdx::none_of(max_value > v)) {
            return v;
        }
    }

    // SIMD 类型不提供迭代器,并且 operator[] 实际返回了代理类型
    // 因此想要 ranged-based `for` loop 是不讨好的
    //
    // 除非你实在想要,可以尝试:
    // stdv:: iota(0) | take(size()) | transform([](index) -> T { []... })
    //
    // 但是 SIMD 库已经提供了 std::reduce() 重载以及一些便利函数
    // 比如这里,可以 std::hmax() 返回向量当中的最大值(horizontal max)
    return stdx::hmax(max_value);
}

constexpr std::size_t MASS = 1e8+3;
std::array<int, MASS> massive;

// 算法无关的函数
auto initiate(auto &arr);
auto tick(auto f);

int main() {
    auto check = initiate(massive);
    auto [v, elapsed] = tick([&] { return simd_max_element(massive); });
    // auto [v, elapsed] = tick([&] { return *stdr::max_element(massive); });
    assert(v == check);
    std::println("max value: {}, elapsed: {}", v, elapsed);
}
算法无关的代码部分
// Generate random values and flush cachelines.
// Return a maximum value for assert().
auto initiate(auto &arr) {
    auto verifier = *begin(arr);
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, arr.size());
    for(auto &v : arr) {
        v = distribution(generator);
        verifier = std::max(v, verifier);
    }

    constexpr auto cacheline_elements = 64 / sizeof(arr[0]);
    auto flush_view = stdv::iota(arr.data())
        | stdv::stride(cacheline_elements)
        | stdv::take(arr.size() / cacheline_elements);
    for(auto addr : flush_view) {
        _mm_clflush(addr);
    }
    return verifier;
}

auto tick(auto f) {
    namespace stdc = std::chrono;
    auto start = stdc::steady_clock::now();
    auto v = f();
    auto end = stdc::steady_clock::now();
    auto elapsed = stdc::duration_cast<stdc::milliseconds>(end - start);
    return std::tuple(v, elapsed);
}

使用 clang++-18 -O3 -march=skylake -std=c++2b 编译:

  • SIMD 版本耗时 20 ms。
  • 标准库版本耗时 80 ms。

NOTES:

  • 我的机子(按摩殿·禅叁)不支持 AVX512,所以这里跑分实际使用的是 AVX2。
  • GCC 相当聪明,标准库实现和 SIMD 实现均为 20 ms。

双调排序

#include <algorithm>
#include <array>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <tuple>
#include <cassert>
#include <thread>
#include <vector>
#include <experimental/simd>
#include <immintrin.h>

namespace stdx = std::experimental;
namespace stdv = std::views;
namespace stdr = std::ranges;

namespace bitonic {

template <typename> struct use_simd {};

//////////////////////////////////////////////////////////// parallel

// A rough tuning option for amortizing thread overheads.
constexpr std::size_t default_scale_hint = 1 << 20;
// We don't have a good thread pool here, so let it be false.
constexpr bool enable_parallel_for = false;
constexpr bool enable_parallel(std::integral auto hint) {
    return hint >= default_scale_hint;
}

// Note: parallel(...) and parallel(...) are not parallel!
void parallel(std::size_t scale_hint, std::invocable auto job,
                                      std::invocable auto ...jobs) {
    if(!enable_parallel(scale_hint)) return job(), (jobs(), ...);
    std::array parallel_jobs {std::jthread{std::move(jobs)}...};
    job();
}

void parallel_for(auto view, auto func) {
    auto f = [func](auto maybe_subview) {
        for(auto &&v : maybe_subview) {
            func(std::forward<decltype(v)>(v));
        }
    };
    if(!enable_parallel_for || !enable_parallel(std::size(view))) {
        return f(view);
    }
    auto per_thread_view = view | stdv::chunk(default_scale_hint);
    std::vector<std::jthread> parallel_jobs;
    for(auto g : per_thread_view) {
        parallel_jobs.emplace_back(f, g);
    }
}

template <typename simd_t>
auto parallel_for(use_simd<simd_t>, auto view, auto func) {
    constexpr auto step = simd_t::size();
    const auto tile = std::size(view) / step;
    const auto consumed = step * tile;
    auto simdify = stdv::stride(step) | stdv::take(tile);
    parallel_for(view | simdify, std::move(func));
    return consumed;
}

//////////////////////////////////////////////////////////// parallel (end)
//////////////////////////////////////////////////////////// misc

constexpr struct {
    std::less<>    incr;
    std::greater<> decr;
} direction;

template <typename dir>
concept directional =
    std::is_same_v<dir, decltype(direction.incr)> ||
    std::is_same_v<dir, decltype(direction.decr)>;

void merge(auto range, directional auto dir);
void sort(auto range, directional auto dir);

auto cut(auto range) {
    auto pivot = std::begin(range) + std::size(range) / 2;
    auto lo = stdr::subrange(std::begin(range), pivot);
    auto hi = stdr::subrange(pivot, std::end(range));
    return std::tuple(lo, hi);
}

template <template<typename...> typename Tuple, typename T>
void perform(Tuple<T&, T&> bitonic_pair, directional auto dir) {
    auto &[lhs, rhs] = bitonic_pair;
    if(!dir(lhs, rhs)) {
        std::swap(lhs, rhs);
    }
}

template <typename simd_t, template<typename...> typename Tuple, typename T>
void perform(use_simd<simd_t>, Tuple<T&, T&> bitonic_pair, directional auto dir) {
    auto &[lhs, rhs] = bitonic_pair;
    simd_t x(std::addressof(lhs), stdx::element_aligned);
    simd_t y(std::addressof(rhs), stdx::element_aligned);
    // SIMD 库提供了非常方便的 minmax 操作
    // 返回 std::pair<> 类型的 [min, max] 向量
    if constexpr (dir(0, 1)) std::tie(x, y) = stdx::minmax(x, y);
    else                     std::tie(y, x) = stdx::minmax(x, y);
    x.copy_to(std::addressof(lhs), stdx::element_aligned);
    y.copy_to(std::addressof(rhs), stdx::element_aligned);
}

//////////////////////////////////////////////////////////// misc (end)
//////////////////////////////////////////////////////////// core

void sort(auto range, directional auto dir) {
    if(std::size(range) < 2) return;
    auto [lo, hi] = cut(range);
    parallel(std::size(range),
        [=]{ sort(lo, direction.incr); },
        [=]{ sort(hi, direction.decr); });
    merge(range, dir);
}

void merge(auto range, directional auto dir) {
    if(std::size(range) < 2) return;
    using T = stdr::range_value_t<decltype(range)>;
    using simd_t = stdx::native_simd<T>;

    auto [lo, hi] = cut(range);
    auto zip_view = stdv::zip(lo, hi);

    // All the comparisons can be done in parallel.
    auto consumed = parallel_for(use_simd<simd_t>(), zip_view,
        [=](auto tup) { perform(use_simd<simd_t>(), tup, dir); });

    stdr::for_each(zip_view | stdv::drop(consumed),
        [=](auto tup) { perform(tup, dir); });

    parallel(std::size(range),
        [=]{ merge(lo, dir); },
        [=]{ merge(hi, dir); });
}

} // namespace bitonic

void simd_sort(stdr::random_access_range auto &&range) {
    if(std::size(range) < 2) return;
    assert(std::has_single_bit(std::size(range)));
    bitonic::sort(range | stdv::all, bitonic::direction.incr);
}

//////////////////////////////////////////////////////////// core (end)

constexpr std::size_t MASS = 1 << 26;
std::array<int, MASS> massive;

// 算法无关的函数
void initiate(auto &arr);
auto tick(auto f);

int main() {
    initiate(massive);
    // bitonic sort 要求容器大小为 2 的幂
    static_assert(std::has_single_bit(std::size(massive)));
    auto elapsed = tick([&]{ simd_sort(massive); });
    // auto elapsed = tick([&] { stdr::sort(massive); });
    assert(std::ranges::is_sorted(massive));
    std::println("time: {}", elapsed);
}
算法无关的代码部分
// Generate random values and flush cachelines.
void initiate(auto &arr) {
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, arr.size());
    for(auto &v : arr) {
        v = distribution(generator);
    }

    constexpr auto cacheline_elements = 64 / sizeof(arr[0]);
    auto flush_view = stdv::iota(arr.data())
        | stdv::stride(cacheline_elements)
        | stdv::take(arr.size() / cacheline_elements);
    for(auto addr : flush_view) {
        _mm_clflush(addr);
    }
}

auto tick(auto f) {
    namespace stdc = std::chrono;
    auto start = stdc::steady_clock::now();
    f();
    auto end = stdc::steady_clock::now();
    auto elapsed = stdc::duration_cast<stdc::milliseconds>(end - start);
    return elapsed;
}

使用 clang++-18 -O3 -march=skylake -std=c++2b 编译:

  • SIMD 排序(bitonic sort)耗时 2815 ms。
  • 标准库排序(introspective sort)耗时 4972 ms。

NOTES:

  • 性能增益实际来自于双调排序的并行性(可以多线程排序),事实上不开启 SIMD 还能更快(2600 ms)。你也可以在这里获取非 SIMD 版本的并行双调排序算法实现。
  • GCC 的表现在这里很丢人:SIMD 版本耗时 3500 ms+。

单词统计

std::string_view frost = "Whose woods these are I think I know.\n"
                         "His house is in the village though;  \n"
                         "He will not see me stopping here     \n"
                         "To watch his woods fill up with snow.\n";

std::size_t std_nvda_word_count(std::string_view s) {
    if(!std::size(s)) return 0;
    auto isspace = [](auto c) { return c == ' ' || c == '\n'; };
    auto transform = [=](auto l, auto r) { return isspace(l) && !isspace(r); };
    return std::transform_reduce(std::begin(s), std::end(s)-1, std::begin(s)+1,
                                 std::size_t(!isspace(s.front())),
                                 std::plus<>(),
                                 transform);
}

上面是 NV 给出来的标准库非 SIMD 实现版本,看起来还挺 NV 的。后面会用来做对比。

1 = space, 0 = letter.
E = End of a word.

      E      E
11000011000001000... bit
00111100111110111    (~)
00011110011111011    <<1
      ^      ^       bit & (~bit)<<1
count = 2

个人想到的是这种位运算思路,应该是哪里见过的技巧。不过由于 std::simd_mask 不支持位移(srli/slli)操作,因此不能高效实现上面的算法。其实从作者的提案实现也可看出这种问题(紧急画饼中)。既然有提案实现这种先行版,那就先强上了。

#include <algorithm>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <cassert>
#include <string>
#include <iostream>
#include <bitset>

// 提案作者给出的先行实现
#include <vir/simd.h>
#include <vir/simd_bit.h>
#include <vir/simd_bitset.h>

// 大部分用法和 TS 版 std::simd 差不多
// 我们这里只用标准库未提供的 std::simd_mask to std::bitset 功能
namespace stdx = vir::stdx;
namespace stdv = std::views;
namespace stdr = std::ranges;

std::string_view frost = "Whose woods these are I think I know.\n"
                         "His house is in the village though;  \n"
                         "He will not see me stopping here     \n"
                         "To watch his woods fill up with snow.\n";

namespace detail {

auto isspace(auto /* char or simd_chars */ c) {
    return c == ' ' || c == '\n';
}

std::size_t std_nvda_word_count(stdr::view auto s) {
    if(!std::size(s)) return 0;
    return std::transform_reduce(std::begin(s), std::prev(std::end(s)),
                                 std::next(std::begin(s)),
                                 std::size_t(!isspace(s.front())),
                                 std::plus<>(),
                                 [](auto l, auto r) {
                                     return isspace(l) && !isspace(r);
                                 });
}

std::size_t simd_word_count(stdr::view auto s) {
    using simd_t = stdx::native_simd<char>;
    constexpr auto step = simd_t::size();
    const auto tile = std::size(s) / step;

    auto simdify = stdv::stride(step) | stdv::take(tile);
    std::size_t count = 0;

    //////////////////////////////////////////////////

    auto simd_view = s | simdify;
    auto simd_transform = [&](auto &to_simd) {
        simd_t temp (std::addressof(to_simd), stdx::element_aligned);
        auto mask = isspace(temp);
        // 勉强当作 mask 之间的直接位操作使用
        auto bits = vir::to_bitset(mask);
        return (bits & (~(bits) << 1)).count();
    };
    for(auto &&to_simd : simd_view) {
        count += simd_transform(to_simd);
    }

    //////////////////////////////////////////////////

    auto x = s | simdify | stdv::drop(1);
    auto y = s | stdv::drop(step - 1) | simdify;
    auto z = stdv::zip(x, y);
    for(auto [curr, prev] : z) {
        count += isspace(curr) && !isspace(prev);
    }

    //////////////////////////////////////////////////

    if(auto batch = step * tile; std::size(s) > batch) {
        auto left_view = s | stdv::drop(batch);
        auto prev_view = s | stdv::drop(batch - 1);
        count += std_nvda_word_count(left_view);
        count += isspace(left_view[0]) && !isspace(prev_view[0]);
    }
    return count;
}

} // namespace detail

template <typename T>
concept string_class = std::is_convertible_v<T, std::string_view>;

std::size_t std_nvda_word_count(string_class auto &&str) {
    return detail::std_nvda_word_count(str | stdv::all);
}

std::size_t simd_word_count(string_class auto &&str) {
    return detail::simd_word_count(str | stdv::all);
}

auto tick(auto f) {
    namespace stdc = std::chrono;
    auto start = stdc::steady_clock::now();
    auto v = f();
    auto end = stdc::steady_clock::now();
    auto elapsed = stdc::duration_cast<stdc::milliseconds>(end - start);
    return std::tuple(v, elapsed);
}

int main() {
    std::string text;
    for(std::string line; std::getline(std::cin, line);) {
        if(!text.empty()) text += ' ';
        text += line;
    }

    auto [count, elapsed] = tick([&] { return simd_word_count(text); });
    // auto [count, elapsed] = tick([&] { return std_nvda_word_count(text); });

    std::println("count: {}, time: {}", count, elapsed);
}
带权重的随机文本生成器
#include <iostream>
#include <vector>
#include <tuple>
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <random>
#include <limits>
#include <cmath>
#include <numeric>
#include <bitset>
#include <cstdint>
#include <utility>
#include <cassert>
#include <ranges>

// 很久以前写的 alias method 算法,细节我也差不多忘了 orz
// 成员的命名取自维基百科的 alias method 算法描述
class Alias {
public:
    Alias(std::vector<double> probabilities) {
        precondition(probabilities);
        std::tie(_probs, _alias) = make_alias_table(std::move(probabilities));
    }

    /// @brief  Just simply make a random O(1) choice.
    /// @return Index.
    std::size_t make() {
        std::uniform_real_distribution dis01 {0.0, 1.0};
        double x = dis01(_gen);
        std::size_t i = _alias.size() * x; // round down
        double p = _alias.size() * x - i;
        return p < _probs[i] ? i : _alias[i];
    }

    /// @deprecated
    /// __Simple method__ to make a random O(1) choice.
    /// Use make() instead, which is faster (~10%) but harder to write correctly.
    std::size_t make(decltype(std::ignore)...) {
        std::uniform_int_distribution dis0N {0ul, _alias.size() - 1};
        std::uniform_real_distribution dis01 {0.0, 1.0};
        std::size_t i = dis0N(_gen);
        double p = dis01(_gen);
        return p < _probs[i] ? i : _alias[i];
    }

private:
    // O(n) construction.
    static
    auto make_alias_table(std::vector<double> probabilities)
    -> std::tuple< std::vector<double>, std::vector<std::size_t> >
    {
        const auto N = probabilities.size();
        for(auto &p : probabilities) p *= N;

        // K: alias table.
        // U: {index, probability} table.
        std::vector<std::size_t> K(N, N /*uninitialized*/);
        enum u_type {OVERFULL, FULL, UNDERFULL, UTYPE_MAX};
        std::vector<std::tuple<std::size_t, double>> U[UTYPE_MAX];
        auto get_type = [&](std::size_t i, double p) {
            u_type type = p > 1 ? OVERFULL : UNDERFULL;
            if(is_one(p)) [[unlikely]] {
                type = FULL;
                // Optional, but make less buggy code.
                // NOTE: FULL or OVERFULL->FULL actually has no alias index.
                K[i] = i;
            }
            return type;
        };

        for(std::size_t i = 0; i < probabilities.size(); ++i) {
            auto p = probabilities[i];
            u_type type = get_type(i, p);
            U[type].emplace_back(i, p);
        }

        while(!U[OVERFULL].empty() && !U[UNDERFULL].empty()) {
            /// Calculate.
            auto [over_i, over_p] = pop(U[OVERFULL]);
            auto [under_i, under_p] = pop(U[UNDERFULL]);
            over_p -= (1.0 - under_p);
            K[under_i] = over_i;

            /// Reinsert.
            U[FULL].emplace_back(under_i, under_p);
            u_type type = get_type(over_i, over_p);
            U[type].emplace_back(over_i, over_p);
        }

        /// I hate floating points.
        auto corner_case = [&](auto &ulist) {
            while(!ulist.empty()) {
                auto [i, p] = pop(ulist);
                assert(is_one(p));
                K[i] = i;
                U[FULL].emplace_back(i, p);
            }
        };

        corner_case(U[OVERFULL]);
        corner_case(U[UNDERFULL]);

        // Now they are all FULL.
        std::vector<double> full_u(N);
        for(auto &&[i, p] : U[FULL]) full_u[i] = p;
        return std::make_tuple(full_u, K);
    }

    // 1. size() > 0
    // 2. \sum probabilities == 1.0
    static
    void precondition(const std::vector<double> &probabilities) noexcept {
        assert(!probabilities.empty());
        [[maybe_unused]] double p_sum = 0;
        for(auto p : probabilities) p_sum += p;
        assert(is_one(p_sum));
    }

    static
    bool is_one(double x) noexcept {
        constexpr auto EPS = std::numeric_limits<double>::epsilon();
        return fabs(x - 1) < EPS;
    }

    template <typename T> static
    T pop(std::vector<T> &ulist) {
        auto elem = std::move(ulist.back());
        ulist.pop_back();
        return elem;
    }

private:
    std::random_device _rd;
    std::mt19937 _gen{_rd()};

    std::vector<double> _probs;
    std::vector<std::size_t> _alias;
};

int main() {
    std::vector letters_length      {10, 20, 30, 40};
    std::vector letters_probability {.1, .2, .3, .4};

    std::vector spaces_length      { 1,  2,  4,  8};
    std::vector spaces_probability {.1, .2, .3, .4};

    std::vector space_categories_type        {' ', '\n'};
    std::vector space_categories_probability {0.8,  0.2};

    constexpr std::size_t total_length = 1e9;

    Alias letters_alias(letters_probability);
    Alias spaces_alias(spaces_probability);
    Alias spaces_categories_alias(space_categories_probability);

    std::string text;
    std::size_t answer = 0;

    // For random text.
    std::default_random_engine generator;
    std::uniform_int_distribution<char> distribution('a', 'z');

    for(std::size_t length = 0; length < total_length;) {
        auto letters_index = letters_alias.make();
        namespace stdv = std::views;
        for(auto _ : stdv::iota(0, letters_length[letters_index])) {
            text += distribution(generator);
        }
        answer++;
        auto spaces_index = spaces_alias.make();
        auto space_categories_index = spaces_categories_alias.make();
        for(auto _ : stdv::iota(0, spaces_length[spaces_index])) {
            text += space_categories_type[space_categories_index];
        }
        length += letters_length[letters_index]
                + spaces_length[spaces_index];
    }

    // 使用管道时 std::cerr 仍输出到终端,用于对比答案
    std::cerr << answer << std::endl;
    std::cout << text << std::endl;
}

使用 g++-14 -O3 -march=skylake -std=c++2b 编译,以生成器提供的 1e9 文本测试:

  • SIMD 实现耗时 149 ms。
  • 标准库实现耗时 776 ms。

NOTE:Clang 在这里的表现特别丢人,SIMD 居然要 2000 ms+。

瑞士甜表

#include <array>
#include <ranges>
#include <tuple>
#include <algorithm>
#include <utility>
#include <cassert>
#include <chrono>
#include <concepts>
#include <type_traits>
#include <random>
#include <print>

#include <vir/simd.h>
#include <vir/simd_bit.h>
#include <vir/simd_bitset.h>

namespace stdx = vir::stdx;;
namespace stdr = std::ranges;
namespace stdv = std::views;

using byte = signed char;
using xbyte = stdx::native_simd<byte>;

// Special flags MUST start with 0b1_______.
constexpr byte empty_flag      = 0b10000000;
constexpr byte deleted_flag    = 0b11000000;
constexpr auto xwidth = xbyte::size();

template <std::size_t N>
concept swiss_like = N > 0 && N % xwidth == 0;
template <typename K, typename V>
concept simplified_toy = std::is_trivially_copyable_v<K>
                      && std::is_trivially_copyable_v<V>;

// An extreme toy implementation of Swiss table.
// This version simplifies almost everything but still utilize the SIMD techniques.
//
// {K, V, N} = Key, Value, Capacity.
template <typename K, typename V, std::size_t N>
    requires swiss_like<N> && simplified_toy<K, V>
struct Sweet_table: protected std::tuple<std::hash<K>, std::equal_to<K>> {
    using T = std::pair<K, V>;
    using H = std::hash<K>;
    using E = std::equal_to<K>;
    using Base = std::tuple<H, E>;

    Sweet_table() { for(auto &cb : ctrls) stdr::fill(cb, empty_flag); }

    auto hash(const auto &key) {
        return std::get<0>(static_cast<Base&>(*this))(key);
    }
    auto equal(const auto &key1, const auto &key2) {
        return std::get<1>(static_cast<Base&>(*this))(key1, key2);
    }

    ////////////////////////////////////////////////////////////

    using g_ctrl_t = std::array<byte, xwidth>;
    using g_slot_t = std::array<T, xwidth>;

    inline static constexpr
    std::size_t group_size() { return N / xwidth; }

    struct Group {
        auto match(byte h2) {
            return vir::to_bitset(temp == h2);
        }

        auto empty() {
            // std::simd 做不到 absl 库的 _mm_movemask_epi8(_mm_sign_epi8(ctrl, ctrl)) 操作
            // 提案实现没有提供对应的 intrinsics 指令映射
            return vir::to_bitset(temp == empty_flag);
        }

        auto empty_deleted() {
            static_assert(empty_flag <= deleted_flag);
            return vir::to_bitset(temp <= deleted_flag);
        }

        g_ctrl_t &ctrls;
        g_slot_t &slots;
        xbyte temp{ctrls.data(), stdx::vector_aligned};
    };

    auto group_view(auto offset) {
        return stdv::iota(offset / xwidth)
             | stdv::transform([this](auto i) { return i % group_size(); })
             | stdv::transform([this](auto i) { return Group{ctrls[i], slots[i]}; })
             | stdv::take(group_size());
    }

    ////////////////////////////////////////////////////////////

    struct seq {
        int operator*() { return std::countr_zero(x); }
        seq operator++(auto...) { return {x &= x-1}; }
        seq begin() { return {x}; }
        seq end() { return {0}; }
        auto operator<=>(const seq &) const=default;
        unsigned long long x;
    };

    template <auto X>
    auto bitseq(const std::bitset<X> &bits) { return seq{bits.to_ullong()}; }

    auto salt() { return std::bit_cast<std::ptrdiff_t>(ctrls.data()) >> 12; }
    auto H1(std::size_t hashed) { return hashed ^ salt(); }
    auto H2(std::size_t hashed) { return hashed & 0x7f; }

    ////////////////////////////////////////////////////////////

    // Don't blame me, this name was taken from std::map.
    bool insert_or_assign(auto &&key, auto &&...args) {
        auto hashed = hash(key);
        auto h1 = H1(hashed);
        auto h2 = H2(hashed);

        for(auto group : group_view(h1 % N)) {
            for(auto index : bitseq(group.match(h2))) {
                auto &&[old_key, old_value] = group.slots[index];
                if(equal(old_key, key)) {
                    old_value = V{std::forward<decltype(args)>(args)...};
                    return true;
                }
            }
            for(auto index : bitseq(group.empty_deleted())) {
                group.ctrls[index] = h2;
                group.slots[index] = T(K{std::forward<decltype(key)>(key)},
                                       V{std::forward<decltype(args)>(args)...});
                return false;
            }
        }

        // This is a fixed-size table.
        // Otherwise you need to resize().
        std::unreachable();
    }

    std::optional<T> find(const auto &key) {
        auto hashed = hash(key);
        auto h1 = H1(hashed);
        auto h2 = H2(hashed);

        for(auto group : group_view(h1 % N)) {
            for(auto index : bitseq(group.match(h2))) {
                auto &&[old_key, _] = group.slots[index];
                if(equal(old_key, key)) {
                    return group.slots.begin()[index];
                }
            }
            if(group.empty().any()) break;
        }
        return {};
    }

    // erase() is simple, just find then mark them DELETED.

    alignas(64) std::array<g_ctrl_t, group_size()> ctrls;
    alignas(64) std::array<g_slot_t, group_size()> slots;
};

// 算法无关的函数
auto initiate(auto &first, auto &next);
auto tick(auto f, auto &map);

constexpr std::size_t MAXN = 1 << 25;
constexpr std::size_t WORKLOAD = MAXN / 2;

auto first_input_data = std::array<int, WORKLOAD>{};
auto next_input_data = std::array<std::tuple<int /* old */, int /* new */>,
                                  std::max(8zu, WORKLOAD >> 12)>{};

auto sweet_table = Sweet_table<int, int, MAXN>{};
auto std_umap = std::unordered_map<int, int>(MAXN);

int main() {
    auto check = initiate(first_input_data, next_input_data);
    auto f = [](auto &map) {
        std::int64_t x = 0;
        for(auto i : first_input_data) {
            map.insert_or_assign(i, i);
        }
        for(auto i : first_input_data) {
            auto [k, v] = *map.find(i);
            x += v;
        }
        for(auto [o, n] : next_input_data) {
            map.insert_or_assign(o, n);
            auto [k, v] = *map.find(o);
            x += v;
        }
        return x;
    };

    auto [x, e] = tick(f, sweet_table);
    // auto [x, e] = tick(f, std_umap);

    assert(x == check);

    std::println("{}", e);
}
算法无关的代码部分
auto initiate(auto &first, auto &next) {
    std::int64_t result = 0;
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, std::numeric_limits<int>::max() / 64);
    for(auto &v : first) {
        v = distribution(generator);
        result += v;
    }

    for(auto &&[old_value, overwritten] : stdv::zip(first, next)) {
        auto new_value = distribution(generator);
        overwritten = std::tuple(old_value, new_value);
        result += new_value;
    }

    auto flush_view = [=](auto &arr) {
        constexpr auto cacheline_elements = 64 / sizeof(arr[0]);
        return stdv::iota(arr.data())
        | stdv::stride(cacheline_elements)
        | stdv::take(arr.size() / cacheline_elements);
    };
    for(auto addr : flush_view(first)) _mm_clflush(addr);
    for(auto addr : flush_view(next)) _mm_clflush(addr);
    return result;
}

auto tick(auto f, auto &map) {
    namespace stdc = std::chrono;
    auto start = stdc::steady_clock::now();
    auto v = f(map);
    auto end = stdc::steady_clock::now();
    auto elapsed = stdc::duration_cast<stdc::milliseconds>(end - start);
    return std::tuple(v, elapsed);
}

之前看 V 站有人做了一个针对小数据的 Swiss table。我也挺感兴趣的,顺手搞一个山寨版(设计偷懒 + 功能残缺,所以叫 Sweet table)。使用方面基本算是故技重施了。其实这里 std::simd 是非常低效的,它做不到一些 signbit 操作,并且 perf 发现 SIMD 和 std::bitset 操作相当的热(不排除我写得差劲),除了跑得比标准库快以外就没啥优势了。

使用 g++-14 -O3 -march=skylake -std=c++2b 编译:

  • std::unordered_map 实现耗时:3866 ms。
  • sweet_table + std::simd 实现耗时:1725 ms。

TBC

虽说本文只关注 std::simd 的使用,但是目前就连 API 都是处于缺失状态,感觉也做不了什么事情。后续不定期更新。