背景

字符串处理和无分支编程是相当矛盾的需求(总之我不会),ClickHouse 给了一个无分支向量处理 UTF-8 验证的算法,学学人家怎么做的。

规范

Code Points First Byte Second Byte Third Byte Fourth Byte
U+0000..U+007F 00..7F      
U+0080..U+07FF C2..DF 80..BF    
U+0800..U+0FFF E0 A0..BF 80..BF  
U+1000..U+CFFF E1..EC 80..BF 80..BF  
U+D000..U+D7FF ED 80..9F 80..BF  
U+E000..U+FFFF EE..EF 80..BF 80..BF  
U+10000..U+3FFFF F0 90..BF 80..BF 80..BF
U+40000..U+FFFFF F1..F3 80..BF 80..BF 80..BF
U+100000..U+10FFFF F4 80..8F 80..BF 80..BF

上表给出了合规的 UTF-8 范围。需要注意这几点:

  • UTF-8 编码的字符所占字节数是变长的,取决于第一个字节,并且最多占用四字节。
  • 第一个字节不允许存在 0xC0、0xC1 和 [0xF5, 0xFF]。
  • 第二、第三和第四个字节只允许存在 [0x80, 0xBF]。
  • 第一个字节取 0xE0、0xED、0xF0 和 0xF4 时存在特殊情况,详见表格加粗部分。

实现

// https://github.com/ClickHouse/ClickHouse/blob/v25.1.1.1-new/src/Functions/isValidUTF8.cpp#L70
static UInt8 isValidUTF8(const UInt8 * data, UInt64 len)
{
    /*
    * Map high nibble of "First Byte" to legal character length minus 1
    * 0x00 ~ 0xBF --> 0
    * 0xC0 ~ 0xDF --> 1
    * 0xE0 ~ 0xEF --> 2
    * 0xF0 ~ 0xFF --> 3
    */
    const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3);

    /* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */
    const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8);

    /*
    * Range table, map range index to min and max values
    */
    const __m128i range_min_tbl
        = _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80,
                        0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F);

    const __m128i range_max_tbl
        = _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F,
                        0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80);

    /*
    * Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after
    * which the Second Byte are not 80~BF. It contains "range index adjustment".
    * +------------+---------------+------------------+----------------+
    * | First Byte | original range| range adjustment | adjusted range |
    * +------------+---------------+------------------+----------------+
    * | E0         | 2             | 2                | 4              |
    * +------------+---------------+------------------+----------------+
    * | ED         | 2             | 3                | 5              |
    * +------------+---------------+------------------+----------------+
    * | F0         | 3             | 3                | 6              |
    * +------------+---------------+------------------+----------------+
    * | F4         | 4             | 4                | 8              |
    * +------------+---------------+------------------+----------------+
    */

    /* index1 -> E0, index14 -> ED */
    const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0);

    /* index1 -> F0, index5 -> F4 */
    const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);

    __m128i prev_input = _mm_set1_epi8(0);
    __m128i prev_first_len = _mm_set1_epi8(0);
    __m128i error = _mm_set1_epi8(0);

    auto check_packed = [&](__m128i input) noexcept
    {
        /* high_nibbles = input >> 4 */
        const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F));

        /* first_len = legal character length minus 1 */
        /* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
        /* first_len = first_len_tbl[high_nibbles] */
        __m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles);

        /* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */
        /* range = first_range_tbl[high_nibbles] */
        __m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles);

        /* Second Byte: set range index to first_len */
        /* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
        /* range |= (first_len, prev_first_len) << 1 byte */
        range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15));

        /* Third Byte: set range index to saturate_sub(first_len, 1) */
        /* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */
        __m128i tmp1;
        __m128i tmp2;
        /* tmp1 = saturate_sub(first_len, 1) */
        tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1));
        /* tmp2 = saturate_sub(prev_first_len, 1) */
        tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1));
        /* range |= (tmp1, tmp2) << 2 bytes */
        range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14));

        /* Fourth Byte: set range index to saturate_sub(first_len, 2) */
        /* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */
        /* tmp1 = saturate_sub(first_len, 2) */
        tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2));
        /* tmp2 = saturate_sub(prev_first_len, 2) */
        tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2));
        /* range |= (tmp1, tmp2) << 3 bytes */
        range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13));

        /*
            * Now we have below range indices calculated
            * Correct cases:
            * - 8 for C0~FF
            * - 3 for 1st byte after F0~FF
            * - 2 for 1st byte after E0~EF or 2nd byte after F0~FF
            * - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or
            *         3rd byte after F0~FF
            * - 0 for others
            * Error cases:
            *   9,10,11 if non ascii First Byte overlaps
            *   E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error
            */

        /* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */
        /* Overlaps lead to index 9~15, which are illegal in range table */
        __m128i shift1;
        __m128i pos;
        __m128i range2;
        /* shift1 = (input, prev_input) << 1 byte */
        shift1 = _mm_alignr_epi8(input, prev_input, 15);
        pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF));
        /*
            * shift1:  | EF  F0 ... FE | FF  00  ... ...  DE | DF  E0 ... EE |
            * pos:     | 0   1      15 | 16  17           239| 240 241    255|
            * pos-240: | 0   0      0  | 0   0            0  | 0   1      15 |
            * pos+112: | 112 113    127|       >= 128        |     >= 128    |
            */
        tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(0xF0));
        range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1);
        tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112));
        range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2));

        range = _mm_add_epi8(range, range2);

        /* Load min and max values per calculated range index */
        __m128i minv = _mm_shuffle_epi8(range_min_tbl, range);
        __m128i maxv = _mm_shuffle_epi8(range_max_tbl, range);

        /* Check value range */
        error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv));
        error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv));

        prev_input = input;
        prev_first_len = first_len;

        data += 16;
        len -= 16;
    };

    while (len >= 16) // NOLINT
        check_packed(_mm_loadu_si128(reinterpret_cast<const __m128i *>(data)));

    /// 0 <= len <= 15 for now. Reading data from data - 1 because of right padding of 15 and left padding
    /// Then zero some bytes from the unknown memory and check again.
    alignas(16) char buf[32];
    _mm_store_si128(reinterpret_cast<__m128i *>(buf), _mm_loadu_si128(
        reinterpret_cast<const __m128i *>(data - 1)));
    memset(buf + len + 1, 0, 16);
    check_packed(_mm_loadu_si128(reinterpret_cast<__m128i *>(buf + 1)));

    return _mm_testz_si128(error, error);
}

注释也足够多了。简单来说,该算法使用 range 表以及 adjust 操作来实现划分不同场景的字节索引。range 表可以视为以字节为单位所构成的数组。字节中的值可以作为索引,交给 range min/max table 去进一步验证。比如,当某字节计算得出 index=6 时,该字节只允许 [0x90, 0xBF] 的范围。而 adjust 就是用来修正特殊情况的,在 index 提交到 range min/max table 前执行。

具体点就一步步来看吧。

分析

/*
* Map high nibble of "First Byte" to legal character length minus 1
* 0x00 ~ 0xBF --> 0
* 0xC0 ~ 0xDF --> 1
* 0xE0 ~ 0xEF --> 2
* 0xF0 ~ 0xFF --> 3
*/
const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3);

/* high_nibbles = input >> 4 */
const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F));

/* first_len = legal character length minus 1 */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* first_len = first_len_tbl[high_nibbles] */
__m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles);

第零步,获取 first length。观察到 UTF-8 字符的长度取决于第一个字节的高半字节(nibble),我们可以使用查表法做 0xc 映射到 2,0xf 映射到 4 等操作。注释中提到 high_nibbles = input >> 4 就是这种思路,但是实际上的做法有偏差,因为 x86 SIMD 只有最少按 16 位操作的 SIMD srli_epi16 指令,所以还需要做 (u16>>4)&0x0f0f 的妥协设计。而且也从实际取值得知,first length 为实际长度减去一,并且不区分单字节字符和非法字符。

/* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */
const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8);

/* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */
/* range = first_range_tbl[high_nibbles] */
__m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles);

第一步,构造 range 的第一个字节(first byte)。只有第一步中属于 [0xc, 0xf] 的高半字节才会映射到 8。注意文中说的「range 第 N 个字节」都是指代 input 中的某个 UTF-8 字符对应的字节(看),因此 N 不可能超过 4;而 range 向量本身不管后续怎么处理,都是把数值通过 or 的形式放到同一个字节位置当中。

/* Second Byte: set range index to first_len */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* range |= (first_len, prev_first_len) << 1 byte */
range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15));

higher FFFF FFFF FFFF FFFF PPPP PPPP PPPP PPPP lower

第二步,构造 range 的第二个字节。prev_first_len 是备份上次循环 first length 数值的变量,因为 UTF-8 是变长的,有必要获取上一个字节的信息。将 first length 标记为 F, previous first length 标记为 P,并且左边是高位,右边是低位,可以看出 alignr 就是求出绿色的区域,而且每个标量的数值为 0/1/2/3。总之意思就是当前位置的上一个字节所对应的 first length,更进一步的意思是识别当前位置是否为第二个字节。比如数值 1 的前提是上一个字节求出其 first lenght 为 1,那么就能确定当前位置是一个长度为 2 的 UTF-8 字符的 second byte。数值 2 和 3 同理。

/* Third Byte: set range index to saturate_sub(first_len, 1) */
/* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */
__m128i tmp1;
__m128i tmp2;
/* tmp1 = saturate_sub(first_len, 1) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1));
/* tmp2 = saturate_sub(prev_first_len, 1) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1));
/* range |= (tmp1, tmp2) << 2 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14));

/* Fourth Byte: set range index to saturate_sub(first_len, 2) */
/* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */
/* tmp1 = saturate_sub(first_len, 2) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2));
/* tmp2 = saturate_sub(prev_first_len, 2) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2));
/* range |= (tmp1, tmp2) << 3 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13));

第三步和第四步,构造 range 的第三和第四个字节。差不多意思,但是用了 subs 饱和减法,长度都分别扣掉 1 和 2。由于是基于 first length 的减法,第三步绝对不会得到数值 3,第四步也绝对不会得到数值 2/3。

/*
    * Now we have below range indices calculated
    * Correct cases:
    * - 8 for C0~FF
    * - 3 for 1st byte after F0~FF
    * - 2 for 1st byte after E0~EF or 2nd byte after F0~FF
    * - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or
    *         3rd byte after F0~FF
    * - 0 for others
    * Error cases:
    *   9,10,11 if non ascii First Byte overlaps
    *   E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error
    */

现在我们知道一个 range 向量中的每个标量(字节)可以是 0 | 8 | [1-3] 的运算组合。比如,数值 2 意味着它自身不是第一个字节(没有第一步的 8 参与运算),但是可以是第二个字节或者第三个字节(前者从第二步得知,后者从第三步得知,range 本身使用了 or)。又比如,数值 9 意味着它被判断为第一个字节的同时又是第二个字节,因此能做到错误检测。

/*
* Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after
* which the Second Byte are not 80~BF. It contains "range index adjustment".
* +------------+---------------+------------------+----------------+
* | First Byte | original range| range adjustment | adjusted range |
* +------------+---------------+------------------+----------------+
* | E0         | 2             | 2                | 4              |
* +------------+---------------+------------------+----------------+
* | ED         | 2             | 3                | 5              |
* +------------+---------------+------------------+----------------+
* | F0         | 3             | 3                | 6              |
* +------------+---------------+------------------+----------------+
* | F4         | 4             | 4                | 8              |
* +------------+---------------+------------------+----------------+
*/

/* index1 -> E0, index14 -> ED */
const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0);

/* index1 -> F0, index5 -> F4 */
const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);

/* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */
/* Overlaps lead to index 9~15, which are illegal in range table */
__m128i shift1;
__m128i pos;
__m128i range2;
/* shift1 = (input, prev_input) << 1 byte */
shift1 = _mm_alignr_epi8(input, prev_input, 15);
pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF));
/*
    * shift1:  | EF  F0 ... FE | FF  00  ... ...  DE | DF  E0 ... EE |
    * pos:     | 0   1      15 | 16  17           239| 240 241    255|
    * pos-240: | 0   0      0  | 0   0            0  | 0   1      15 |
    * pos+112: | 112 113    127|       >= 128        |     >= 128    |
    */
tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(0xF0));
range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1);
tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112));
range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2));

range = _mm_add_epi8(range, range2);

第五步,adjust 操作。前面步骤存在的问题是粒度不够细。比如搞一个 0x80E0 的非法字符,range 是认不出第二个字节有问题的,因为 E0 场景的第二个字节不得小于 0xA0。这里的解决方案依然是 alignr 和饱和运算,借由上一个字节为 0xE_ 或 0xF_ 的前提定位到第二个字节,然后 shuffle 筛选出 0xE0、0xED、0xF0 和 0xF4,从而提供一个 adjusted range 进行修正,修正结果详见上方表格。

/*
* Range table, map range index to min and max values
*/
const __m128i range_min_tbl
    = _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80,
                    0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F);

const __m128i range_max_tbl
    = _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F,
                    0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80);

/* Load min and max values per calculated range index */
__m128i minv = _mm_shuffle_epi8(range_min_tbl, range);
__m128i maxv = _mm_shuffle_epi8(range_max_tbl, range);

/* Check value range */
error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv));
error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv));

第六步,也是最后一步。每个数值都作为(识别不同场景的)索引送到 range min/max table,使用 SIMD cmp 指令就能水平地得知有无错误(超出规范范围)产生。比如第四步后算出索引 8 for C0~FF,同时 range_min_tbl[8] = 0xC2,那么 first byte 就不允许存在 0xC0 的非法字节;又比如 adjusted 后索引 5 的特殊场景(对应于 first byte 为 0xED 前提的 second byte),range_max_tbl[5] = 0x9F,从而区分常规 second byte 最大允许 0xBF 的情况。

总之,通过非常精细的表驱动设计和计算技巧,UTF-8 验证过程中所有的步骤都不需要任何分支。

THE END

ClickHouse 的 SIMD UTF-8 验证实现算是少见的别出心裁的设计。因为就其他内部实现来说,ClickHouse 大部分的 SIMD 操作都只是应用于可以直接替代的场景(比如,for … max 改为 mm_max)。

不要质疑,我真的找过全部 mm/mm256/mm512 使用代码。

图文无关

unreal-unity

按照惯例,文末随机附有内容无关的图。