太长不看
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
[&]<auto I>(this auto &&self) {
if constexpr (I < Last) {
f.template operator()<I>();
self.template operator()<I + 1>();
}
}.template operator()<First>();
};
简单结论:
- 使用 SIMD intrinsic 编程时,编译器 -O3 并不提供充分的 ILP 优化,因此需要显式声明。
- 使用
constexpr_for 即可做到显式 ILP。
- 在编译器 -O3 优化质量较低时,显式 ILP 可以掩盖性能问题。
- 注意 std::ranges 的抽象税。
下面会用 SIMD 求和的示例程序来说明。限于篇幅,本文不会解释 ILP 的基本概念。
SIMD sum
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
// 使用 C++23 的 range_adaptor_closure 进行快速定制适配器
// 从而方便构造 SIMD 的批量视图(非完整块会丢弃)
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&view) const noexcept {
auto size = std::ranges::size(view);
return std::forward<decltype(view)>(view)
| std::views::stride(Lane) // 每 Lane 个元素取一个
| std::views::take(size / Lane); // 总共只取 size / Lane 个元素
}
};
template <auto i>
constexpr simdify_t<i> simdify;
int sum_avx2(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256i) / sizeof(int);
__m256i partial_sum {};
auto simd_view = rng
| simdify<lane>;
for(auto &&simd_v : simd_view) {
auto addr = &simd_v;
auto loadu = _mm256_loadu_si256((__m256i*)addr);
partial_sum = _mm256_add_epi32(partial_sum, loadu);
}
auto scalar_view = rng
| std::views::drop(lane * std::ranges::size(simd_view));
int sum = std::ranges::fold_left(scalar_view, 0, std::plus());
int temp[lane];
// Clang 编译器会优化为寄存器内归约操作
_mm256_storeu_si256((__m256i*)std::ranges::data(temp), partial_sum);
sum = std::ranges::fold_left(temp, sum, std::plus());
return sum;
}
我们看一个 AVX2 版本的 SIMD 求和实现。非常直接,示例程序将输入的整个 range 拆分为 SIMD 视图和 scalar 视图并分别计算,计算完成后合并求和结果。
SIMD+ILP sum
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
// 显式 ILP 的工具
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
[&]<auto I>(this auto &&self) {
if constexpr (I < Last) {
f.template operator()<I>();
self.template operator()<I + 1>();
}
}.template operator()<First>();
};
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&view) const noexcept {
auto size = std::ranges::size(view);
return std::forward<decltype(view)>(view)
| std::views::stride(Lane)
| std::views::take(size / Lane);
}
};
template <auto i>
constexpr simdify_t<i> simdify;
template <size_t ILP = 4>
int sum_avx2_ilp(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256i) / sizeof(int);
constexpr auto bulk = lane * ILP;
__m256i partial_sum[ILP] {};
auto process_simd = [&](auto konstexpr_for, auto simd_view) {
for(auto &&simd_v : simd_view) {
// ILP 展开,最高一次并行处理 lane * ILP * 4 字节的数据
konstexpr_for([&, addr = &simd_v]<auto Index> {
auto &partial = partial_sum[Index];
auto loadu = _mm256_loadu_si256(
(__m256i*)(addr + Index * lane));
partial = _mm256_add_epi32(partial, loadu);
});
}
};
// SIMD+ILP
auto bulk_simd_view = rng
| simdify<lane>
| simdify<ILP>; // 这里就知道定制 simdify 的妙处了
process_simd(constexpr_for<0, ILP>, bulk_simd_view);
// SIMD
auto single_simd_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| simdify<lane>;
process_simd(constexpr_for<0, 1>, single_simd_view);
auto scalar_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| std::views::drop(lane * std::ranges::size(single_simd_view));
int sum = std::ranges::fold_left(scalar_view, 0, std::plus());
constexpr_for<1, ILP>([&]<size_t Index> {
partial_sum[0] = _mm256_add_epi32(partial_sum[0], partial_sum[Index]);
});
int temp[lane];
_mm256_storeu_si256((__m256i*)std::ranges::data(temp), partial_sum[0]);
sum = std::ranges::fold_left(temp, sum, std::plus());
return sum;
}
显式 ILP 的实现如上。在前面例子的基础上扩展,将输入 range 拆分为三个视图:bulk SIMD / SIMD / scalar,同样是只有无法完整覆盖的块才会被下一级的视图处理。
也就是说 ILP 版本的思路是扩展到流水线的宽度,而不是单个向量寄存器的宽度:处理器端口这么多,大于一总比等于一好。
动机和跑分
可是为什么需要特意写 constexpr_for,编译器不会优化吗?
与其理论,不如直接跑分。我们使用 Clang-20 编译器和 Google Benchmark,开启 -O3 优化和 -march=znver3 目标架构(面向我的 Zen 3 主机)。
代码折叠
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <type_traits>
#include <algorithm>
#include <ranges>
#include <array>
#include <random>
// ...此前的代码
// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------
template <size_t Size>
const auto& generate_data() {
static const auto arr = [] {
std::array<int, Size> result;
std::mt19937 gen{std::random_device{}()};
// No overflow.
std::uniform_int_distribution dist(-10, 10);
std::ranges::generate(result, [&] { return dist(gen); });
return result;
} ();
return arr;
}
template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
const auto &rng = generate_data<Size>();
for(auto _ : state) {
auto res = func(rng);
benchmark::DoNotOptimize(res);
}
state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(int));
}
template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
auto name, auto func) {
(benchmark::RegisterBenchmark(
std::string(name) + "/" + std::to_string(Is),
[func](benchmark::State& state) {
BM_run<Is>(state, func);
}
), ...);
}
int main(int argc, char** argv) {
std::integer_sequence<size_t,
35,
350,
3502,
35023,
350234
> seq;
auto register_test = [seq](auto name, auto func) {
register_tests(seq, name, func);
};
register_test("BM_sum_avx2",
[](const auto &r) { return sum_avx2(r); });
register_test("BM_sum_avx2_ilp<4>",
[](const auto &r) { return sum_avx2_ilp<4>(r); });
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
return 0;
}
Run on (16 X 3193.93 MHz CPU s)
CPU Caches:
L1 Data 32 KiB (x8)
L1 Instruction 32 KiB (x8)
L2 Unified 512 KiB (x8)
L3 Unified 16384 KiB (x1)
Load Average: 0.29, 0.20, 0.11
------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
------------------------------------------------------------------------------------
BM_sum_avx2/35 1.00 ns 1.00 ns 690551079 bytes_per_second=130.031Gi/s
BM_sum_avx2/350 53.4 ns 53.4 ns 12520116 bytes_per_second=24.3976Gi/s
BM_sum_avx2/3502 621 ns 621 ns 1098424 bytes_per_second=21.007Gi/s
BM_sum_avx2/35023 6116 ns 6116 ns 109537 bytes_per_second=21.3326Gi/s
BM_sum_avx2/350234 63115 ns 63113 ns 10968 bytes_per_second=20.673Gi/s
BM_sum_avx2_ilp<4>/35 0.978 ns 0.978 ns 697817742 bytes_per_second=133.345Gi/s
BM_sum_avx2_ilp<4>/350 14.2 ns 14.2 ns 48383810 bytes_per_second=91.7483Gi/s
BM_sum_avx2_ilp<4>/3502 155 ns 155 ns 4560351 bytes_per_second=84.2754Gi/s
BM_sum_avx2_ilp<4>/35023 1457 ns 1457 ns 475022 bytes_per_second=89.5557Gi/s
BM_sum_avx2_ilp<4>/350234 17217 ns 17210 ns 41257 bytes_per_second=75.8117Gi/s
总之,我们确实得到了数倍的并行性能提升,也验证了 ILP 策略的有效性。
汇编结果
avx::test_medium(std::array<int, 3502ul>):
// ...
vpaddd (%rsi), %ymm0, %ymm0
// ...
avx_ilp::test_medium(std::array<int, 3502ul>):
// ...
vpaddd (%rcx,%rdx), %ymm1, %ymm1
vpaddd 32(%rcx,%rdx), %ymm2, %ymm2
vpaddd 64(%rcx,%rdx), %ymm0, %ymm0
vpaddd 96(%rcx,%rdx), %ymm3, %ymm3
// ...
我们可以在 godbolt 快速查验编译器的生成质量。一个很明确的结论是:SIMD intrinsic 确实不会在指令并行这方面有优化,至少现在(2025 年)不行。
NOTES:
- intrinsic 有一定程度的优化,但是下面会指出它相对常规指令的优化会怂很多。
- 能不能用
for 替代 constexpr_for?结论是能但是不可靠,Clang 在 -O2 级别就没法优化到位。
- 关于
constexpr_for 的实现方式有一点话题无关的讨论,见下方折叠。
constexpr_for 实现方式
// 一种更直白的写法
template <auto First, auto Last>
constexpr void constexpr_for(auto &&f) {
if constexpr (First < Last) {
f.template operator()<First>();
constexpr_for<First + 1, Last>(f);
}
}
其实 constexpr 可以写得更加直白,但是这样的实现是函数而不是函数对象。
// 如果这么写,结合函数版本的 constexpr_for……
auto process_simd = [&]<size_t Width>(auto simd_view) {
for(auto &&simd_v : simd_view) {
constexpr_for<0, Width>([]<auto> {
// ...
});
}
};
// 这个时候需要
process_simd.template operator()<ILP>(bulk_simd_view);
而又由于 process_simd 是一个 lambda,它的模板需要传入类型信息,如果不想调用用户不友好的 .template operator() 的话需要很曲折的做法,而改为传入一个带类型信息的对象很容易。
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
[&]<auto ...I>(std::index_sequence<I...>) {
(f.template operator()<First + I>(), ...);
}(std::make_index_sequence<Last - First>{});
};
另一个符合要求的做法也可以不使用 C++23 deducing this 特性。这种实现需要额外的头文件(现实中通常不需要)引入 std::index sequence,并且可读性我觉得不太好,怕被喷就没敢在文章用……当然经典的仿函数类就不用说了。
目前的实现缺陷是,要求的编译器版本比较高,比如 GCC 要开到 14 及以上版本。
也就是说,程序中的 __m256i partial_sum 是只能作为一个 ymm 寄存器去使用。而 constexpr_for 显式地以 ILP 的形式去展开,所以 ILP=4 时会明确使用 4 路并行的 ymm 寄存器。
基线差距
// 在 Google benchmark 注册一个新的测试
register_test("BM_sum_std_fold_left",
[](const auto &r) { return std::ranges::fold_left(r, 0, std::plus()); });
我们再以 std::ranges::fold_left 作为基线对比,它的实现非常朴素:libstdc++/ranges_algo.h。
--------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35 0.985 ns 0.985 ns 694303937 bytes_per_second=132.394Gi/s
BM_sum_avx2/350 53.5 ns 53.5 ns 12573037 bytes_per_second=24.3865Gi/s
BM_sum_avx2/3502 612 ns 612 ns 1133219 bytes_per_second=21.3033Gi/s
BM_sum_avx2/35023 6022 ns 6022 ns 114100 bytes_per_second=21.6667Gi/s
BM_sum_avx2/350234 62198 ns 62195 ns 11085 bytes_per_second=20.9778Gi/s
BM_sum_avx2_ilp<4>/35 0.974 ns 0.974 ns 710236295 bytes_per_second=133.852Gi/s
BM_sum_avx2_ilp<4>/350 14.2 ns 14.2 ns 49325710 bytes_per_second=91.594Gi/s
BM_sum_avx2_ilp<4>/3502 142 ns 142 ns 4957734 bytes_per_second=92.1278Gi/s
BM_sum_avx2_ilp<4>/35023 1481 ns 1481 ns 467864 bytes_per_second=88.086Gi/s
BM_sum_avx2_ilp<4>/350234 17261 ns 17256 ns 40704 bytes_per_second=75.6079Gi/s
BM_sum_std_fold_left/35 0.988 ns 0.988 ns 691087391 bytes_per_second=131.98Gi/s
BM_sum_std_fold_left/350 8.63 ns 8.63 ns 78505005 bytes_per_second=151.164Gi/s
BM_sum_std_fold_left/3502 90.9 ns 90.9 ns 7649494 bytes_per_second=143.574Gi/s
BM_sum_std_fold_left/35023 1026 ns 1025 ns 679782 bytes_per_second=127.277Gi/s
BM_sum_std_fold_left/350234 15973 ns 15972 ns 48874 bytes_per_second=81.6897Gi/s
可以看出,fold_left 优化明显更强,我们有必要再看一眼汇编去确认它是怎么做的。
rng::test_medium(std::array<int, 3502ul>):
// ...
.LBB10_1:
vpaddd -992(%rcx,%rax,4), %ymm0, %ymm0
vpaddd -960(%rcx,%rax,4), %ymm1, %ymm1
vpaddd -928(%rcx,%rax,4), %ymm2, %ymm2
vpaddd -896(%rcx,%rax,4), %ymm3, %ymm3
vpaddd -864(%rcx,%rax,4), %ymm0, %ymm0
vpaddd -832(%rcx,%rax,4), %ymm1, %ymm1
vpaddd -800(%rcx,%rax,4), %ymm2, %ymm2
vpaddd -768(%rcx,%rax,4), %ymm3, %ymm3
vpaddd -736(%rcx,%rax,4), %ymm0, %ymm0
vpaddd -704(%rcx,%rax,4), %ymm1, %ymm1
vpaddd -672(%rcx,%rax,4), %ymm2, %ymm2
vpaddd -640(%rcx,%rax,4), %ymm3, %ymm3
vpaddd -608(%rcx,%rax,4), %ymm0, %ymm0
vpaddd -544(%rcx,%rax,4), %ymm2, %ymm4
vpaddd -576(%rcx,%rax,4), %ymm1, %ymm1
vpaddd -512(%rcx,%rax,4), %ymm3, %ymm5
vpaddd -480(%rcx,%rax,4), %ymm0, %ymm2
vpaddd -448(%rcx,%rax,4), %ymm1, %ymm3
vpaddd -416(%rcx,%rax,4), %ymm4, %ymm0
vpaddd -384(%rcx,%rax,4), %ymm5, %ymm1
cmpq $3577, %rax
je .LBB10_3
vpaddd -352(%rcx,%rax,4), %ymm2, %ymm2
vpaddd -320(%rcx,%rax,4), %ymm3, %ymm3
vpaddd -288(%rcx,%rax,4), %ymm0, %ymm0
vpaddd -256(%rcx,%rax,4), %ymm1, %ymm1
vpaddd -224(%rcx,%rax,4), %ymm2, %ymm2
vpaddd -160(%rcx,%rax,4), %ymm0, %ymm4
vpaddd -192(%rcx,%rax,4), %ymm3, %ymm3
vpaddd -128(%rcx,%rax,4), %ymm1, %ymm5
vpaddd -96(%rcx,%rax,4), %ymm2, %ymm0
vpaddd -64(%rcx,%rax,4), %ymm3, %ymm1
vpaddd -32(%rcx,%rax,4), %ymm4, %ymm2
vpaddd (%rcx,%rax,4), %ymm5, %ymm3
addq $256, %rax
jmp .LBB10_1
// ...
其实 fold_left 不仅会并行使用多路寄存器,而且还进一步使用(非常暴力的)循环展开。
NOTES:
- 为什么不用
std::reduce 而用 stdr::fold_left?因为前者的跑分表现非常菜。
- 如果是
std::reduce+std::execution::unseq 的话,表现和 fold_left 类似。
- 这里说的是 Clang 搭配 libstdc++ 的情况;而 libc++ 表现没有劣化,但是仍不强于
fold_left。
目前 libc++ 场景仍需要启用 -fexperimental-library 才能完成测试。
- 目前 libc++ 在 ranges 支持上不够完善,不足以完成所有测试。
进一步优化
现在以 fold_left 为目标改进原有的程序。
auto process_simd = [&](auto konstexpr_for, auto simd_view) {
#pragma clang loop unroll_count(4)
for(auto &&simd_v : simd_view) {
// ...
}
}
首先能想到的是编译器特定的 unroll 选项,结论是对 ranges 起负作用。感兴趣可以自行看下汇编,编译器会生成一堆非常混乱的指令。
// ILP 版本类似,但是问题更加严重
avx::test_medium(std::array<int, 3502ul>):
// ...
.LBB1_1:
vpaddd (%rsi), %ymm0, %ymm0
movq %rax, %rdi
subq %rsi, %rdi
leaq 32(%rsi), %r8
sarq $2, %rdi
addq $-9, %rdi
cmpq $-8, %rdi
cmovaeq %rax, %r8
cmpq %rsi, %rax
cmovneq %r8, %rsi
cmpq %rdx, %rsi
jne .LBB1_1
进一步调查 AVX/ILP 版本的汇编,发现另一个问题是:它总是在计算莫名其妙的边界,一个合理推测是 ranges 本身的编译器优化不到位。也就是说哪怕是 ILP 版本也是有大量无用指令的。一个简单的解决方案是使用最朴素的 for 循环,不交抽象税。
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&r) const noexcept {
auto v = std::forward<decltype(r)>(r) | std::views::all;
auto n = std::ranges::size(v) / Lane;
auto i = std::views::iota(size_t{0}, n);
auto f = [v](auto index) -> decltype(auto) { return v[index * Lane]; };
return std::views::transform(i, f);
}
};
template <auto i>
constexpr simdify_t<i> simdify;
当然个人原因我不打算放弃 ranges,重写一份让编译器能更加优化友好的版本(DoD 风格?)。这样我们程序的核心算法完全不需要修改。
avx::test_medium(std::array<int, 3502ul>):
movl $704, %eax
leaq 8(%rsp), %rcx
vpxor %xmm0, %xmm0, %xmm0
.LBB1_1:
vpaddd -704(%rcx,%rax), %ymm0, %ymm0
vpaddd -672(%rcx,%rax), %ymm0, %ymm0
vpaddd -640(%rcx,%rax), %ymm0, %ymm0
vpaddd -608(%rcx,%rax), %ymm0, %ymm0
vpaddd -576(%rcx,%rax), %ymm0, %ymm0
vpaddd -544(%rcx,%rax), %ymm0, %ymm0
vpaddd -512(%rcx,%rax), %ymm0, %ymm0
vpaddd -480(%rcx,%rax), %ymm0, %ymm0
vpaddd -448(%rcx,%rax), %ymm0, %ymm0
vpaddd -416(%rcx,%rax), %ymm0, %ymm0
vpaddd -384(%rcx,%rax), %ymm0, %ymm0
vpaddd -352(%rcx,%rax), %ymm0, %ymm0
vpaddd -320(%rcx,%rax), %ymm0, %ymm0
vpaddd -288(%rcx,%rax), %ymm0, %ymm0
vpaddd -256(%rcx,%rax), %ymm0, %ymm0
vpaddd -224(%rcx,%rax), %ymm0, %ymm0
vpaddd -192(%rcx,%rax), %ymm0, %ymm0
vpaddd -160(%rcx,%rax), %ymm0, %ymm0
vpaddd -128(%rcx,%rax), %ymm0, %ymm0
vpaddd -96(%rcx,%rax), %ymm0, %ymm0
vpaddd -64(%rcx,%rax), %ymm0, %ymm0
vpaddd -32(%rcx,%rax), %ymm0, %ymm0
vpaddd (%rcx,%rax), %ymm0, %ymm0
addq $736, %rax
cmpq $14688, %rax
jne .LBB1_1
// ...
avx_ilp::test_medium(std::array<int, 3502ul>):
movl $992, %eax
leaq 8(%rsp), %rcx
vpxor %xmm0, %xmm0, %xmm0
vpxor %xmm1, %xmm1, %xmm1
vpxor %xmm2, %xmm2, %xmm2
vpxor %xmm3, %xmm3, %xmm3
.LBB5_1:
vpaddd -992(%rcx,%rax), %ymm2, %ymm2
vpaddd -960(%rcx,%rax), %ymm1, %ymm1
vpaddd -928(%rcx,%rax), %ymm0, %ymm0
vpaddd -896(%rcx,%rax), %ymm3, %ymm3
vpaddd -864(%rcx,%rax), %ymm2, %ymm2
vpaddd -832(%rcx,%rax), %ymm1, %ymm1
vpaddd -800(%rcx,%rax), %ymm0, %ymm0
vpaddd -768(%rcx,%rax), %ymm3, %ymm3
vpaddd -736(%rcx,%rax), %ymm2, %ymm2
vpaddd -704(%rcx,%rax), %ymm1, %ymm1
vpaddd -672(%rcx,%rax), %ymm0, %ymm0
vpaddd -640(%rcx,%rax), %ymm3, %ymm3
vpaddd -608(%rcx,%rax), %ymm2, %ymm2
vpaddd -544(%rcx,%rax), %ymm0, %ymm0
vpaddd -576(%rcx,%rax), %ymm1, %ymm1
vpaddd -512(%rcx,%rax), %ymm3, %ymm4
vpaddd -448(%rcx,%rax), %ymm1, %ymm3
vpaddd -480(%rcx,%rax), %ymm2, %ymm2
vpaddd -416(%rcx,%rax), %ymm0, %ymm0
vpaddd -384(%rcx,%rax), %ymm4, %ymm1
cmpq $14304, %rax
je .LBB5_3
vpaddd -352(%rcx,%rax), %ymm2, %ymm2
vpaddd -320(%rcx,%rax), %ymm3, %ymm3
vpaddd -288(%rcx,%rax), %ymm0, %ymm0
vpaddd -256(%rcx,%rax), %ymm1, %ymm1
vpaddd -224(%rcx,%rax), %ymm2, %ymm2
vpaddd -160(%rcx,%rax), %ymm0, %ymm0
vpaddd -192(%rcx,%rax), %ymm3, %ymm3
vpaddd -128(%rcx,%rax), %ymm1, %ymm4
vpaddd -64(%rcx,%rax), %ymm3, %ymm1
vpaddd -96(%rcx,%rax), %ymm2, %ymm2
vpaddd -32(%rcx,%rax), %ymm0, %ymm0
vpaddd (%rcx,%rax), %ymm4, %ymm3
addq $1024, %rax
jmp .LBB5_1
// ...
修改后可以确认循环展开和多路寄存器都是有的,而且不再有诡异的边界计算。一次性解决了两个问题。简单对比,可以认为 ILP 版本和 fold_left 基本一致,AVX 版本去掉了多路 YMM 并行(只依靠硬件调度)。
--------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35 0.968 ns 0.968 ns 670405544 bytes_per_second=134.668Gi/s
BM_sum_avx2/350 9.82 ns 9.82 ns 68731562 bytes_per_second=132.756Gi/s
BM_sum_avx2/3502 96.4 ns 96.4 ns 7216600 bytes_per_second=135.349Gi/s
BM_sum_avx2/35023 1055 ns 1055 ns 651462 bytes_per_second=123.717Gi/s
BM_sum_avx2/350234 15576 ns 15575 ns 49096 bytes_per_second=83.7717Gi/s
BM_sum_avx2_ilp<4>/35 0.952 ns 0.952 ns 715878822 bytes_per_second=136.893Gi/s
BM_sum_avx2_ilp<4>/350 9.79 ns 9.79 ns 71926717 bytes_per_second=133.126Gi/s
BM_sum_avx2_ilp<4>/3502 85.1 ns 85.1 ns 8083910 bytes_per_second=153.384Gi/s
BM_sum_avx2_ilp<4>/35023 1047 ns 1045 ns 650510 bytes_per_second=124.797Gi/s
BM_sum_avx2_ilp<4>/350234 15939 ns 15938 ns 48235 bytes_per_second=81.863Gi/s
BM_sum_std_fold_left/35 1.05 ns 1.05 ns 663313893 bytes_per_second=124.056Gi/s
BM_sum_std_fold_left/350 8.74 ns 8.74 ns 78781858 bytes_per_second=149.122Gi/s
BM_sum_std_fold_left/3502 91.0 ns 91.0 ns 7675929 bytes_per_second=143.435Gi/s
BM_sum_std_fold_left/35023 1115 ns 1113 ns 630869 bytes_per_second=117.201Gi/s
BM_sum_std_fold_left/350234 16219 ns 16218 ns 45655 bytes_per_second=80.449Gi/s
另外可以看出,此时 AVX 版本已经和 ILP 版本没有性能差异。也即是说,即使只用单个 YMM 寄存器,Zen 3 处理器的硬件调度仍然是非常可靠的,在编译器优化到位的前提下,不需要显式手写 ILP 设计。
浮点 benchmark
--------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35 1.46 ns 1.46 ns 472044277 bytes_per_second=89.2223Gi/s
BM_sum_avx2/350 12.2 ns 12.2 ns 55809571 bytes_per_second=106.701Gi/s
BM_sum_avx2/3502 274 ns 274 ns 2538655 bytes_per_second=47.5458Gi/s
BM_sum_avx2/35023 3090 ns 3089 ns 229860 bytes_per_second=42.2333Gi/s
BM_sum_avx2/350234 31352 ns 31351 ns 21813 bytes_per_second=41.6167Gi/s
BM_sum_avx2_ilp<4>/35 1.38 ns 1.38 ns 492076739 bytes_per_second=94.4973Gi/s
BM_sum_avx2_ilp<4>/350 12.0 ns 12.0 ns 57376116 bytes_per_second=108.563Gi/s
BM_sum_avx2_ilp<4>/3502 89.9 ns 89.9 ns 7725717 bytes_per_second=145.178Gi/s
BM_sum_avx2_ilp<4>/35023 1044 ns 1044 ns 651714 bytes_per_second=125.029Gi/s
BM_sum_avx2_ilp<4>/350234 15670 ns 15670 ns 48038 bytes_per_second=83.2645Gi/s
BM_sum_std_fold_left/35 1.39 ns 1.39 ns 503907127 bytes_per_second=93.8597Gi/s
BM_sum_std_fold_left/350 10.6 ns 10.6 ns 66140767 bytes_per_second=123.357Gi/s
BM_sum_std_fold_left/3502 93.7 ns 93.7 ns 7384452 bytes_per_second=139.258Gi/s
BM_sum_std_fold_left/35023 1053 ns 1053 ns 641127 bytes_per_second=123.88Gi/s
BM_sum_std_fold_left/350234 15627 ns 15627 ns 47696 bytes_per_second=83.4937Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// 需要明确启用 -ffast-math 编译,否则标准库吞吐量是个位数
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
#include <array>
#include <vector>
#include <random>
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
[&]<auto I>(this auto &&self) {
if constexpr (I < Last) {
f.template operator()<I>();
self.template operator()<I + 1>();
}
}.template operator()<First>();
};
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&r) const noexcept {
auto v = std::forward<decltype(r)>(r) | std::views::all;
auto n = std::ranges::size(v) / Lane;
auto i = std::views::iota(size_t{0}, n);
auto f = [v](auto index) -> decltype(auto) { return v[index * Lane]; };
return std::views::transform(i, f);
}
};
template <auto i>
constexpr simdify_t<i> simdify;
float sum_avx2(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256) / sizeof(float);
__m256 partial_sum {};
auto simd_view = rng
| simdify<lane>;
for(auto &&simd_v : simd_view) {
auto addr = &simd_v;
auto loadu = _mm256_loadu_ps(addr);
partial_sum = _mm256_add_ps(partial_sum, loadu);
}
auto scalar_view = rng
| std::views::drop(lane * std::ranges::size(simd_view));
float sum = std::ranges::fold_left(scalar_view, 0.0f, std::plus());
float temp[lane];
_mm256_storeu_ps(std::ranges::data(temp), partial_sum);
sum = std::ranges::fold_left(temp, sum, std::plus());
return sum;
}
template <size_t ILP = 4>
float sum_avx2_ilp(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256) / sizeof(float);
constexpr auto bulk = lane * ILP;
__m256 partial_sum[ILP] {};
auto process_simd = [&](auto konstexpr_for, auto simd_view) {
for(auto &&simd_v : simd_view) {
konstexpr_for([&, addr = &simd_v]<auto Index> {
auto &partial = partial_sum[Index];
auto loadu = _mm256_loadu_ps(addr + Index * lane);
partial = _mm256_add_ps(partial, loadu);
});
}
};
// SIMD+ILP
auto bulk_simd_view = rng
| simdify<lane>
| simdify<ILP>;
process_simd(constexpr_for<0, ILP>, bulk_simd_view);
// SIMD
auto single_simd_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| simdify<lane>;
process_simd(constexpr_for<0, 1>, single_simd_view);
auto scalar_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| std::views::drop(lane * std::ranges::size(single_simd_view));
float sum = std::ranges::fold_left(scalar_view, 0.0f, std::plus());
constexpr_for<1, ILP>([&]<size_t Index> {
partial_sum[0] = _mm256_add_ps(partial_sum[0], partial_sum[Index]);
});
float temp[lane];
_mm256_storeu_ps(std::ranges::data(temp), partial_sum[0]);
sum = std::ranges::fold_left(temp, sum, std::plus());
return sum;
}
// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------
template <size_t Size>
const auto& generate_data() {
static const auto arr = [] {
std::array<float, Size> result;
std::mt19937 gen{std::random_device{}()};
// No overflow.
std::uniform_real_distribution<float> dist(-10, 10);
std::ranges::generate(result, [&] { return dist(gen); });
return result;
} ();
return arr;
}
template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
const auto &rng = generate_data<Size>();
for(auto _ : state) {
auto res = func(rng);
benchmark::DoNotOptimize(res);
}
state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(float));
}
template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
auto name, auto func) {
(benchmark::RegisterBenchmark(
std::string(name) + "/" + std::to_string(Is),
[func](benchmark::State& state) {
BM_run<Is>(state, func);
}
), ...);
}
int main(int argc, char** argv) {
std::integer_sequence<size_t,
35,
350,
3502,
35023,
350234
> seq;
auto register_test = [seq](auto name, auto func) {
register_tests(seq, name, func);
};
register_test("BM_sum_avx2",
[](const auto &r) { return sum_avx2(r); });
register_test("BM_sum_avx2_ilp<4>",
[](const auto &r) { return sum_avx2_ilp<4>(r); });
register_test("BM_sum_std_fold_left",
[](const auto &r) { return std::ranges::fold_left(r, 0.0f, std::plus()); });
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
return 0;
}
更复杂的并行 scan(前缀和)benchmark
---------------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
---------------------------------------------------------------------------------------
BM_scan_avx2/35 7.26 ns 7.26 ns 94313812 bytes_per_second=17.9496Gi/s
BM_scan_avx2/350 118 ns 118 ns 5832606 bytes_per_second=11.0215Gi/s
BM_scan_avx2/3502 1367 ns 1366 ns 515752 bytes_per_second=9.54705Gi/s
BM_scan_avx2/35023 13817 ns 13808 ns 50688 bytes_per_second=9.4486Gi/s
BM_scan_avx2/350234 142191 ns 142092 ns 4985 bytes_per_second=9.18223Gi/s
BM_scan_ilp<4>/35 6.35 ns 6.35 ns 109700105 bytes_per_second=20.5368Gi/s
BM_scan_ilp<4>/350 54.6 ns 54.6 ns 11938873 bytes_per_second=23.8893Gi/s
BM_scan_ilp<4>/3502 528 ns 527 ns 1328081 bytes_per_second=24.77Gi/s
BM_scan_ilp<4>/35023 5243 ns 5239 ns 132626 bytes_per_second=24.9029Gi/s
BM_scan_ilp<4>/350234 53049 ns 53004 ns 13056 bytes_per_second=24.6155Gi/s
BM_simple/35 5.68 ns 5.67 ns 124240372 bytes_per_second=23.0068Gi/s
BM_simple/350 80.4 ns 80.5 ns 8581244 bytes_per_second=16.191Gi/s
BM_simple/3502 857 ns 858 ns 823705 bytes_per_second=15.2046Gi/s
BM_simple/35023 8386 ns 8405 ns 83875 bytes_per_second=15.5227Gi/s
BM_simple/350234 81456 ns 85884 ns 8071 bytes_per_second=15.1916Gi/s
BM_std_partial_sum/35 5.46 ns 5.73 ns 119831285 bytes_per_second=22.7636Gi/s
BM_std_partial_sum/350 79.7 ns 83.4 ns 8124839 bytes_per_second=15.635Gi/s
BM_std_partial_sum/3502 811 ns 845 ns 831782 bytes_per_second=15.4415Gi/s
BM_std_partial_sum/35023 8081 ns 8376 ns 84210 bytes_per_second=15.576Gi/s
BM_std_partial_sum/350234 83362 ns 86160 ns 8116 bytes_per_second=15.1431Gi/s
BM_std_inclusive_scan/35 5.48 ns 5.65 ns 121750793 bytes_per_second=23.0835Gi/s
BM_std_inclusive_scan/350 77.5 ns 79.7 ns 8456195 bytes_per_second=16.3676Gi/s
BM_std_inclusive_scan/3502 830 ns 847 ns 813811 bytes_per_second=15.41Gi/s
BM_std_inclusive_scan/35023 8272 ns 8432 ns 80930 bytes_per_second=15.4735Gi/s
BM_std_inclusive_scan/350234 84073 ns 85544 ns 8200 bytes_per_second=15.2522Gi/s
BM_std_execution/35 5.59 ns 5.67 ns 123962094 bytes_per_second=22.977Gi/s
BM_std_execution/350 80.4 ns 81.5 ns 8459091 bytes_per_second=15.9979Gi/s
BM_std_execution/3502 835 ns 845 ns 818100 bytes_per_second=15.4332Gi/s
BM_std_execution/35023 8279 ns 8351 ns 80787 bytes_per_second=15.624Gi/s
BM_std_execution/350234 84563 ns 85395 ns 7943 bytes_per_second=15.2786Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
/// For test.
#include <array>
#include <random>
#include <tuple>
#include <numeric>
#include <execution>
// 改为返回元数据(size)
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
static_assert(First <= Last);
return [&]<auto I>(this auto &&self) {
if constexpr (I < Last) {
f.template operator()<I>();
return self.template operator()<I + 1>();
} else {
return Last - First;
}
}.template operator()<First>();
};
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&r) const noexcept {
auto v = std::forward<decltype(r)>(r) | std::views::all;
auto n = std::ranges::size(v) / Lane;
auto i = std::views::iota(size_t{0}, n);
auto f = [v](auto index) -> decltype(auto) {
return v[index * Lane];
};
return std::views::transform(i, f);
}
};
template <auto i>
constexpr simdify_t<i> simdify;
template <size_t ILP = 4>
int scan_ilp(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256i) / sizeof(int);
constexpr auto bulk = ILP * lane;
auto inner_scan = [](const __m256i &v1) {
// X2:
// h g f e | d c b a (from v)
// g f e 0 | c b a 0 (from s)
// =>
// ... | c+d,b+c,a+b,a
auto s1 = _mm256_slli_si256(v1, sizeof(int));
auto v2 = _mm256_add_epi32(v1, s1);
// X4:
// ... | c+d,b+c,a+b,a
// ... | a+b,a
// =>
// ... | a+b+c+d,a+b+c,a+b,a
auto s2 = _mm256_slli_si256(v2, 2 * sizeof(int));
auto v3 = _mm256_add_epi32(v2, s2);
// X8:
// Cross lane:
// a+b+c+d,a+b+c,a+b+c+d,a+b+c | a+b+c+d,a+b+c,a+b+c+d,a+b+c
auto p3 = _mm256_permute4x64_epi64(v3, _MM_SHUFFLE(1, 1, 1, 1));
// Broadcast:
// a+b+c+d,a+b+c+d,a+b+c+d,a+b+c+d | a+b+c+d,a+b+c+d,a+b+c+d,a+b+c+d
auto s3 = _mm256_shuffle_epi32(p3, _MM_SHUFFLE(3, 3, 3, 3));
// a+b+c+d+e+f+g+h,a+b+c+d+e+f+g,a+b+c+d+e+f,a+b+c+d+e | XXXX
auto a3 = _mm256_add_epi32(v3, s3);
// What we need.
return _mm256_blend_epi32(v3, a3, 0b11110000);
};
auto get_carry = [](const __m256i &result) {
auto buffer = _mm256_permute4x64_epi64(result, _MM_SHUFFLE(3, 3, 3, 3));
return _mm256_shuffle_epi32(buffer, _MM_SHUFFLE(1, 1, 1, 1));
};
auto sum = _mm256_setzero_si256();
auto process_simd = [&](auto static_for, auto simd_view) {
constexpr auto size = static_for([]<auto>{});
__m256i results[size] {};
__m256i carries[size] {};
for(auto &&simd_v : simd_view) {
static_for([&, addr = &simd_v]<auto Index> {
auto loadu = _mm256_loadu_si256((__m256i*)(addr + Index * lane));
results[Index] = inner_scan(loadu);
});
static_for([&]<auto Index> {
carries[Index] = get_carry(results[Index]);
});
static_for([&, addr = &simd_v]<auto Index> {
auto result = _mm256_add_epi32(results[Index], sum);
_mm256_storeu_si256((__m256i*)(addr + Index * lane), result);
sum = _mm256_add_epi32(sum, carries[Index]);
});
}
};
auto bulk_simd_view = rng
| simdify<lane>
| simdify<ILP>;
process_simd(constexpr_for<0, ILP>, bulk_simd_view);
auto single_simd_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| simdify<lane>;
process_simd(constexpr_for<0, 1>, single_simd_view);
auto scalar_view = rng
| std::views::drop(bulk * std::ranges::size(bulk_simd_view))
| std::views::drop(lane * std::ranges::size(single_simd_view));
std::inclusive_scan(std::ranges::begin(scalar_view),
std::ranges::end(scalar_view),
std::ranges::begin(scalar_view),
std::plus(),
_mm256_cvtsi256_si32(sum));
return 0;
}
int scan_avx2(std::ranges::range auto &&rng) {
constexpr auto lane = sizeof(__m256i) / sizeof(int);
auto inner_scan = [](const __m256i &v1) {
auto s1 = _mm256_slli_si256(v1, sizeof(int));
auto v2 = _mm256_add_epi32(v1, s1);
auto s2 = _mm256_slli_si256(v2, 2 * sizeof(int));
auto v3 = _mm256_add_epi32(v2, s2);
auto p3 = _mm256_permute4x64_epi64(v3, _MM_SHUFFLE(1, 1, 1, 1));
auto s3 = _mm256_shuffle_epi32(p3, _MM_SHUFFLE(3, 3, 3, 3));
auto a3 = _mm256_add_epi32(v3, s3);
return _mm256_blend_epi32(v3, a3, 0b11110000);
};
auto get_carry = [](const __m256i &result) {
auto buffer = _mm256_permute4x64_epi64(result, _MM_SHUFFLE(3, 3, 3, 3));
return _mm256_shuffle_epi32(buffer, _MM_SHUFFLE(1, 1, 1, 1));
};
auto sum = _mm256_setzero_si256();
auto single_simd_view = rng
| simdify<lane>;
for(auto &&simd_v : single_simd_view) {
auto data = (__m256i*)&simd_v;
auto block_v = _mm256_loadu_si256(data);
auto result = inner_scan(block_v);
auto carry = get_carry(result);
result = _mm256_add_epi32(result, sum);
_mm256_storeu_si256(data, result);
sum = _mm256_add_epi32(sum, carry);
}
auto scalar_view = rng
| std::views::drop(lane * std::ranges::size(single_simd_view));
std::inclusive_scan(std::ranges::begin(scalar_view),
std::ranges::end(scalar_view),
std::ranges::begin(scalar_view),
std::plus(),
_mm256_cvtsi256_si32(sum));
return 0;
}
int simple(std::ranges::range auto &&rng) {
int sum = 0;
for(auto &v : rng) {
sum += v;
v = sum;
}
return 0;
}
// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------
template <size_t Size>
const auto& generate_data() {
static const auto arr = [] {
std::array<int, Size> result;
std::mt19937 gen{std::random_device{}()};
// No overflow.
std::uniform_int_distribution dist(-10, 10);
std::ranges::generate(result, [&] { return dist(gen); });
return result;
} ();
return arr;
}
template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
// Copy.
auto rng = generate_data<Size>();
for(auto _ : state) {
auto res = func(rng);
benchmark::DoNotOptimize(res);
benchmark::DoNotOptimize(std::ranges::data(rng));
}
state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(int));
}
template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
auto name, auto func) {
(benchmark::RegisterBenchmark(
std::string(name) + "/" + std::to_string(Is),
[func](benchmark::State& state) {
BM_run<Is>(state, func);
}
), ...);
}
int main(int argc, char** argv) {
std::integer_sequence<size_t,
35,
350,
3502,
35023,
350234
> seq;
auto register_test = [seq](auto name, auto func) {
register_tests(seq, name, func);
};
register_test("BM_scan_avx2",
[](auto &r) { return scan_avx2(r); });
register_test("BM_scan_ilp<1>",
[](auto &r) { return scan_ilp<1>(r); });
register_test("BM_scan_ilp<2>",
[](auto &r) { return scan_ilp<2>(r); });
register_test("BM_scan_ilp<4>",
[](auto &r) { return scan_ilp<4>(r); });
register_test("BM_scan_ilp<6>",
[](auto &r) { return scan_ilp<6>(r); });
register_test("BM_scan_ilp<8>",
[](auto &r) { return scan_ilp<8>(r); });
register_test("BM_simple",
[](auto &r) { return simple(r); });
register_test("BM_std_partial_sum", [](auto &r) {
return std::partial_sum(std::ranges::begin(r),
std::ranges::end(r),
std::ranges::begin(r));
});
register_test("BM_std_inclusive_scan", [](auto &r) {
return std::inclusive_scan(std::ranges::begin(r),
std::ranges::end(r),
std::ranges::begin(r),
std::plus(), 0);
});
register_test("BM_std_execution", [](auto &r) {
return std::inclusive_scan(std::execution::unseq,
std::ranges::begin(r),
std::ranges::end(r),
std::ranges::begin(r),
std::plus(), 0);
});
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
return 0;
}
过于用力的 LUT(ASCII 查找表)benchmark
----------------------------------------------------------------------------------
Benchmark Time CPU Iterations UserCounters...
----------------------------------------------------------------------------------
BM_lookup_avx2/35 4.46 ns 4.26 ns 163460034 bytes_per_second=7.65192Gi/s
BM_lookup_avx2/350 56.3 ns 53.8 ns 12806205 bytes_per_second=6.05853Gi/s
BM_lookup_avx2/3502 419 ns 401 ns 1767725 bytes_per_second=8.13886Gi/s
BM_lookup_avx2/35023 4299 ns 4110 ns 170806 bytes_per_second=7.93551Gi/s
BM_lookup_avx2/350234 45226 ns 43236 ns 16284 bytes_per_second=7.54423Gi/s
BM_lookup_ilp<4>/35 4.44 ns 4.24 ns 165368195 bytes_per_second=7.68365Gi/s
BM_lookup_ilp<4>/350 51.0 ns 48.8 ns 14175979 bytes_per_second=6.68431Gi/s
BM_lookup_ilp<4>/3502 517 ns 494 ns 1373566 bytes_per_second=6.60049Gi/s
BM_lookup_ilp<4>/35023 4916 ns 4696 ns 146758 bytes_per_second=6.94512Gi/s
BM_lookup_ilp<4>/350234 52513 ns 50197 ns 13764 bytes_per_second=6.49798Gi/s
BM_lookup_scalar/35 17.0 ns 16.3 ns 41531748 bytes_per_second=2.00183Gi/s
BM_lookup_scalar/350 86.4 ns 82.6 ns 8464803 bytes_per_second=3.94739Gi/s
BM_lookup_scalar/3502 783 ns 749 ns 942863 bytes_per_second=4.35576Gi/s
BM_lookup_scalar/35023 8002 ns 7643 ns 92445 bytes_per_second=4.26769Gi/s
BM_lookup_scalar/350234 80248 ns 76722 ns 8937 bytes_per_second=4.25149Gi/s
BM_std_transform/35 17.0 ns 16.2 ns 42159156 bytes_per_second=2.00726Gi/s
BM_std_transform/350 87.9 ns 84.0 ns 8537585 bytes_per_second=3.87842Gi/s
BM_std_transform/3502 779 ns 745 ns 938266 bytes_per_second=4.38055Gi/s
BM_std_transform/35023 8372 ns 7972 ns 91165 bytes_per_second=4.09155Gi/s
BM_std_transform/350234 81403 ns 77818 ns 8972 bytes_per_second=4.1916Gi/s
BM_std_execution/35 17.0 ns 16.3 ns 42631765 bytes_per_second=2.00524Gi/s
BM_std_execution/350 86.4 ns 82.6 ns 8399217 bytes_per_second=3.94398Gi/s
BM_std_execution/3502 783 ns 748 ns 940645 bytes_per_second=4.35812Gi/s
BM_std_execution/35023 7972 ns 7619 ns 90280 bytes_per_second=4.28105Gi/s
BM_std_execution/350234 81110 ns 77531 ns 9081 bytes_per_second=4.20712Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
#include <cassert>
/// For test.
#include <array>
#include <iostream>
#include <vector>
#include <random>
#include <chrono>
#include <tuple>
#include <execution>
namespace stdr = std::ranges;
namespace stdv = std::views;
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
[&]<auto I>(this auto &&self) {
if constexpr (I < Last) {
f.template operator()<I>();
self.template operator()<I + 1>();
}
}.template operator()<First>();
};
template <size_t Lane>
struct simdify_t: stdr::range_adaptor_closure<simdify_t<Lane>> {
constexpr auto operator()(auto &&r) const noexcept {
auto v = std::forward<decltype(r)>(r) | stdv::all;
auto n = stdr::size(v) / Lane;
auto i = stdv::iota(size_t{0}, n);
auto f = [v](auto index) -> decltype(auto) {
return v[index * Lane];
};
return stdv::transform(i, f);
}
};
template <auto i>
constexpr simdify_t<i> simdify;
template <size_t ILP = 4>
void lookup_ilp(stdr::range auto &&source, stdr::range auto &&lookup_table) {
assert(stdr::size(lookup_table) >= 256 && "We need a char-width table.");
constexpr auto table_size = 256;
constexpr auto lane = sizeof(__m256i);
constexpr auto bulk = ILP * lane;
__m256i luts[table_size / sizeof(__m128i)];
constexpr_for<0, stdr::size(luts)>([&, data = stdr::data(lookup_table)]<auto Index> {
auto buffer128 = _mm_loadu_si128((__m128i*)(data) + Index);
luts[Index] = _mm256_broadcastsi128_si256(buffer128);
});
const auto simd_zero = _mm256_setzero_si256();
auto reduce = [&](__m256i *simd_addr) {
// Reused in each reduction.
// The first round of reduction uses size(luts)/2.
// The second round uses size(luts)/4, size(luts)/8, ...
__m256i arenas[stdr::size(luts) / 2];
auto full = _mm256_loadu_si256(simd_addr);
auto nibble = _mm256_and_si256(full, _mm256_set1_epi8((char)0x0f));
// Start from bit 4 to bit 7.
constexpr auto bias = 4;
constexpr auto for_each_bit = constexpr_for<4, 8>;
for_each_bit([&]<auto Bit> {
constexpr auto bit_shift = static_cast<char>(1 << Bit);
auto test1 = _mm256_and_si256(full, _mm256_set1_epi8(bit_shift));
auto blend_mask = _mm256_cmpeq_epi8(test1, simd_zero);
// Bit 4 as the first round.
constexpr auto round = Bit - bias;
constexpr auto tree_depth = stdr::size(arenas) >> round;
constexpr auto tree_reduce = constexpr_for<0, tree_depth>;
tree_reduce([&]<auto Level> {
auto emit = [&]<auto i> /* -> decltype(auto) */ {
if constexpr (round > 0) return arenas[i];
else return _mm256_shuffle_epi8(luts[i], nibble);
};
auto &&lo = emit.template operator()<Level * 2>();
auto &&hi = emit.template operator()<Level * 2 + 1>();
arenas[Level] = _mm256_blendv_epi8(hi, lo, blend_mask);
});
});
_mm256_storeu_si256(simd_addr, arenas[0]);
};
auto process_simd = [&](auto konstexpr_for, auto simd_view) {
for(auto &&simd_v : simd_view) {
konstexpr_for([&, addr = &simd_v]<auto Index> {
reduce((__m256i*)(addr) + Index);
});
}
};
auto bulk_simd_view = source | simdify<lane> | simdify<ILP>;
process_simd(constexpr_for<0, ILP>, bulk_simd_view);
auto single_simd_view = source
| stdv::drop(bulk * stdr::size(bulk_simd_view))
| simdify<lane>;
process_simd(constexpr_for<0, 1>, single_simd_view);
auto scalar_view = source
| stdv::drop(bulk * stdr::size(bulk_simd_view))
| stdv::drop(lane * stdr::size(single_simd_view));
for(auto &v : scalar_view) {
v = lookup_table[v];
}
}
void lookup_avx2(stdr::range auto &&source, stdr::range auto &&lookup_table) {
assert(stdr::size(lookup_table) >= 256 && "We need a char-width table.");
constexpr auto table_size = 256;
constexpr auto lane = sizeof(__m256i);
__m256i luts[table_size / sizeof(__m128i)];
constexpr_for<0, stdr::size(luts)>([&, data = stdr::data(lookup_table)]<auto Index> {
auto buffer128 = _mm_loadu_si128((__m128i*)(data) + Index);
luts[Index] = _mm256_broadcastsi128_si256(buffer128);
});
// Reused in each reduction.
// The first round of reduction uses size(luts)/2.
// The second round uses size(luts)/4, size(luts)/8, ...
__m256i arenas[stdr::size(luts) / 2];
auto simd_zero = _mm256_setzero_si256();
auto simd_view = source | simdify<lane>;
for(auto &&simd_v : simd_view) {
auto addr = &simd_v;
auto full = _mm256_loadu_si256((__m256i*)addr);
auto nibble = _mm256_and_si256(full, _mm256_set1_epi8((char)0x0f));
// Start from bit 4 to bit 7.
constexpr auto bias = 4;
constexpr auto for_each_bit = constexpr_for<4, 8>;
for_each_bit([&]<auto Bit> {
constexpr auto bit_shift = static_cast<char>(1 << Bit);
auto test1 = _mm256_and_si256(full, _mm256_set1_epi8(bit_shift));
auto blend_mask = _mm256_cmpeq_epi8(test1, simd_zero);
// Bit 4 as the first round.
constexpr auto round = Bit - bias;
constexpr auto tree_depth = stdr::size(arenas) >> round;
constexpr auto tree_reduce = constexpr_for<0, tree_depth>;
tree_reduce([&]<auto Level> {
auto emit = [&]<auto i> /* -> decltype(auto) */ {
if constexpr (round > 0) return arenas[i];
else return _mm256_shuffle_epi8(luts[i], nibble);
};
auto &&lo = emit.template operator()<Level * 2>();
auto &&hi = emit.template operator()<Level * 2 + 1>();
arenas[Level] = _mm256_blendv_epi8(hi, lo, blend_mask);
});
});
_mm256_storeu_si256((__m256i*)addr, arenas[0]);
}
auto scalar_view = source
| stdv::drop(lane * stdr::size(simd_view));
for(auto &v : scalar_view) {
v = lookup_table[v];
}
}
void lookup_scalar(auto &&rng, auto &&lut) {
for(auto &v : rng) {
v = lut[v];
}
}
void lookup_std_transform(auto &&rng, auto &&lut) {
std::transform(stdr::begin(rng), stdr::end(rng), stdr::begin(rng),
[&](auto idx) { return lut[idx]; });
}
void lookup_std_execution(auto &&rng, auto &&lut) {
std::transform(std::execution::unseq,
stdr::begin(rng), stdr::end(rng), stdr::begin(rng),
[&](auto idx) { return lut[idx]; });
}
// ----------------------------------------------------------------------------
// Google Benchmark Infrastructure
// ----------------------------------------------------------------------------
// Generate constant random LUT (256 bytes)
const auto& get_lut() {
static const auto arr = [] {
std::array<uint8_t, 256> result;
std::mt19937 gen{std::random_device{}()};
std::uniform_int_distribution<int> dist(0, 255);
stdr::generate(result, [&] { return static_cast<uint8_t>(dist(gen)); });
return result;
}();
return arr;
}
// Generate random source data
template <size_t Size>
const auto& generate_data() {
static const auto arr = [] {
std::array<uint8_t, Size> result;
std::mt19937 gen{std::random_device{}()};
std::uniform_int_distribution<int> dist(0, 255);
stdr::generate(result, [&] { return static_cast<uint8_t>(dist(gen)); });
return result;
}();
return arr;
}
template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
// 1. Get static const data (no cost here)
const auto& source_template = generate_data<Size>();
const auto& lut = get_lut();
for(auto _ : state) {
// 2. Make a mutable copy on the stack
// Note: This copy overhead is included in the benchmark time.
// Since lookup is O(N) and copy is O(N), this dilutes the speedup ratio
// but keeps the code structure simple as requested.
auto rng = source_template;
// 3. Run algo
func(rng, lut);
// 4. Prevent optimization
benchmark::DoNotOptimize(rng);
}
state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(uint8_t));
}
template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
auto name, auto func) {
(benchmark::RegisterBenchmark(
std::string(name) + "/" + std::to_string(Is),
[func](benchmark::State& state) {
BM_run<Is>(state, func);
}
), ...);
}
int main(int argc, char** argv) {
std::integer_sequence<size_t,
35,
350,
3502,
35023,
350234
> seq;
auto register_test = [seq](auto name, auto func) {
register_tests(seq, name, func);
};
register_test("BM_lookup_avx2",
[](auto &r, auto &lut) { lookup_avx2(r, lut); });
register_test("BM_lookup_ilp<4>",
[](auto &r, auto &lut) { lookup_ilp<4>(r, lut); });
register_test("BM_lookup_scalar",
[](auto &r, auto &lut) { lookup_scalar(r, lut); });
register_test("BM_std_transform",
[](auto &r, auto &lut) { lookup_std_transform(r, lut); });
register_test("BM_std_execution",
[](auto &r, auto &lut) { lookup_std_execution(r, lut); });
benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
return 0;
}
NOTE: 如果 benchmark 使用延迟更高的指令,显式 ILP 仍然有性能提升。详见上方数据。
总结
基本上没别的事情了,因为向量求和算法很固定,所以自动向量化本来挺好的。
但是如果考虑手写/混用 SIMD intrinsic 的话,开发者还是要有手段去控制编译器的优化行为。constexpr_for 是一个极好的控制 ILP 的工具,它是现代且可移植的写法。
处理器的硬件调度能力也是个考虑点,Zen 3 的表现就是优化(暴力展开)到位了,显式 ILP 无所谓。但是前面在你不知道性能瓶颈的时候,简单的显式 ILP 设计就能把性能提升数倍,还不需要做剖析(本文有基线,省了 perf 环节)。考虑到其他更加复杂的算法和延迟更高的向量指令,显式 ILP 的做法仍然是非常的甜点。
std::ranges 的组合玩法很好,但是不看汇编的话你不知道有什么坑。本文中的 ILP 程序性能瓶颈就是 ranges 导致的。