虽然 std::simd
还没有合入 C++26(属于 Parallelism TS 2),但是这个库的封装确实合我心意,所以本文先用来当作 SIMD 练手工具了。需要声明一下我不会向量算法,就随便写写。
查找最值
#include <algorithm>
#include <array>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <tuple>
#include <cassert>
#include <experimental/simd>
#include <immintrin.h>
namespace stdx = std::experimental;
namespace stdv = std::views;
namespace stdr = std::ranges;
template <typename T, auto N>
constexpr auto simd_max_element(const std::array<T, N> &arr) {
// std::native_simd 类型会根据 -march 挑选具体实现(native ABI)
//
// 注意 std::simd 类型默认使用不同的 ABI(不会跟随 -march 调整最佳实现)
// 虽然 std::simd 同样是 std::native_simd 的别名
// 但是 std::simd_abi 的挑选准则是 compatible ABI
using simd_t = stdx::native_simd<T>;
// SIMD 向量的构造函数有很多。如果使用标量 T 初始化,那会使得向量内所有的标量都广播为相同数值
simd_t max_value = std::numeric_limits<T>::min();
constexpr auto step = simd_t::size();
constexpr auto tile = N / step;
constexpr auto left = N % step;
for(auto &batch : arr | stdv::stride(step) | stdv::take(tile)) {
// 另一种构造函数,从地址载入连续多个标量的数值
// 对齐方式也可以按照向量对齐
simd_t temp(std::addressof(batch), stdx::element_aligned);
// select 操作:where 表达式会根据 std::simd_mask(由 operator<(...) 生成)处理赋值操作符
where(max_value < temp, max_value) = temp;
}
if constexpr (left) {
auto left_view = arr | stdv::drop(tile * step);
auto v = *stdr::max_element(left_view);
if(stdx::none_of(max_value > v)) {
return v;
}
}
// SIMD 类型不提供迭代器,并且 operator[] 实际返回了代理类型
// 因此想要 ranged-based `for` loop 是不讨好的
//
// 除非你实在想要,可以尝试:
// stdv:: iota(0) | take(size()) | transform([](index) -> T { []... })
//
// 但是 SIMD 库已经提供了 std::reduce() 重载以及一些便利函数
// 比如这里,可以 std::hmax() 返回向量当中的最大值(horizontal max)
return stdx::hmax(max_value);
}
constexpr std::size_t MASS = 1e8+3;
std::array<int, MASS> massive;
// 算法无关的函数
auto initiate(auto &arr);
auto tick(auto f);
int main() {
auto check = initiate(massive);
auto [v, elapsed] = tick([&] { return simd_max_element(massive); });
// auto [v, elapsed] = tick([&] { return *stdr::max_element(massive); });
assert(v == check);
std::println("max value: {}, elapsed: {}", v, elapsed);
}
使用 clang++-18 -O3 -march=skylake -std=c++2b
编译:
- SIMD 版本耗时 20 ms。
- 标准库版本耗时 80 ms。
NOTES:
- 我的机子(按摩殿·禅叁)不支持 AVX512,所以这里跑分实际使用的是 AVX2。
- GCC 相当聪明,标准库实现和 SIMD 实现均为 20 ms。
双调排序
#include <algorithm>
#include <array>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <tuple>
#include <cassert>
#include <thread>
#include <vector>
#include <experimental/simd>
#include <immintrin.h>
namespace stdx = std::experimental;
namespace stdv = std::views;
namespace stdr = std::ranges;
namespace bitonic {
template <typename> struct use_simd {};
//////////////////////////////////////////////////////////// parallel
// A rough tuning option for amortizing thread overheads.
constexpr std::size_t default_scale_hint = 1 << 20;
// We don't have a good thread pool here, so let it be false.
constexpr bool enable_parallel_for = false;
constexpr bool enable_parallel(std::integral auto hint) {
return hint >= default_scale_hint;
}
// Note: parallel(...) and parallel(...) are not parallel!
void parallel(std::size_t scale_hint, std::invocable auto job,
std::invocable auto ...jobs) {
if(!enable_parallel(scale_hint)) return job(), (jobs(), ...);
std::array parallel_jobs {std::jthread{std::move(jobs)}...};
job();
}
void parallel_for(auto view, auto func) {
auto f = [func](auto maybe_subview) {
for(auto &&v : maybe_subview) {
func(std::forward<decltype(v)>(v));
}
};
if(!enable_parallel_for || !enable_parallel(std::size(view))) {
return f(view);
}
auto per_thread_view = view | stdv::chunk(default_scale_hint);
std::vector<std::jthread> parallel_jobs;
for(auto g : per_thread_view) {
parallel_jobs.emplace_back(f, g);
}
}
template <typename simd_t>
auto parallel_for(use_simd<simd_t>, auto view, auto func) {
constexpr auto step = simd_t::size();
const auto tile = std::size(view) / step;
const auto consumed = step * tile;
auto simdify = stdv::stride(step) | stdv::take(tile);
parallel_for(view | simdify, std::move(func));
return consumed;
}
//////////////////////////////////////////////////////////// parallel (end)
//////////////////////////////////////////////////////////// misc
constexpr struct {
std::less<> incr;
std::greater<> decr;
} direction;
template <typename dir>
concept directional =
std::is_same_v<dir, decltype(direction.incr)> ||
std::is_same_v<dir, decltype(direction.decr)>;
void merge(auto range, directional auto dir);
void sort(auto range, directional auto dir);
auto cut(auto range) {
auto pivot = std::begin(range) + std::size(range) / 2;
auto lo = stdr::subrange(std::begin(range), pivot);
auto hi = stdr::subrange(pivot, std::end(range));
return std::tuple(lo, hi);
}
template <template<typename...> typename Tuple, typename T>
void perform(Tuple<T&, T&> bitonic_pair, directional auto dir) {
auto &[lhs, rhs] = bitonic_pair;
if(!dir(lhs, rhs)) {
std::swap(lhs, rhs);
}
}
template <typename simd_t, template<typename...> typename Tuple, typename T>
void perform(use_simd<simd_t>, Tuple<T&, T&> bitonic_pair, directional auto dir) {
auto &[lhs, rhs] = bitonic_pair;
simd_t x(std::addressof(lhs), stdx::element_aligned);
simd_t y(std::addressof(rhs), stdx::element_aligned);
// SIMD 库提供了非常方便的 minmax 操作
// 返回 std::pair<> 类型的 [min, max] 向量
if constexpr (dir(0, 1)) std::tie(x, y) = stdx::minmax(x, y);
else std::tie(y, x) = stdx::minmax(x, y);
x.copy_to(std::addressof(lhs), stdx::element_aligned);
y.copy_to(std::addressof(rhs), stdx::element_aligned);
}
//////////////////////////////////////////////////////////// misc (end)
//////////////////////////////////////////////////////////// core
void sort(auto range, directional auto dir) {
if(std::size(range) < 2) return;
auto [lo, hi] = cut(range);
parallel(std::size(range),
[=]{ sort(lo, direction.incr); },
[=]{ sort(hi, direction.decr); });
merge(range, dir);
}
void merge(auto range, directional auto dir) {
if(std::size(range) < 2) return;
using T = stdr::range_value_t<decltype(range)>;
using simd_t = stdx::native_simd<T>;
auto [lo, hi] = cut(range);
auto zip_view = stdv::zip(lo, hi);
// All the comparisons can be done in parallel.
auto consumed = parallel_for(use_simd<simd_t>(), zip_view,
[=](auto tup) { perform(use_simd<simd_t>(), tup, dir); });
stdr::for_each(zip_view | stdv::drop(consumed),
[=](auto tup) { perform(tup, dir); });
parallel(std::size(range),
[=]{ merge(lo, dir); },
[=]{ merge(hi, dir); });
}
} // namespace bitonic
void simd_sort(stdr::random_access_range auto &&range) {
if(std::size(range) < 2) return;
assert(std::has_single_bit(std::size(range)));
bitonic::sort(range | stdv::all, bitonic::direction.incr);
}
//////////////////////////////////////////////////////////// core (end)
constexpr std::size_t MASS = 1 << 26;
std::array<int, MASS> massive;
// 算法无关的函数
void initiate(auto &arr);
auto tick(auto f);
int main() {
initiate(massive);
// bitonic sort 要求容器大小为 2 的幂
static_assert(std::has_single_bit(std::size(massive)));
auto elapsed = tick([&]{ simd_sort(massive); });
// auto elapsed = tick([&] { stdr::sort(massive); });
assert(std::ranges::is_sorted(massive));
std::println("time: {}", elapsed);
}
使用 clang++-18 -O3 -march=skylake -std=c++2b
编译:
- SIMD 排序(bitonic sort)耗时 2815 ms。
- 标准库排序(introspective sort)耗时 4972 ms。
NOTES:
- 性能增益实际来自于双调排序的并行性(可以多线程排序),事实上不开启 SIMD 还能更快(2600 ms)。你也可以在这里获取非 SIMD 版本的并行双调排序算法实现。
- GCC 的表现在这里很丢人:SIMD 版本耗时 3500 ms+。
单词统计
std::string_view frost = "Whose woods these are I think I know.\n"
"His house is in the village though; \n"
"He will not see me stopping here \n"
"To watch his woods fill up with snow.\n";
std::size_t std_nvda_word_count(std::string_view s) {
if(!std::size(s)) return 0;
auto isspace = [](auto c) { return c == ' ' || c == '\n'; };
auto transform = [=](auto l, auto r) { return isspace(l) && !isspace(r); };
return std::transform_reduce(std::begin(s), std::end(s)-1, std::begin(s)+1,
std::size_t(!isspace(s.front())),
std::plus<>(),
transform);
}
上面是 NV 给出来的标准库非 SIMD 实现版本,看起来还挺 NV 的。后面会用来做对比。
1 = space, 0 = letter.
E = End of a word.
E E
11000011000001000... bit
00111100111110111 (~)
00011110011111011 <<1
^ ^ bit & (~bit)<<1
count = 2
个人想到的是这种位运算思路,应该是哪里见过的技巧。不过由于 std::simd_mask
不支持位移(srli/slli
)操作,因此不能高效实现上面的算法。其实从作者的提案实现也可看出这种问题(紧急画饼中)。既然有提案实现这种先行版,那就先强上了。
#include <algorithm>
#include <print>
#include <ranges>
#include <random>
#include <chrono>
#include <cassert>
#include <string>
#include <iostream>
#include <bitset>
// 提案作者给出的先行实现
#include <vir/simd.h>
#include <vir/simd_bit.h>
#include <vir/simd_bitset.h>
// 大部分用法和 TS 版 std::simd 差不多
// 我们这里只用标准库未提供的 std::simd_mask to std::bitset 功能
namespace stdx = vir::stdx;
namespace stdv = std::views;
namespace stdr = std::ranges;
std::string_view frost = "Whose woods these are I think I know.\n"
"His house is in the village though; \n"
"He will not see me stopping here \n"
"To watch his woods fill up with snow.\n";
namespace detail {
auto isspace(auto /* char or simd_chars */ c) {
return c == ' ' || c == '\n';
}
std::size_t std_nvda_word_count(stdr::view auto s) {
if(!std::size(s)) return 0;
return std::transform_reduce(std::begin(s), std::prev(std::end(s)),
std::next(std::begin(s)),
std::size_t(!isspace(s.front())),
std::plus<>(),
[](auto l, auto r) {
return isspace(l) && !isspace(r);
});
}
std::size_t simd_word_count(stdr::view auto s) {
using simd_t = stdx::native_simd<char>;
constexpr auto step = simd_t::size();
const auto tile = std::size(s) / step;
auto simdify = stdv::stride(step) | stdv::take(tile);
std::size_t count = 0;
//////////////////////////////////////////////////
auto simd_view = s | simdify;
auto simd_transform = [&](auto &to_simd) {
simd_t temp (std::addressof(to_simd), stdx::element_aligned);
auto mask = isspace(temp);
// 勉强当作 mask 之间的直接位操作使用
auto bits = vir::to_bitset(mask);
return (bits & (~(bits) << 1)).count();
};
for(auto &&to_simd : simd_view) {
count += simd_transform(to_simd);
}
//////////////////////////////////////////////////
auto x = s | simdify | stdv::drop(1);
auto y = s | stdv::drop(step - 1) | simdify;
auto z = stdv::zip(x, y);
for(auto [curr, prev] : z) {
count += isspace(curr) && !isspace(prev);
}
//////////////////////////////////////////////////
if(auto batch = step * tile; std::size(s) > batch) {
auto left_view = s | stdv::drop(batch);
auto prev_view = s | stdv::drop(batch - 1);
count += std_nvda_word_count(left_view);
count += isspace(left_view[0]) && !isspace(prev_view[0]);
}
return count;
}
} // namespace detail
template <typename T>
concept string_class = std::is_convertible_v<T, std::string_view>;
std::size_t std_nvda_word_count(string_class auto &&str) {
return detail::std_nvda_word_count(str | stdv::all);
}
std::size_t simd_word_count(string_class auto &&str) {
return detail::simd_word_count(str | stdv::all);
}
auto tick(auto f) {
namespace stdc = std::chrono;
auto start = stdc::steady_clock::now();
auto v = f();
auto end = stdc::steady_clock::now();
auto elapsed = stdc::duration_cast<stdc::milliseconds>(end - start);
return std::tuple(v, elapsed);
}
int main() {
std::string text;
for(std::string line; std::getline(std::cin, line);) {
if(!text.empty()) text += ' ';
text += line;
}
auto [count, elapsed] = tick([&] { return simd_word_count(text); });
// auto [count, elapsed] = tick([&] { return std_nvda_word_count(text); });
std::println("count: {}, time: {}", count, elapsed);
}
使用 g++-14 -O3 -march=skylake -std=c++2b
编译,以生成器提供的 1e9 文本测试:
- SIMD 实现耗时 149 ms。
- 标准库实现耗时 776 ms。
NOTE:Clang 在这里的表现特别丢人,SIMD 居然要 2000 ms+。
瑞士甜表
#include <array>
#include <ranges>
#include <tuple>
#include <algorithm>
#include <utility>
#include <cassert>
#include <chrono>
#include <concepts>
#include <type_traits>
#include <random>
#include <print>
#include <vir/simd.h>
#include <vir/simd_bit.h>
#include <vir/simd_bitset.h>
namespace stdx = vir::stdx;;
namespace stdr = std::ranges;
namespace stdv = std::views;
using byte = signed char;
using xbyte = stdx::native_simd<byte>;
// Special flags MUST start with 0b1_______.
constexpr byte empty_flag = 0b10000000;
constexpr byte deleted_flag = 0b11000000;
constexpr auto xwidth = xbyte::size();
template <std::size_t N>
concept swiss_like = N > 0 && N % xwidth == 0;
template <typename K, typename V>
concept simplified_toy = std::is_trivially_copyable_v<K>
&& std::is_trivially_copyable_v<V>;
// An extreme toy implementation of Swiss table.
// This version simplifies almost everything but still utilize the SIMD techniques.
//
// {K, V, N} = Key, Value, Capacity.
template <typename K, typename V, std::size_t N>
requires swiss_like<N> && simplified_toy<K, V>
struct Sweet_table: protected std::tuple<std::hash<K>, std::equal_to<K>> {
using T = std::pair<K, V>;
using H = std::hash<K>;
using E = std::equal_to<K>;
using Base = std::tuple<H, E>;
Sweet_table() { for(auto &cb : ctrls) stdr::fill(cb, empty_flag); }
auto hash(const auto &key) {
return std::get<0>(static_cast<Base&>(*this))(key);
}
auto equal(const auto &key1, const auto &key2) {
return std::get<1>(static_cast<Base&>(*this))(key1, key2);
}
////////////////////////////////////////////////////////////
using g_ctrl_t = std::array<byte, xwidth>;
using g_slot_t = std::array<T, xwidth>;
inline static constexpr
std::size_t group_size() { return N / xwidth; }
struct Group {
auto match(byte h2) {
return vir::to_bitset(temp == h2);
}
auto empty() {
// std::simd 做不到 absl 库的 _mm_movemask_epi8(_mm_sign_epi8(ctrl, ctrl)) 操作
// 提案实现没有提供对应的 intrinsics 指令映射
return vir::to_bitset(temp == empty_flag);
}
auto empty_deleted() {
static_assert(empty_flag <= deleted_flag);
return vir::to_bitset(temp <= deleted_flag);
}
g_ctrl_t &ctrls;
g_slot_t &slots;
xbyte temp{ctrls.data(), stdx::vector_aligned};
};
auto group_view(auto offset) {
return stdv::iota(offset / xwidth)
| stdv::transform([this](auto i) { return i % group_size(); })
| stdv::transform([this](auto i) { return Group{ctrls[i], slots[i]}; })
| stdv::take(group_size());
}
////////////////////////////////////////////////////////////
struct seq {
int operator*() { return std::countr_zero(x); }
seq operator++(auto...) { return {x &= x-1}; }
seq begin() { return {x}; }
seq end() { return {0}; }
auto operator<=>(const seq &) const=default;
unsigned long long x;
};
template <auto X>
auto bitseq(const std::bitset<X> &bits) { return seq{bits.to_ullong()}; }
auto salt() { return std::bit_cast<std::ptrdiff_t>(ctrls.data()) >> 12; }
auto H1(std::size_t hashed) { return hashed ^ salt(); }
auto H2(std::size_t hashed) { return hashed & 0x7f; }
////////////////////////////////////////////////////////////
// Don't blame me, this name was taken from std::map.
bool insert_or_assign(auto &&key, auto &&...args) {
auto hashed = hash(key);
auto h1 = H1(hashed);
auto h2 = H2(hashed);
for(auto group : group_view(h1 % N)) {
for(auto index : bitseq(group.match(h2))) {
auto &&[old_key, old_value] = group.slots[index];
if(equal(old_key, key)) {
old_value = V{std::forward<decltype(args)>(args)...};
return true;
}
}
for(auto index : bitseq(group.empty_deleted())) {
group.ctrls[index] = h2;
group.slots[index] = T(K{std::forward<decltype(key)>(key)},
V{std::forward<decltype(args)>(args)...});
return false;
}
}
// This is a fixed-size table.
// Otherwise you need to resize().
std::unreachable();
}
std::optional<T> find(const auto &key) {
auto hashed = hash(key);
auto h1 = H1(hashed);
auto h2 = H2(hashed);
for(auto group : group_view(h1 % N)) {
for(auto index : bitseq(group.match(h2))) {
auto &&[old_key, _] = group.slots[index];
if(equal(old_key, key)) {
return group.slots.begin()[index];
}
}
if(group.empty().any()) break;
}
return {};
}
// erase() is simple, just find then mark them DELETED.
alignas(64) std::array<g_ctrl_t, group_size()> ctrls;
alignas(64) std::array<g_slot_t, group_size()> slots;
};
// 算法无关的函数
auto initiate(auto &first, auto &next);
auto tick(auto f, auto &map);
constexpr std::size_t MAXN = 1 << 25;
constexpr std::size_t WORKLOAD = MAXN / 2;
auto first_input_data = std::array<int, WORKLOAD>{};
auto next_input_data = std::array<std::tuple<int /* old */, int /* new */>,
std::max(8zu, WORKLOAD >> 12)>{};
auto sweet_table = Sweet_table<int, int, MAXN>{};
auto std_umap = std::unordered_map<int, int>(MAXN);
int main() {
auto check = initiate(first_input_data, next_input_data);
auto f = [](auto &map) {
std::int64_t x = 0;
for(auto i : first_input_data) {
map.insert_or_assign(i, i);
}
for(auto i : first_input_data) {
auto [k, v] = *map.find(i);
x += v;
}
for(auto [o, n] : next_input_data) {
map.insert_or_assign(o, n);
auto [k, v] = *map.find(o);
x += v;
}
return x;
};
auto [x, e] = tick(f, sweet_table);
// auto [x, e] = tick(f, std_umap);
assert(x == check);
std::println("{}", e);
}
之前看 V 站有人做了一个针对小数据的 Swiss table。我也挺感兴趣的,顺手搞一个山寨版(设计偷懒 + 功能残缺,所以叫 Sweet table)。使用方面基本算是故技重施了。其实这里 std::simd
是非常低效的,它做不到一些 signbit 操作,并且 perf 发现 SIMD 和 std::bitset
操作相当的热(不排除我写得差劲),除了跑得比标准库快以外就没啥优势了。
使用 g++-14 -O3 -march=skylake -std=c++2b
编译:
std::unordered_map
实现耗时:3866 ms。sweet_table + std::simd
实现耗时:1725 ms。
TBC
虽说本文只关注 std::simd
的使用,但是目前就连 API 都是处于缺失状态,感觉也做不了什么事情。后续不定期更新。