TL;DR

template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    [&]<auto I>(this auto &&self) {
        if constexpr (I < Last) {
            f.template operator()<I>();
            self.template operator()<I + 1>();
        }
    }.template operator()<First>();
};

Key takeaways:

  • When programming with SIMD intrinsics, the compiler's -O3 does not deliver sufficient ILP optimization on its own, so you have to express it explicitly.
  • constexpr_for is all it takes to get explicit ILP.
  • When the compiler's -O3 output is poor, explicit ILP can also mask the performance problems.
  • Mind the abstraction tax of std::ranges.

The rest of this post illustrates these points with a SIMD summation example. To keep it short, it will not cover the basics of ILP.

SIMD sum

#include <x86intrin.h>
#include <algorithm>
#include <ranges>

// Use C++23 range_adaptor_closure to whip up a custom adaptor,
// which makes it easy to build a bulk SIMD view (incomplete trailing blocks are dropped)
template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&view) const noexcept {
        auto size = std::ranges::size(view);
        return std::forward<decltype(view)>(view)
            | std::views::stride(Lane) // take one element every Lane elements
            | std::views::take(size / Lane); // keep only size / Lane elements in total
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

int sum_avx2(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256i) / sizeof(int);
    __m256i partial_sum {};

    auto simd_view = rng
                   | simdify<lane>;
    for(auto &&simd_v : simd_view) {
        auto addr = &simd_v;
        auto loadu = _mm256_loadu_si256((__m256i*)addr);
        partial_sum = _mm256_add_epi32(partial_sum, loadu);
    }

    auto scalar_view = rng
                     | std::views::drop(lane * std::ranges::size(simd_view));
    int sum = std::ranges::fold_left(scalar_view, 0, std::plus());

    int temp[lane];
    // Clang optimizes this store into an in-register reduction
    _mm256_storeu_si256((__m256i*)std::ranges::data(temp), partial_sum);
    sum = std::ranges::fold_left(temp, sum, std::plus());

    return sum;
}

Above is an AVX2 SIMD summation. It is very direct: the program splits the whole input range into a SIMD view and a scalar view, computes each separately, and merges the partial results at the end.
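
As a quick sanity check of the view arithmetic (a hypothetical usage sketch, assuming the definitions above): with 35 ints and lane = 8, simdify yields 4 complete SIMD blocks, and the drop view leaves 3 tail elements to the scalar pass.

#include <array>
#include <numeric>

int main() {
    std::array<int, 35> data;
    std::iota(data.begin(), data.end(), 1); // 1, 2, ..., 35
    // 1 + 2 + ... + 35 == 630; 32 ints go through AVX2, 3 through the scalar tail.
    return sum_avx2(data) == 630 ? 0 : 1;
}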

SIMD+ILP sum

#include <x86intrin.h>
#include <algorithm>
#include <ranges>

// The tool for explicit ILP
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    [&]<auto I>(this auto &&self) {
        if constexpr (I < Last) {
            f.template operator()<I>();
            self.template operator()<I + 1>();
        }
    }.template operator()<First>();
};

template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&view) const noexcept {
        auto size = std::ranges::size(view);
        return std::forward<decltype(view)>(view)
            | std::views::stride(Lane)
            | std::views::take(size / Lane);
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

template <size_t ILP = 4>
int sum_avx2_ilp(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256i) / sizeof(int);
    constexpr auto bulk = lane * ILP;
    __m256i partial_sum[ILP] {};
    auto process_simd = [&](auto konstexpr_for, auto simd_view) {
        for(auto &&simd_v : simd_view) {
            // ILP unrolling: up to lane * ILP * 4 bytes of data processed in parallel at a time
            konstexpr_for([&, addr = &simd_v]<auto Index> {
                auto &partial = partial_sum[Index];
                auto loadu = _mm256_loadu_si256(
                    (__m256i*)(addr + Index * lane));
                partial = _mm256_add_epi32(partial, loadu);
            });
        }
    };

    // SIMD+ILP
    auto bulk_simd_view = rng
                        | simdify<lane>
                        | simdify<ILP>; // this is where the custom simdify pays off
    process_simd(constexpr_for<0, ILP>, bulk_simd_view);

    // SIMD
    auto single_simd_view = rng
                          | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                          | simdify<lane>;
    process_simd(constexpr_for<0, 1>, single_simd_view);

    auto scalar_view = rng
                     | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                     | std::views::drop(lane * std::ranges::size(single_simd_view));
    int sum = std::ranges::fold_left(scalar_view, 0, std::plus());

    constexpr_for<1, ILP>([&]<size_t Index> {
        partial_sum[0] = _mm256_add_epi32(partial_sum[0], partial_sum[Index]);
    });
    int temp[lane];
    _mm256_storeu_si256((__m256i*)std::ranges::data(temp), partial_sum[0]);
    sum = std::ranges::fold_left(temp, sum, std::plus());

    return sum;
}

The explicit ILP implementation is above. It extends the earlier example by splitting the input range into three views: bulk SIMD / SIMD / scalar, and as before, only the blocks that cannot be fully covered fall through to the next level.

In other words, the ILP version scales out to the width of the pipeline rather than the width of a single vector register: with that many execution ports, more than one beats exactly one.
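
If the SIMD machinery obscures the idea, here is the same trick in scalar form (purely illustrative; for plain int sums, compilers often do this transformation on their own):

// One dependency chain: every add must wait for the previous one.
int sum_one_chain(const int *a, size_t n) {
    int acc = 0;
    for(size_t i = 0; i < n; ++i) acc += a[i];
    return acc;
}

// Four independent chains: up to four adds can be in flight at once.
int sum_four_chains(const int *a, size_t n) {
    int acc0 = 0, acc1 = 0, acc2 = 0, acc3 = 0;
    size_t i = 0;
    for(; i + 4 <= n; i += 4) {
        acc0 += a[i + 0];
        acc1 += a[i + 1];
        acc2 += a[i + 2];
        acc3 += a[i + 3];
    }
    int acc = acc0 + acc1 + acc2 + acc3;
    for(; i < n; ++i) acc += a[i];
    return acc;
}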

Motivation and benchmarks

But why go out of our way to write constexpr_for? Won't the compiler optimize this for us?

Rather than argue theory, let's just benchmark. We use the Clang-20 compiler and Google Benchmark, with -O3 and -march=znver3 (targeting my Zen 3 machine).

Full benchmark code
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <type_traits>
#include <algorithm>
#include <ranges>
#include <array>
#include <random>

// ... the code from earlier

// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------

template <size_t Size>
const auto& generate_data() {
    static const auto arr = [] {
        std::array<int, Size> result;
        std::mt19937 gen{std::random_device{}()};
        // No overflow.
        std::uniform_int_distribution dist(-10, 10);
        std::ranges::generate(result, [&] { return dist(gen); });
        return result;
    } ();
    return arr;
}

template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
    const auto &rng = generate_data<Size>();

    for(auto _ : state) {
        auto res = func(rng);
        benchmark::DoNotOptimize(res);
    }

    state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(int));
}

template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
                    auto name, auto func) {
    (benchmark::RegisterBenchmark(
        std::string(name) + "/" + std::to_string(Is),
        [func](benchmark::State& state) {
            BM_run<Is>(state, func);
        }
    ), ...);
}

int main(int argc, char** argv) {
    std::integer_sequence<size_t,
        35,
        350,
        3502,
        35023,
        350234
    > seq;

    auto register_test = [seq](auto name, auto func) {
        register_tests(seq, name, func);
    };

    register_test("BM_sum_avx2",
        [](const auto &r) { return sum_avx2(r); });

    register_test("BM_sum_avx2_ilp<4>",
        [](const auto &r) { return sum_avx2_ilp<4>(r); });

    benchmark::Initialize(&argc, argv);
    benchmark::RunSpecifiedBenchmarks();
    benchmark::Shutdown();
    return 0;
}
Run on (16 X 3193.93 MHz CPU s)
CPU Caches:
  L1 Data 32 KiB (x8)
  L1 Instruction 32 KiB (x8)
  L2 Unified 512 KiB (x8)
  L3 Unified 16384 KiB (x1)
Load Average: 0.29, 0.20, 0.11
------------------------------------------------------------------------------------
Benchmark                          Time             CPU   Iterations UserCounters...
------------------------------------------------------------------------------------
BM_sum_avx2/35                  1.00 ns         1.00 ns    690551079 bytes_per_second=130.031Gi/s
BM_sum_avx2/350                 53.4 ns         53.4 ns     12520116 bytes_per_second=24.3976Gi/s
BM_sum_avx2/3502                 621 ns          621 ns      1098424 bytes_per_second=21.007Gi/s
BM_sum_avx2/35023               6116 ns         6116 ns       109537 bytes_per_second=21.3326Gi/s
BM_sum_avx2/350234             63115 ns        63113 ns        10968 bytes_per_second=20.673Gi/s
BM_sum_avx2_ilp<4>/35          0.978 ns        0.978 ns    697817742 bytes_per_second=133.345Gi/s
BM_sum_avx2_ilp<4>/350          14.2 ns         14.2 ns     48383810 bytes_per_second=91.7483Gi/s
BM_sum_avx2_ilp<4>/3502          155 ns          155 ns      4560351 bytes_per_second=84.2754Gi/s
BM_sum_avx2_ilp<4>/35023        1457 ns         1457 ns       475022 bytes_per_second=89.5557Gi/s
BM_sum_avx2_ilp<4>/350234      17217 ns        17210 ns        41257 bytes_per_second=75.8117Gi/s

In short, we really do get a severalfold parallel speedup, which validates the ILP strategy.

Assembly

avx::test_medium(std::array<int, 3502ul>):
        // ...
        vpaddd  (%rsi), %ymm0, %ymm0
        // ...

avx_ilp::test_medium(std::array<int, 3502ul>):
        // ...
        vpaddd  (%rcx,%rdx), %ymm1, %ymm1
        vpaddd  32(%rcx,%rdx), %ymm2, %ymm2
        vpaddd  64(%rcx,%rdx), %ymm0, %ymm0
        vpaddd  96(%rcx,%rdx), %ymm3, %ymm3
        // ...

We can quickly inspect the quality of the generated code on godbolt. One conclusion is unambiguous: SIMD intrinsics do not get optimized for instruction-level parallelism, at least not today (2025).

NOTES:

  • Intrinsics do get some optimization, but as shown below, the compiler is far more timid with them than with regular instructions.
  • Can a plain for loop replace constexpr_for? It can, but not reliably: Clang already fails to optimize it fully at -O2. A sketch of that variant follows these notes.
  • There is a brief off-topic discussion of how to implement constexpr_for; see the folded section below.
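
For reference, the runtime-for variant mentioned above would replace the konstexpr_for call in the hot loop with roughly this (a sketch; per the note above, it happens to work with Clang at -O3 but is not a reliable contract):

// A runtime index: the unrolling into independent chains now hinges entirely
// on the compiler proving this loop can be fully unrolled.
for(size_t index = 0; index < ILP; ++index) {
    auto &partial = partial_sum[index];
    auto loadu = _mm256_loadu_si256((__m256i*)(addr + index * lane));
    partial = _mm256_add_epi32(partial, loadu);
}
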
How to implement constexpr_for
// A more straightforward way to write it
template <auto First, auto Last>
constexpr void constexpr_for(auto &&f) {
    if constexpr (First < Last) {
        f.template operator()<First>();
        constexpr_for<First + 1, Last>(f);
    }
}

constexpr_for can in fact be written much more plainly, but then it is a function rather than a function object:

// If we write it that way, combined with the function version of constexpr_for...
auto process_simd = [&]<size_t Width>(auto simd_view) {
    for(auto &&simd_v : simd_view) {
        constexpr_for<0, Width>([]<auto> {
            // ...
        });
    }
};

// ...the call site then needs
process_simd.template operator()<ILP>(bulk_simd_view);

And because process_simd is a lambda, its template parameter has to receive the type information somehow. Avoiding the user-hostile .template operator() call takes real contortions, whereas switching to passing in an object that carries the type information is easy.

template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    [&]<auto ...I>(std::index_sequence<I...>) {
        (f.template operator()<First + I>(), ...);
    }(std::make_index_sequence<Last - First>{});
};

Another approach that fits the bill avoids the C++23 deducing-this feature altogether. That implementation pulls in an extra header (rarely needed in practice) for std::index_sequence, and I find it less readable, so I didn't dare use it in the article for fear of getting flamed... and the classic hand-written functor class goes without saying.

The drawback of the current implementation is that it demands a fairly recent compiler, e.g. GCC 14 or later.
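
Whichever implementation you pick, the call sites look the same. A minimal smoke test (hypothetical):

#include <cstdio>

int main() {
    // Prints "0 1 2": the lambda body is instantiated once per index.
    constexpr_for<0, 3>([]<auto I> {
        std::printf("%d ", static_cast<int>(I));
    });
}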

Back to the main thread: the single __m256i partial_sum in the first program can only ever occupy one ymm register, whereas constexpr_for unrolls explicitly in ILP fashion, so with ILP=4 the code demonstrably uses four ymm registers in parallel.
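
Concretely, with ILP = 4 the konstexpr_for call in the hot loop behaves as if the body had been written out by hand (an illustrative expansion, not actual compiler output):

// Four independent dependency chains; the out-of-order core
// can keep all four vpaddd's in flight at once.
partial_sum[0] = _mm256_add_epi32(partial_sum[0],
    _mm256_loadu_si256((__m256i*)(addr + 0 * lane)));
partial_sum[1] = _mm256_add_epi32(partial_sum[1],
    _mm256_loadu_si256((__m256i*)(addr + 1 * lane)));
partial_sum[2] = _mm256_add_epi32(partial_sum[2],
    _mm256_loadu_si256((__m256i*)(addr + 2 * lane)));
partial_sum[3] = _mm256_add_epi32(partial_sum[3],
    _mm256_loadu_si256((__m256i*)(addr + 3 * lane)));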

Gap to the baseline

// Register a new test with Google Benchmark
register_test("BM_sum_std_fold_left",
    [](const auto &r) { return std::ranges::fold_left(r, 0, std::plus()); });

Let's also bring in std::ranges::fold_left as a baseline. Its implementation is utterly plain: libstdc++/ranges_algo.h
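
"Plain" meaning the library adds no tricks of its own; it boils down to roughly this (a simplified paraphrase of the linked source, not its literal code):

// One accumulator, one sequential loop. Any vectorization and unrolling
// seen below comes from the compiler, not from the library.
auto fold_left_sketch(auto first, auto last, auto accum, auto op) {
    for(; first != last; ++first)
        accum = op(std::move(accum), *first);
    return accum;
}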

--------------------------------------------------------------------------------------
Benchmark                            Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35                   0.985 ns        0.985 ns    694303937 bytes_per_second=132.394Gi/s
BM_sum_avx2/350                   53.5 ns         53.5 ns     12573037 bytes_per_second=24.3865Gi/s
BM_sum_avx2/3502                   612 ns          612 ns      1133219 bytes_per_second=21.3033Gi/s
BM_sum_avx2/35023                 6022 ns         6022 ns       114100 bytes_per_second=21.6667Gi/s
BM_sum_avx2/350234               62198 ns        62195 ns        11085 bytes_per_second=20.9778Gi/s
BM_sum_avx2_ilp<4>/35            0.974 ns        0.974 ns    710236295 bytes_per_second=133.852Gi/s
BM_sum_avx2_ilp<4>/350            14.2 ns         14.2 ns     49325710 bytes_per_second=91.594Gi/s
BM_sum_avx2_ilp<4>/3502            142 ns          142 ns      4957734 bytes_per_second=92.1278Gi/s
BM_sum_avx2_ilp<4>/35023          1481 ns         1481 ns       467864 bytes_per_second=88.086Gi/s
BM_sum_avx2_ilp<4>/350234        17261 ns        17256 ns        40704 bytes_per_second=75.6079Gi/s
BM_sum_std_fold_left/35          0.988 ns        0.988 ns    691087391 bytes_per_second=131.98Gi/s
BM_sum_std_fold_left/350          8.63 ns         8.63 ns     78505005 bytes_per_second=151.164Gi/s
BM_sum_std_fold_left/3502         90.9 ns         90.9 ns      7649494 bytes_per_second=143.574Gi/s
BM_sum_std_fold_left/35023        1026 ns         1025 ns       679782 bytes_per_second=127.277Gi/s
BM_sum_std_fold_left/350234      15973 ns        15972 ns        48874 bytes_per_second=81.6897Gi/s

Clearly, fold_left is optimized far harder. It's worth another look at the assembly to see how:

rng::test_medium(std::array<int, 3502ul>):
        // ...
.LBB10_1:
        vpaddd  -992(%rcx,%rax,4), %ymm0, %ymm0
        vpaddd  -960(%rcx,%rax,4), %ymm1, %ymm1
        vpaddd  -928(%rcx,%rax,4), %ymm2, %ymm2
        vpaddd  -896(%rcx,%rax,4), %ymm3, %ymm3
        vpaddd  -864(%rcx,%rax,4), %ymm0, %ymm0
        vpaddd  -832(%rcx,%rax,4), %ymm1, %ymm1
        vpaddd  -800(%rcx,%rax,4), %ymm2, %ymm2
        vpaddd  -768(%rcx,%rax,4), %ymm3, %ymm3
        vpaddd  -736(%rcx,%rax,4), %ymm0, %ymm0
        vpaddd  -704(%rcx,%rax,4), %ymm1, %ymm1
        vpaddd  -672(%rcx,%rax,4), %ymm2, %ymm2
        vpaddd  -640(%rcx,%rax,4), %ymm3, %ymm3
        vpaddd  -608(%rcx,%rax,4), %ymm0, %ymm0
        vpaddd  -544(%rcx,%rax,4), %ymm2, %ymm4
        vpaddd  -576(%rcx,%rax,4), %ymm1, %ymm1
        vpaddd  -512(%rcx,%rax,4), %ymm3, %ymm5
        vpaddd  -480(%rcx,%rax,4), %ymm0, %ymm2
        vpaddd  -448(%rcx,%rax,4), %ymm1, %ymm3
        vpaddd  -416(%rcx,%rax,4), %ymm4, %ymm0
        vpaddd  -384(%rcx,%rax,4), %ymm5, %ymm1
        cmpq    $3577, %rax
        je      .LBB10_3
        vpaddd  -352(%rcx,%rax,4), %ymm2, %ymm2
        vpaddd  -320(%rcx,%rax,4), %ymm3, %ymm3
        vpaddd  -288(%rcx,%rax,4), %ymm0, %ymm0
        vpaddd  -256(%rcx,%rax,4), %ymm1, %ymm1
        vpaddd  -224(%rcx,%rax,4), %ymm2, %ymm2
        vpaddd  -160(%rcx,%rax,4), %ymm0, %ymm4
        vpaddd  -192(%rcx,%rax,4), %ymm3, %ymm3
        vpaddd  -128(%rcx,%rax,4), %ymm1, %ymm5
        vpaddd  -96(%rcx,%rax,4), %ymm2, %ymm0
        vpaddd  -64(%rcx,%rax,4), %ymm3, %ymm1
        vpaddd  -32(%rcx,%rax,4), %ymm4, %ymm2
        vpaddd  (%rcx,%rax,4), %ymm5, %ymm3
        addq    $256, %rax
        jmp     .LBB10_1
        // ...

It turns out fold_left not only uses multiple registers in parallel, it piles (very brute-force) loop unrolling on top.

NOTES:

  • Why stdr::fold_left rather than std::reduce? Because std::reduce's benchmark numbers are dismal.
  • With std::reduce + std::execution::unseq, performance is similar to fold_left.
  • This is with Clang on libstdc++; on libc++ there is no such degradation, but it still doesn't beat fold_left.
  • libc++ currently still needs -fexperimental-library to run these tests at all.
  • libc++'s ranges support is currently incomplete, so not all tests could be run on it.

Further optimization

Now let's improve the original program, with fold_left as the bar to clear.

auto process_simd = [&](auto konstexpr_for, auto simd_view) {
    #pragma clang loop unroll_count(4)
    for(auto &&simd_v : simd_view) {
        // ...
    }
};

The first idea is a compiler-specific unroll pragma; it turns out to actively hurt with ranges. If you're curious, look at the assembly yourself: the compiler spits out a pile of extremely messy instructions.

// The ILP version looks similar, but the problem is even worse there
avx::test_medium(std::array<int, 3502ul>):
        // ...
.LBB1_1:
        vpaddd  (%rsi), %ymm0, %ymm0
        movq    %rax, %rdi
        subq    %rsi, %rdi
        leaq    32(%rsi), %r8
        sarq    $2, %rdi
        addq    $-9, %rdi
        cmpq    $-8, %rdi
        cmovaeq %rax, %r8
        cmpq    %rsi, %rax
        cmovneq %r8, %rsi
        cmpq    %rdx, %rsi
        jne     .LBB1_1

Digging further into the assembly of the AVX/ILP versions reveals another problem: they keep computing inexplicable bounds. A reasonable guess is that the compiler simply doesn't optimize the ranges machinery well, which means even the ILP version carries plenty of useless instructions. One simple fix is to fall back to the plainest possible for loop and stop paying the abstraction tax.
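
For reference, that "no abstraction tax" version might look like this (a sketch assuming contiguous int data; not the route taken below):

// No views at all: a pointer plus an index, nothing for the optimizer to trip over.
int sum_avx2_raw(const int *p, size_t n) {
    constexpr auto lane = sizeof(__m256i) / sizeof(int);
    __m256i partial_sum {};
    size_t i = 0;
    for(; i + lane <= n; i += lane) {
        auto loadu = _mm256_loadu_si256((const __m256i*)(p + i));
        partial_sum = _mm256_add_epi32(partial_sum, loadu);
    }

    int sum = 0;
    int temp[lane];
    _mm256_storeu_si256((__m256i*)temp, partial_sum);
    for(auto v : temp) sum += v; // horizontal reduction
    for(; i < n; ++i) sum += p[i]; // scalar tail
    return sum;
}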

template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&r) const noexcept {
        auto v = std::forward<decltype(r)>(r) | std::views::all;
        auto n = std::ranges::size(v) / Lane; // number of complete blocks
        auto i = std::views::iota(size_t{0}, n); // block indices 0, 1, ..., n-1
        // Map a block index to a reference to the first element of that block.
        auto f = [v](auto index) -> decltype(auto) { return v[index * Lane]; };
        return std::views::transform(i, f);
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

That said, for personal reasons I'm not about to give up ranges, so instead I rewrote simdify in a form the compiler optimizes more happily (DoD style?): presumably the index-based iota + transform formulation leaves the optimizer with plain induction-variable arithmetic instead of stride's internal bookkeeping. Best of all, the core algorithm of our program needs no changes whatsoever.

avx::test_medium(std::array<int, 3502ul>):
        movl    $704, %eax
        leaq    8(%rsp), %rcx
        vpxor   %xmm0, %xmm0, %xmm0
.LBB1_1:
        vpaddd  -704(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -672(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -640(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -608(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -576(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -544(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -512(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -480(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -448(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -416(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -384(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -352(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -320(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -288(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -256(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -224(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -192(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -160(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -128(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -96(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -64(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -32(%rcx,%rax), %ymm0, %ymm0
        vpaddd  (%rcx,%rax), %ymm0, %ymm0
        addq    $736, %rax
        cmpq    $14688, %rax
        jne     .LBB1_1
        // ...

avx_ilp::test_medium(std::array<int, 3502ul>):
        movl    $992, %eax
        leaq    8(%rsp), %rcx
        vpxor   %xmm0, %xmm0, %xmm0
        vpxor   %xmm1, %xmm1, %xmm1
        vpxor   %xmm2, %xmm2, %xmm2
        vpxor   %xmm3, %xmm3, %xmm3
.LBB5_1:
        vpaddd  -992(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -960(%rcx,%rax), %ymm1, %ymm1
        vpaddd  -928(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -896(%rcx,%rax), %ymm3, %ymm3
        vpaddd  -864(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -832(%rcx,%rax), %ymm1, %ymm1
        vpaddd  -800(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -768(%rcx,%rax), %ymm3, %ymm3
        vpaddd  -736(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -704(%rcx,%rax), %ymm1, %ymm1
        vpaddd  -672(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -640(%rcx,%rax), %ymm3, %ymm3
        vpaddd  -608(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -544(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -576(%rcx,%rax), %ymm1, %ymm1
        vpaddd  -512(%rcx,%rax), %ymm3, %ymm4
        vpaddd  -448(%rcx,%rax), %ymm1, %ymm3
        vpaddd  -480(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -416(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -384(%rcx,%rax), %ymm4, %ymm1
        cmpq    $14304, %rax
        je      .LBB5_3
        vpaddd  -352(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -320(%rcx,%rax), %ymm3, %ymm3
        vpaddd  -288(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -256(%rcx,%rax), %ymm1, %ymm1
        vpaddd  -224(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -160(%rcx,%rax), %ymm0, %ymm0
        vpaddd  -192(%rcx,%rax), %ymm3, %ymm3
        vpaddd  -128(%rcx,%rax), %ymm1, %ymm4
        vpaddd  -64(%rcx,%rax), %ymm3, %ymm1
        vpaddd  -96(%rcx,%rax), %ymm2, %ymm2
        vpaddd  -32(%rcx,%rax), %ymm0, %ymm0
        vpaddd  (%rcx,%rax), %ymm4, %ymm3
        addq    $1024, %rax
        jmp     .LBB5_1
        // ...

After the change we can confirm that both the loop unrolling and the multi-register parallelism are there, and the bizarre bounds computations are gone: two problems solved in one stroke. Comparing roughly, the ILP version is now essentially on par with fold_left, while the AVX version lacks the multi-way YMM parallelism (relying on hardware scheduling alone).

--------------------------------------------------------------------------------------
Benchmark                            Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35                   0.968 ns        0.968 ns    670405544 bytes_per_second=134.668Gi/s
BM_sum_avx2/350                   9.82 ns         9.82 ns     68731562 bytes_per_second=132.756Gi/s
BM_sum_avx2/3502                  96.4 ns         96.4 ns      7216600 bytes_per_second=135.349Gi/s
BM_sum_avx2/35023                 1055 ns         1055 ns       651462 bytes_per_second=123.717Gi/s
BM_sum_avx2/350234               15576 ns        15575 ns        49096 bytes_per_second=83.7717Gi/s
BM_sum_avx2_ilp<4>/35            0.952 ns        0.952 ns    715878822 bytes_per_second=136.893Gi/s
BM_sum_avx2_ilp<4>/350            9.79 ns         9.79 ns     71926717 bytes_per_second=133.126Gi/s
BM_sum_avx2_ilp<4>/3502           85.1 ns         85.1 ns      8083910 bytes_per_second=153.384Gi/s
BM_sum_avx2_ilp<4>/35023          1047 ns         1045 ns       650510 bytes_per_second=124.797Gi/s
BM_sum_avx2_ilp<4>/350234        15939 ns        15938 ns        48235 bytes_per_second=81.863Gi/s
BM_sum_std_fold_left/35           1.05 ns         1.05 ns    663313893 bytes_per_second=124.056Gi/s
BM_sum_std_fold_left/350          8.74 ns         8.74 ns     78781858 bytes_per_second=149.122Gi/s
BM_sum_std_fold_left/3502         91.0 ns         91.0 ns      7675929 bytes_per_second=143.435Gi/s
BM_sum_std_fold_left/35023        1115 ns         1113 ns       630869 bytes_per_second=117.201Gi/s
BM_sum_std_fold_left/350234      16219 ns        16218 ns        45655 bytes_per_second=80.449Gi/s

Note also that the AVX version now shows no performance difference from the ILP version. In other words, even with a single YMM accumulator, Zen 3's hardware scheduling is thoroughly reliable; as long as the compiler's optimization lands, no hand-written ILP design is required.

Floating-point benchmark
--------------------------------------------------------------------------------------
Benchmark                            Time             CPU   Iterations UserCounters...
--------------------------------------------------------------------------------------
BM_sum_avx2/35                    1.46 ns         1.46 ns    472044277 bytes_per_second=89.2223Gi/s
BM_sum_avx2/350                   12.2 ns         12.2 ns     55809571 bytes_per_second=106.701Gi/s
BM_sum_avx2/3502                   274 ns          274 ns      2538655 bytes_per_second=47.5458Gi/s
BM_sum_avx2/35023                 3090 ns         3089 ns       229860 bytes_per_second=42.2333Gi/s
BM_sum_avx2/350234               31352 ns        31351 ns        21813 bytes_per_second=41.6167Gi/s
BM_sum_avx2_ilp<4>/35             1.38 ns         1.38 ns    492076739 bytes_per_second=94.4973Gi/s
BM_sum_avx2_ilp<4>/350            12.0 ns         12.0 ns     57376116 bytes_per_second=108.563Gi/s
BM_sum_avx2_ilp<4>/3502           89.9 ns         89.9 ns      7725717 bytes_per_second=145.178Gi/s
BM_sum_avx2_ilp<4>/35023          1044 ns         1044 ns       651714 bytes_per_second=125.029Gi/s
BM_sum_avx2_ilp<4>/350234        15670 ns        15670 ns        48038 bytes_per_second=83.2645Gi/s
BM_sum_std_fold_left/35           1.39 ns         1.39 ns    503907127 bytes_per_second=93.8597Gi/s
BM_sum_std_fold_left/350          10.6 ns         10.6 ns     66140767 bytes_per_second=123.357Gi/s
BM_sum_std_fold_left/3502         93.7 ns         93.7 ns      7384452 bytes_per_second=139.258Gi/s
BM_sum_std_fold_left/35023        1053 ns         1053 ns       641127 bytes_per_second=123.88Gi/s
BM_sum_std_fold_left/350234      15627 ns        15627 ns        47696 bytes_per_second=83.4937Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// Must be compiled with -ffast-math explicitly, otherwise the standard library's throughput is in the single digits
// (FP addition is not associative, so without it the compiler cannot split the reduction into parallel chains)
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
#include <array>
#include <vector>
#include <random>

template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    [&]<auto I>(this auto &&self) {
        if constexpr (I < Last) {
            f.template operator()<I>();
            self.template operator()<I + 1>();
        }
    }.template operator()<First>();
};

template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&r) const noexcept {
        auto v = std::forward<decltype(r)>(r) | std::views::all;
        auto n = std::ranges::size(v) / Lane;
        auto i = std::views::iota(size_t{0}, n);
        auto f = [v](auto index) -> decltype(auto) { return v[index * Lane]; };
        return std::views::transform(i, f);
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

float sum_avx2(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256) / sizeof(float);
    __m256 partial_sum {};

    auto simd_view = rng
                   | simdify<lane>;
    for(auto &&simd_v : simd_view) {
        auto addr = &simd_v;
        auto loadu = _mm256_loadu_ps(addr);
        partial_sum = _mm256_add_ps(partial_sum, loadu);
    }

    auto scalar_view = rng
                     | std::views::drop(lane * std::ranges::size(simd_view));
    float sum = std::ranges::fold_left(scalar_view, 0.0f, std::plus());

    float temp[lane];
    _mm256_storeu_ps(std::ranges::data(temp), partial_sum);
    sum = std::ranges::fold_left(temp, sum, std::plus());

    return sum;
}

template <size_t ILP = 4>
float sum_avx2_ilp(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256) / sizeof(float);
    constexpr auto bulk = lane * ILP;
    __m256 partial_sum[ILP] {};
    auto process_simd = [&](auto konstexpr_for, auto simd_view) {
        for(auto &&simd_v : simd_view) {
            konstexpr_for([&, addr = &simd_v]<auto Index> {
                auto &partial = partial_sum[Index];
                auto loadu = _mm256_loadu_ps(addr + Index * lane);
                partial = _mm256_add_ps(partial, loadu);
            });
        }
    };

    // SIMD+ILP
    auto bulk_simd_view = rng
                        | simdify<lane>
                        | simdify<ILP>;
    process_simd(constexpr_for<0, ILP>, bulk_simd_view);

    // SIMD
    auto single_simd_view = rng
                          | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                          | simdify<lane>;
    process_simd(constexpr_for<0, 1>, single_simd_view);

    auto scalar_view = rng
                     | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                     | std::views::drop(lane * std::ranges::size(single_simd_view));
    float sum = std::ranges::fold_left(scalar_view, 0.0f, std::plus());

    constexpr_for<1, ILP>([&]<size_t Index> {
        partial_sum[0] = _mm256_add_ps(partial_sum[0], partial_sum[Index]);
    });
    float temp[lane];
    _mm256_storeu_ps(std::ranges::data(temp), partial_sum[0]);
    sum = std::ranges::fold_left(temp, sum, std::plus());

    return sum;
}

// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------

template <size_t Size>
const auto& generate_data() {
    static const auto arr = [] {
        std::array<float, Size> result;
        std::mt19937 gen{std::random_device{}()};
        // No overflow.
        std::uniform_real_distribution<float> dist(-10, 10);
        std::ranges::generate(result, [&] { return dist(gen); });
        return result;
    } ();
    return arr;
}

template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
    const auto &rng = generate_data<Size>();

    for(auto _ : state) {
        auto res = func(rng);
        benchmark::DoNotOptimize(res);
    }

    state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(float));
}

template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
                    auto name, auto func) {
    (benchmark::RegisterBenchmark(
        std::string(name) + "/" + std::to_string(Is),
        [func](benchmark::State& state) {
            BM_run<Is>(state, func);
        }
    ), ...);
}

int main(int argc, char** argv) {
    std::integer_sequence<size_t,
        35,
        350,
        3502,
        35023,
        350234
    > seq;

    auto register_test = [seq](auto name, auto func) {
        register_tests(seq, name, func);
    };

    register_test("BM_sum_avx2",
        [](const auto &r) { return sum_avx2(r); });

    register_test("BM_sum_avx2_ilp<4>",
        [](const auto &r) { return sum_avx2_ilp<4>(r); });

    register_test("BM_sum_std_fold_left",
        [](const auto &r) { return std::ranges::fold_left(r, 0.0f, std::plus()); });

    benchmark::Initialize(&argc, argv);
    benchmark::RunSpecifiedBenchmarks();
    benchmark::Shutdown();
    return 0;
}
A more complex parallel scan (prefix sum) benchmark
---------------------------------------------------------------------------------------
Benchmark                             Time             CPU   Iterations UserCounters...
---------------------------------------------------------------------------------------
BM_scan_avx2/35                    7.26 ns         7.26 ns     94313812 bytes_per_second=17.9496Gi/s
BM_scan_avx2/350                    118 ns          118 ns      5832606 bytes_per_second=11.0215Gi/s
BM_scan_avx2/3502                  1367 ns         1366 ns       515752 bytes_per_second=9.54705Gi/s
BM_scan_avx2/35023                13817 ns        13808 ns        50688 bytes_per_second=9.4486Gi/s
BM_scan_avx2/350234              142191 ns       142092 ns         4985 bytes_per_second=9.18223Gi/s
BM_scan_ilp<4>/35                  6.35 ns         6.35 ns    109700105 bytes_per_second=20.5368Gi/s
BM_scan_ilp<4>/350                 54.6 ns         54.6 ns     11938873 bytes_per_second=23.8893Gi/s
BM_scan_ilp<4>/3502                 528 ns          527 ns      1328081 bytes_per_second=24.77Gi/s
BM_scan_ilp<4>/35023               5243 ns         5239 ns       132626 bytes_per_second=24.9029Gi/s
BM_scan_ilp<4>/350234             53049 ns        53004 ns        13056 bytes_per_second=24.6155Gi/s
BM_simple/35                       5.68 ns         5.67 ns    124240372 bytes_per_second=23.0068Gi/s
BM_simple/350                      80.4 ns         80.5 ns      8581244 bytes_per_second=16.191Gi/s
BM_simple/3502                      857 ns          858 ns       823705 bytes_per_second=15.2046Gi/s
BM_simple/35023                    8386 ns         8405 ns        83875 bytes_per_second=15.5227Gi/s
BM_simple/350234                  81456 ns        85884 ns         8071 bytes_per_second=15.1916Gi/s
BM_std_partial_sum/35              5.46 ns         5.73 ns    119831285 bytes_per_second=22.7636Gi/s
BM_std_partial_sum/350             79.7 ns         83.4 ns      8124839 bytes_per_second=15.635Gi/s
BM_std_partial_sum/3502             811 ns          845 ns       831782 bytes_per_second=15.4415Gi/s
BM_std_partial_sum/35023           8081 ns         8376 ns        84210 bytes_per_second=15.576Gi/s
BM_std_partial_sum/350234         83362 ns        86160 ns         8116 bytes_per_second=15.1431Gi/s
BM_std_inclusive_scan/35           5.48 ns         5.65 ns    121750793 bytes_per_second=23.0835Gi/s
BM_std_inclusive_scan/350          77.5 ns         79.7 ns      8456195 bytes_per_second=16.3676Gi/s
BM_std_inclusive_scan/3502          830 ns          847 ns       813811 bytes_per_second=15.41Gi/s
BM_std_inclusive_scan/35023        8272 ns         8432 ns        80930 bytes_per_second=15.4735Gi/s
BM_std_inclusive_scan/350234      84073 ns        85544 ns         8200 bytes_per_second=15.2522Gi/s
BM_std_execution/35                5.59 ns         5.67 ns    123962094 bytes_per_second=22.977Gi/s
BM_std_execution/350               80.4 ns         81.5 ns      8459091 bytes_per_second=15.9979Gi/s
BM_std_execution/3502               835 ns          845 ns       818100 bytes_per_second=15.4332Gi/s
BM_std_execution/35023             8279 ns         8351 ns        80787 bytes_per_second=15.624Gi/s
BM_std_execution/350234           84563 ns        85395 ns         7943 bytes_per_second=15.2786Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
/// For test.
#include <array>
#include <random>
#include <tuple>
#include <numeric>
#include <execution>

// Changed: constexpr_for now returns metadata (the trip count)
template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    static_assert(First <= Last);
    return [&]<auto I>(this auto &&self) {
        if constexpr (I < Last) {
            f.template operator()<I>();
            return self.template operator()<I + 1>();
        } else {
            return Last - First;
        }
    }.template operator()<First>();
};


template <size_t Lane>
struct simdify_t: std::ranges::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&r) const noexcept {
        auto v = std::forward<decltype(r)>(r) | std::views::all;
        auto n = std::ranges::size(v) / Lane;
        auto i = std::views::iota(size_t{0}, n);
        auto f = [v](auto index) -> decltype(auto) {
            return v[index * Lane];
        };
        return std::views::transform(i, f);
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

template <size_t ILP = 4>
int scan_ilp(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256i) / sizeof(int);
    constexpr auto bulk = ILP * lane;
    auto inner_scan = [](const __m256i &v1) {
        // X2:
        // h g f e | d c b a (from v)
        // g f e 0 | c b a 0 (from s)
        // =>
        //  ...    | c+d,b+c,a+b,a
        auto s1 = _mm256_slli_si256(v1, sizeof(int));
        auto v2 = _mm256_add_epi32(v1, s1);

        // X4:
        //  ...    | c+d,b+c,a+b,a
        //  ...    | a+b,a
        // =>
        //  ...    | a+b+c+d,a+b+c,a+b,a
        auto s2 = _mm256_slli_si256(v2, 2 * sizeof(int));
        auto v3 = _mm256_add_epi32(v2, s2);

        // X8:
        // Cross lane:
        // a+b+c+d,a+b+c,a+b+c+d,a+b+c | a+b+c+d,a+b+c,a+b+c+d,a+b+c
        auto p3 = _mm256_permute4x64_epi64(v3, _MM_SHUFFLE(1, 1, 1, 1));
        // Broadcast:
        // a+b+c+d,a+b+c+d,a+b+c+d,a+b+c+d | a+b+c+d,a+b+c+d,a+b+c+d,a+b+c+d
        auto s3 = _mm256_shuffle_epi32(p3, _MM_SHUFFLE(3, 3, 3, 3));
        // a+b+c+d+e+f+g+h,a+b+c+d+e+f+g,a+b+c+d+e+f,a+b+c+d+e | XXXX
        auto a3 = _mm256_add_epi32(v3, s3);

        // What we need.
        return _mm256_blend_epi32(v3, a3, 0b11110000);
    };
    auto get_carry = [](const __m256i &result) {
        auto buffer = _mm256_permute4x64_epi64(result, _MM_SHUFFLE(3, 3, 3, 3));
        return _mm256_shuffle_epi32(buffer, _MM_SHUFFLE(1, 1, 1, 1));
    };
    auto sum = _mm256_setzero_si256();

    auto process_simd = [&](auto static_for, auto simd_view) {
        constexpr auto size = static_for([]<auto>{});
        __m256i results[size] {};
        __m256i carries[size] {};
        for(auto &&simd_v : simd_view) {
            static_for([&, addr = &simd_v]<auto Index> {
                auto loadu = _mm256_loadu_si256((__m256i*)(addr + Index * lane));
                results[Index] = inner_scan(loadu);
            });
            static_for([&]<auto Index> {
                carries[Index] = get_carry(results[Index]);
            });
            static_for([&, addr = &simd_v]<auto Index> {
                auto result = _mm256_add_epi32(results[Index], sum);
                _mm256_storeu_si256((__m256i*)(addr + Index * lane), result);
                sum = _mm256_add_epi32(sum, carries[Index]);
            });
        }
    };

    auto bulk_simd_view = rng
                        | simdify<lane>
                        | simdify<ILP>;
    process_simd(constexpr_for<0, ILP>, bulk_simd_view);

    auto single_simd_view = rng
                          | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                          | simdify<lane>;
    process_simd(constexpr_for<0, 1>, single_simd_view);

    auto scalar_view = rng
                     | std::views::drop(bulk * std::ranges::size(bulk_simd_view))
                     | std::views::drop(lane * std::ranges::size(single_simd_view));
    std::inclusive_scan(std::ranges::begin(scalar_view),
                        std::ranges::end(scalar_view),
                        std::ranges::begin(scalar_view),
                        std::plus(),
                        _mm256_cvtsi256_si32(sum));
    return 0;
}

int scan_avx2(std::ranges::range auto &&rng) {
    constexpr auto lane = sizeof(__m256i) / sizeof(int);
    auto inner_scan = [](const __m256i &v1) {
        auto s1 = _mm256_slli_si256(v1, sizeof(int));
        auto v2 = _mm256_add_epi32(v1, s1);
        auto s2 = _mm256_slli_si256(v2, 2 * sizeof(int));
        auto v3 = _mm256_add_epi32(v2, s2);
        auto p3 = _mm256_permute4x64_epi64(v3, _MM_SHUFFLE(1, 1, 1, 1));
        auto s3 = _mm256_shuffle_epi32(p3, _MM_SHUFFLE(3, 3, 3, 3));
        auto a3 = _mm256_add_epi32(v3, s3);
        return _mm256_blend_epi32(v3, a3, 0b11110000);
    };
    auto get_carry = [](const __m256i &result) {
        auto buffer = _mm256_permute4x64_epi64(result, _MM_SHUFFLE(3, 3, 3, 3));
        return _mm256_shuffle_epi32(buffer, _MM_SHUFFLE(1, 1, 1, 1));
    };
    auto sum = _mm256_setzero_si256();
    auto single_simd_view = rng
                          | simdify<lane>;
    for(auto &&simd_v : single_simd_view) {
        auto data = (__m256i*)&simd_v;
        auto block_v = _mm256_loadu_si256(data);
        auto result = inner_scan(block_v);
        auto carry = get_carry(result);
        result = _mm256_add_epi32(result, sum);
        _mm256_storeu_si256(data, result);
        sum = _mm256_add_epi32(sum, carry);
    }

    auto scalar_view = rng
                     | std::views::drop(lane * std::ranges::size(single_simd_view));
    std::inclusive_scan(std::ranges::begin(scalar_view),
                        std::ranges::end(scalar_view),
                        std::ranges::begin(scalar_view),
                        std::plus(),
                        _mm256_cvtsi256_si32(sum));
    return 0;
}

int simple(std::ranges::range auto &&rng) {
    int sum = 0;
    for(auto &v : rng) {
        sum += v;
        v = sum;
    }
    return 0;
}

// ----------------------------------------------------------------------------
// Google Benchmark
// ----------------------------------------------------------------------------

template <size_t Size>
const auto& generate_data() {
    static const auto arr = [] {
        std::array<int, Size> result;
        std::mt19937 gen{std::random_device{}()};
        // No overflow.
        std::uniform_int_distribution dist(-10, 10);
        std::ranges::generate(result, [&] { return dist(gen); });
        return result;
    } ();
    return arr;
}

template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
    // Copy.
    auto rng = generate_data<Size>();
    for(auto _ : state) {
        auto res = func(rng);
        benchmark::DoNotOptimize(res);
        benchmark::DoNotOptimize(std::ranges::data(rng));
    }

    state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(int));
}

template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
                    auto name, auto func) {
    (benchmark::RegisterBenchmark(
        std::string(name) + "/" + std::to_string(Is),
        [func](benchmark::State& state) {
            BM_run<Is>(state, func);
        }
    ), ...);
}

int main(int argc, char** argv) {
    std::integer_sequence<size_t,
        35,
        350,
        3502,
        35023,
        350234
    > seq;

    auto register_test = [seq](auto name, auto func) {
        register_tests(seq, name, func);
    };

    register_test("BM_scan_avx2",
        [](auto &r) { return scan_avx2(r); });

    register_test("BM_scan_ilp<1>",
        [](auto &r) { return scan_ilp<1>(r); });

    register_test("BM_scan_ilp<2>",
        [](auto &r) { return scan_ilp<2>(r); });

    register_test("BM_scan_ilp<4>",
        [](auto &r) { return scan_ilp<4>(r); });

    register_test("BM_scan_ilp<6>",
        [](auto &r) { return scan_ilp<6>(r); });

    register_test("BM_scan_ilp<8>",
        [](auto &r) { return scan_ilp<8>(r); });

    register_test("BM_simple",
        [](auto &r) { return simple(r); });

    register_test("BM_std_partial_sum", [](auto &r) {
        return std::partial_sum(std::ranges::begin(r),
                                std::ranges::end(r),
                                std::ranges::begin(r));
    });

    register_test("BM_std_inclusive_scan", [](auto &r) {
        return std::inclusive_scan(std::ranges::begin(r),
                                   std::ranges::end(r),
                                   std::ranges::begin(r),
                                   std::plus(), 0);
    });

    register_test("BM_std_execution", [](auto &r) {
        return std::inclusive_scan(std::execution::unseq,
                                   std::ranges::begin(r),
                                   std::ranges::end(r),
                                   std::ranges::begin(r),
                                   std::plus(), 0);
    });

    benchmark::Initialize(&argc, argv);
    benchmark::RunSpecifiedBenchmarks();
    benchmark::Shutdown();
    return 0;
}
An over-engineered LUT (ASCII lookup table) benchmark
----------------------------------------------------------------------------------
Benchmark                        Time             CPU   Iterations UserCounters...
----------------------------------------------------------------------------------
BM_lookup_avx2/35             4.46 ns         4.26 ns    163460034 bytes_per_second=7.65192Gi/s
BM_lookup_avx2/350            56.3 ns         53.8 ns     12806205 bytes_per_second=6.05853Gi/s
BM_lookup_avx2/3502            419 ns          401 ns      1767725 bytes_per_second=8.13886Gi/s
BM_lookup_avx2/35023          4299 ns         4110 ns       170806 bytes_per_second=7.93551Gi/s
BM_lookup_avx2/350234        45226 ns        43236 ns        16284 bytes_per_second=7.54423Gi/s
BM_lookup_ilp<4>/35           4.44 ns         4.24 ns    165368195 bytes_per_second=7.68365Gi/s
BM_lookup_ilp<4>/350          51.0 ns         48.8 ns     14175979 bytes_per_second=6.68431Gi/s
BM_lookup_ilp<4>/3502          517 ns          494 ns      1373566 bytes_per_second=6.60049Gi/s
BM_lookup_ilp<4>/35023        4916 ns         4696 ns       146758 bytes_per_second=6.94512Gi/s
BM_lookup_ilp<4>/350234      52513 ns        50197 ns        13764 bytes_per_second=6.49798Gi/s
BM_lookup_scalar/35           17.0 ns         16.3 ns     41531748 bytes_per_second=2.00183Gi/s
BM_lookup_scalar/350          86.4 ns         82.6 ns      8464803 bytes_per_second=3.94739Gi/s
BM_lookup_scalar/3502          783 ns          749 ns       942863 bytes_per_second=4.35576Gi/s
BM_lookup_scalar/35023        8002 ns         7643 ns        92445 bytes_per_second=4.26769Gi/s
BM_lookup_scalar/350234      80248 ns        76722 ns         8937 bytes_per_second=4.25149Gi/s
BM_std_transform/35           17.0 ns         16.2 ns     42159156 bytes_per_second=2.00726Gi/s
BM_std_transform/350          87.9 ns         84.0 ns      8537585 bytes_per_second=3.87842Gi/s
BM_std_transform/3502          779 ns          745 ns       938266 bytes_per_second=4.38055Gi/s
BM_std_transform/35023        8372 ns         7972 ns        91165 bytes_per_second=4.09155Gi/s
BM_std_transform/350234      81403 ns        77818 ns         8972 bytes_per_second=4.1916Gi/s
BM_std_execution/35           17.0 ns         16.3 ns     42631765 bytes_per_second=2.00524Gi/s
BM_std_execution/350          86.4 ns         82.6 ns      8399217 bytes_per_second=3.94398Gi/s
BM_std_execution/3502          783 ns          748 ns       940645 bytes_per_second=4.35812Gi/s
BM_std_execution/35023        7972 ns         7619 ns        90280 bytes_per_second=4.28105Gi/s
BM_std_execution/350234      81110 ns        77531 ns         9081 bytes_per_second=4.20712Gi/s
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#include <benchmark/benchmark.h>
#include <x86intrin.h>
#include <algorithm>
#include <ranges>
#include <cassert>
/// For test.
#include <array>
#include <iostream>
#include <vector>
#include <random>
#include <chrono>
#include <tuple>
#include <execution>

namespace stdr = std::ranges;
namespace stdv = std::views;

template <auto First, auto Last>
constexpr auto constexpr_for = [](auto &&f) {
    [&]<auto I>(this auto &&self) {
        if constexpr (I < Last) {
            f.template operator()<I>();
            self.template operator()<I + 1>();
        }
    }.template operator()<First>();
};

template <size_t Lane>
struct simdify_t: stdr::range_adaptor_closure<simdify_t<Lane>> {
    constexpr auto operator()(auto &&r) const noexcept {
        auto v = std::forward<decltype(r)>(r) | stdv::all;
        auto n = stdr::size(v) / Lane;
        auto i = stdv::iota(size_t{0}, n);
        auto f = [v](auto index) -> decltype(auto) {
            return v[index * Lane];
        };
        return stdv::transform(i, f);
    }
};
template <auto i>
constexpr simdify_t<i> simdify;

template <size_t ILP = 4>
void lookup_ilp(stdr::range auto &&source, stdr::range auto &&lookup_table) {
    assert(stdr::size(lookup_table) >= 256 && "We need a char-width table.");
    constexpr auto table_size = 256;
    constexpr auto lane = sizeof(__m256i);
    constexpr auto bulk = ILP * lane;

    __m256i luts[table_size / sizeof(__m128i)];
    constexpr_for<0, stdr::size(luts)>([&, data = stdr::data(lookup_table)]<auto Index> {
        auto buffer128 = _mm_loadu_si128((__m128i*)(data) + Index);
        luts[Index] = _mm256_broadcastsi128_si256(buffer128);
    });

    const auto simd_zero = _mm256_setzero_si256();
    auto reduce = [&](__m256i *simd_addr) {
        // Reused in each reduction.
        // The first round of reduction uses size(luts)/2.
        // The second round uses size(luts)/4, size(luts)/8, ...
        __m256i arenas[stdr::size(luts) / 2];
        auto full = _mm256_loadu_si256(simd_addr);
        auto nibble = _mm256_and_si256(full, _mm256_set1_epi8((char)0x0f));

        // Start from bit 4 to bit 7.
        constexpr auto bias = 4;
        constexpr auto for_each_bit = constexpr_for<4, 8>;
        for_each_bit([&]<auto Bit> {
            constexpr auto bit_shift = static_cast<char>(1 << Bit);
            auto test1 = _mm256_and_si256(full, _mm256_set1_epi8(bit_shift));
            auto blend_mask = _mm256_cmpeq_epi8(test1, simd_zero);

            // Bit 4 as the first round.
            constexpr auto round = Bit - bias;
            constexpr auto tree_depth = stdr::size(arenas) >> round;
            constexpr auto tree_reduce = constexpr_for<0, tree_depth>;
            tree_reduce([&]<auto Level> {
                auto emit = [&]<auto i> /* -> decltype(auto) */ {
                    if constexpr (round > 0) return arenas[i];
                    else return _mm256_shuffle_epi8(luts[i], nibble);
                };
                auto &&lo = emit.template operator()<Level * 2>();
                auto &&hi = emit.template operator()<Level * 2 + 1>();
                arenas[Level] = _mm256_blendv_epi8(hi, lo, blend_mask);
            });
        });
        _mm256_storeu_si256(simd_addr, arenas[0]);
    };

    auto process_simd = [&](auto konstexpr_for, auto simd_view) {
        for(auto &&simd_v : simd_view) {
            konstexpr_for([&, addr = &simd_v]<auto Index> {
                reduce((__m256i*)(addr) + Index);
            });
        }
    };

    auto bulk_simd_view = source | simdify<lane> | simdify<ILP>;
    process_simd(constexpr_for<0, ILP>, bulk_simd_view);

    auto single_simd_view = source
                          | stdv::drop(bulk * stdr::size(bulk_simd_view))
                          | simdify<lane>;
    process_simd(constexpr_for<0, 1>, single_simd_view);

    auto scalar_view = source
                     | stdv::drop(bulk * stdr::size(bulk_simd_view))
                     | stdv::drop(lane * stdr::size(single_simd_view));
    for(auto &v : scalar_view) {
        v = lookup_table[v];
    }
}

void lookup_avx2(stdr::range auto &&source, stdr::range auto &&lookup_table) {
    assert(stdr::size(lookup_table) >= 256 && "We need a char-width table.");
    constexpr auto table_size = 256;
    constexpr auto lane = sizeof(__m256i);
    __m256i luts[table_size / sizeof(__m128i)];
    constexpr_for<0, stdr::size(luts)>([&, data = stdr::data(lookup_table)]<auto Index> {
        auto buffer128 = _mm_loadu_si128((__m128i*)(data) + Index);
        luts[Index] = _mm256_broadcastsi128_si256(buffer128);
    });

    // Reused in each reduction.
    // The first round of reduction uses size(luts)/2.
    // The second round uses size(luts)/4, size(luts)/8, ...
    __m256i arenas[stdr::size(luts) / 2];

    auto simd_zero = _mm256_setzero_si256();
    auto simd_view = source | simdify<lane>;
    for(auto &&simd_v : simd_view) {
        auto addr = &simd_v;
        auto full = _mm256_loadu_si256((__m256i*)addr);
        auto nibble = _mm256_and_si256(full, _mm256_set1_epi8((char)0x0f));

        // Start from bit 4 to bit 7.
        constexpr auto bias = 4;
        constexpr auto for_each_bit = constexpr_for<4, 8>;
        for_each_bit([&]<auto Bit> {
            constexpr auto bit_shift = static_cast<char>(1 << Bit);
            auto test1 = _mm256_and_si256(full, _mm256_set1_epi8(bit_shift));
            auto blend_mask = _mm256_cmpeq_epi8(test1, simd_zero);

            // Bit 4 as the first round.
            constexpr auto round = Bit - bias;
            constexpr auto tree_depth = stdr::size(arenas) >> round;
            constexpr auto tree_reduce = constexpr_for<0, tree_depth>;
            tree_reduce([&]<auto Level> {
                auto emit = [&]<auto i> /* -> decltype(auto) */ {
                    if constexpr (round > 0) return arenas[i];
                    else return _mm256_shuffle_epi8(luts[i], nibble);
                };
                auto &&lo = emit.template operator()<Level * 2>();
                auto &&hi = emit.template operator()<Level * 2 + 1>();
                arenas[Level] = _mm256_blendv_epi8(hi, lo, blend_mask);
            });
        });
        _mm256_storeu_si256((__m256i*)addr, arenas[0]);
    }
    auto scalar_view = source
                     | stdv::drop(lane * stdr::size(simd_view));
    for(auto &v : scalar_view) {
        v = lookup_table[v];
    }
}

void lookup_scalar(auto &&rng, auto &&lut) {
    for(auto &v : rng) {
        v = lut[v];
    }
}

void lookup_std_transform(auto &&rng, auto &&lut) {
    std::transform(stdr::begin(rng), stdr::end(rng), stdr::begin(rng),
        [&](auto idx) { return lut[idx]; });
}

void lookup_std_execution(auto &&rng, auto &&lut) {
    std::transform(std::execution::unseq,
        stdr::begin(rng), stdr::end(rng), stdr::begin(rng),
        [&](auto idx) { return lut[idx]; });
}

// ----------------------------------------------------------------------------
// Google Benchmark Infrastructure
// ----------------------------------------------------------------------------

// Generate constant random LUT (256 bytes)
const auto& get_lut() {
    static const auto arr = [] {
        std::array<uint8_t, 256> result;
        std::mt19937 gen{std::random_device{}()};
        std::uniform_int_distribution<int> dist(0, 255);
        stdr::generate(result, [&] { return static_cast<uint8_t>(dist(gen)); });
        return result;
    }();
    return arr;
}

// Generate random source data
template <size_t Size>
const auto& generate_data() {
    static const auto arr = [] {
        std::array<uint8_t, Size> result;
        std::mt19937 gen{std::random_device{}()};
        std::uniform_int_distribution<int> dist(0, 255);
        stdr::generate(result, [&] { return static_cast<uint8_t>(dist(gen)); });
        return result;
    }();
    return arr;
}

template <size_t Size>
void BM_run(benchmark::State& state, auto &&func) {
    // 1. Get static const data (no cost here)
    const auto& source_template = generate_data<Size>();
    const auto& lut = get_lut();

    for(auto _ : state) {
        // 2. Make a mutable copy on the stack
        // Note: This copy overhead is included in the benchmark time.
        // Since lookup is O(N) and copy is O(N), this dilutes the speedup ratio
        // but keeps the code structure simple as requested.
        auto rng = source_template;

        // 3. Run algo
        func(rng, lut);

        // 4. Prevent optimization
        benchmark::DoNotOptimize(rng);
    }

    state.SetBytesProcessed(int64_t(state.iterations()) * int64_t(Size) * sizeof(uint8_t));
}

template <size_t ...Is>
void register_tests(std::integer_sequence<size_t, Is...>,
                    auto name, auto func) {
    (benchmark::RegisterBenchmark(
        std::string(name) + "/" + std::to_string(Is),
        [func](benchmark::State& state) {
            BM_run<Is>(state, func);
        }
    ), ...);
}

int main(int argc, char** argv) {
    std::integer_sequence<size_t,
        35,
        350,
        3502,
        35023,
        350234
    > seq;

    auto register_test = [seq](auto name, auto func) {
        register_tests(seq, name, func);
    };

    register_test("BM_lookup_avx2",
        [](auto &r, auto &lut) { lookup_avx2(r, lut); });

    register_test("BM_lookup_ilp<4>",
        [](auto &r, auto &lut) { lookup_ilp<4>(r, lut); });

    register_test("BM_lookup_scalar",
        [](auto &r, auto &lut) { lookup_scalar(r, lut); });

    register_test("BM_std_transform",
        [](auto &r, auto &lut) { lookup_std_transform(r, lut); });

    register_test("BM_std_execution",
        [](auto &r, auto &lut) { lookup_std_execution(r, lut); });

    benchmark::Initialize(&argc, argv);
    benchmark::RunSpecifiedBenchmarks();
    benchmark::Shutdown();
    return 0;
}

NOTE: once a benchmark uses higher-latency instructions, explicit ILP still delivers a speedup. See the data above.

Summary

That's basically all there is to it. Vector summation is such a fixed algorithm that auto-vectorization alone would have been perfectly fine.

But once you hand-write or mix in SIMD intrinsics, you need some way to steer the compiler's optimization behavior. constexpr_for is an excellent tool for controlling ILP, and it is a modern, portable way to write it.

The processor's hardware scheduling capability is another consideration: on Zen 3, once the compiler's (brute-force) unrolling lands, explicit ILP barely matters. But back when we didn't yet know where the bottleneck was, a simple explicit ILP design multiplied performance several times over, with no profiling required (this article had baselines, which saved the perf session). For more complex algorithms and higher-latency vector instructions, explicit ILP remains very much a sweet spot.

std::ranges composes beautifully, but unless you read the assembly you won't know where the traps lie. In this very article, the ILP program's performance bottleneck came from ranges itself.