背景
字符串处理和无分支编程是相当矛盾的需求(总之我不会),ClickHouse 给了一个无分支向量处理 UTF-8 验证的算法,学学人家怎么做的。
规范
Code Points | First Byte | Second Byte | Third Byte | Fourth Byte |
---|---|---|---|---|
U+0000..U+007F | 00..7F | |||
U+0080..U+07FF | C2..DF | 80..BF | ||
U+0800..U+0FFF | E0 | A0..BF | 80..BF | |
U+1000..U+CFFF | E1..EC | 80..BF | 80..BF | |
U+D000..U+D7FF | ED | 80..9F | 80..BF | |
U+E000..U+FFFF | EE..EF | 80..BF | 80..BF | |
U+10000..U+3FFFF | F0 | 90..BF | 80..BF | 80..BF |
U+40000..U+FFFFF | F1..F3 | 80..BF | 80..BF | 80..BF |
U+100000..U+10FFFF | F4 | 80..8F | 80..BF | 80..BF |
上表给出了合规的 UTF-8 范围。需要注意这几点:
- UTF-8 编码的字符所占字节数是变长的,取决于第一个字节,并且最多占用四字节。
- 第一个字节不允许存在 0xC0、0xC1 和 [0xF5, 0xFF]。
- 第二、第三和第四个字节只允许存在 [0x80, 0xBF]。
- 第一个字节取 0xE0、0xED、0xF0 和 0xF4 时存在特殊情况,详见表格加粗部分。
实现
// https://github.com/ClickHouse/ClickHouse/blob/v25.1.1.1-new/src/Functions/isValidUTF8.cpp#L70
static UInt8 isValidUTF8(const UInt8 * data, UInt64 len)
{
/*
* Map high nibble of "First Byte" to legal character length minus 1
* 0x00 ~ 0xBF --> 0
* 0xC0 ~ 0xDF --> 1
* 0xE0 ~ 0xEF --> 2
* 0xF0 ~ 0xFF --> 3
*/
const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3);
/* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */
const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8);
/*
* Range table, map range index to min and max values
*/
const __m128i range_min_tbl
= _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80,
0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F);
const __m128i range_max_tbl
= _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F,
0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80);
/*
* Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after
* which the Second Byte are not 80~BF. It contains "range index adjustment".
* +------------+---------------+------------------+----------------+
* | First Byte | original range| range adjustment | adjusted range |
* +------------+---------------+------------------+----------------+
* | E0 | 2 | 2 | 4 |
* +------------+---------------+------------------+----------------+
* | ED | 2 | 3 | 5 |
* +------------+---------------+------------------+----------------+
* | F0 | 3 | 3 | 6 |
* +------------+---------------+------------------+----------------+
* | F4 | 4 | 4 | 8 |
* +------------+---------------+------------------+----------------+
*/
/* index1 -> E0, index14 -> ED */
const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0);
/* index1 -> F0, index5 -> F4 */
const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
__m128i prev_input = _mm_set1_epi8(0);
__m128i prev_first_len = _mm_set1_epi8(0);
__m128i error = _mm_set1_epi8(0);
auto check_packed = [&](__m128i input) noexcept
{
/* high_nibbles = input >> 4 */
const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F));
/* first_len = legal character length minus 1 */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* first_len = first_len_tbl[high_nibbles] */
__m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles);
/* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */
/* range = first_range_tbl[high_nibbles] */
__m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles);
/* Second Byte: set range index to first_len */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* range |= (first_len, prev_first_len) << 1 byte */
range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15));
/* Third Byte: set range index to saturate_sub(first_len, 1) */
/* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */
__m128i tmp1;
__m128i tmp2;
/* tmp1 = saturate_sub(first_len, 1) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1));
/* tmp2 = saturate_sub(prev_first_len, 1) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1));
/* range |= (tmp1, tmp2) << 2 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14));
/* Fourth Byte: set range index to saturate_sub(first_len, 2) */
/* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */
/* tmp1 = saturate_sub(first_len, 2) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2));
/* tmp2 = saturate_sub(prev_first_len, 2) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2));
/* range |= (tmp1, tmp2) << 3 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13));
/*
* Now we have below range indices calculated
* Correct cases:
* - 8 for C0~FF
* - 3 for 1st byte after F0~FF
* - 2 for 1st byte after E0~EF or 2nd byte after F0~FF
* - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or
* 3rd byte after F0~FF
* - 0 for others
* Error cases:
* 9,10,11 if non ascii First Byte overlaps
* E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error
*/
/* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */
/* Overlaps lead to index 9~15, which are illegal in range table */
__m128i shift1;
__m128i pos;
__m128i range2;
/* shift1 = (input, prev_input) << 1 byte */
shift1 = _mm_alignr_epi8(input, prev_input, 15);
pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF));
/*
* shift1: | EF F0 ... FE | FF 00 ... ... DE | DF E0 ... EE |
* pos: | 0 1 15 | 16 17 239| 240 241 255|
* pos-240: | 0 0 0 | 0 0 0 | 0 1 15 |
* pos+112: | 112 113 127| >= 128 | >= 128 |
*/
tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(0xF0));
range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1);
tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112));
range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2));
range = _mm_add_epi8(range, range2);
/* Load min and max values per calculated range index */
__m128i minv = _mm_shuffle_epi8(range_min_tbl, range);
__m128i maxv = _mm_shuffle_epi8(range_max_tbl, range);
/* Check value range */
error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv));
error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv));
prev_input = input;
prev_first_len = first_len;
data += 16;
len -= 16;
};
while (len >= 16) // NOLINT
check_packed(_mm_loadu_si128(reinterpret_cast<const __m128i *>(data)));
/// 0 <= len <= 15 for now. Reading data from data - 1 because of right padding of 15 and left padding
/// Then zero some bytes from the unknown memory and check again.
alignas(16) char buf[32];
_mm_store_si128(reinterpret_cast<__m128i *>(buf), _mm_loadu_si128(
reinterpret_cast<const __m128i *>(data - 1)));
memset(buf + len + 1, 0, 16);
check_packed(_mm_loadu_si128(reinterpret_cast<__m128i *>(buf + 1)));
return _mm_testz_si128(error, error);
}
注释也足够多了。简单来说,该算法使用 range 表以及 adjust 操作来实现划分不同场景的字节索引。range 表可以视为以字节为单位所构成的数组。字节中的值可以作为索引,交给 range min/max table 去进一步验证。比如,当某字节计算得出 index=6 时,该字节只允许 [0x90, 0xBF] 的范围。而 adjust 就是用来修正特殊情况的,在 index 提交到 range min/max table 前执行。
具体点就一步步来看吧。
分析
/*
* Map high nibble of "First Byte" to legal character length minus 1
* 0x00 ~ 0xBF --> 0
* 0xC0 ~ 0xDF --> 1
* 0xE0 ~ 0xEF --> 2
* 0xF0 ~ 0xFF --> 3
*/
const __m128i first_len_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 2, 3);
/* high_nibbles = input >> 4 */
const __m128i high_nibbles = _mm_and_si128(_mm_srli_epi16(input, 4), _mm_set1_epi8(0x0F));
/* first_len = legal character length minus 1 */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* first_len = first_len_tbl[high_nibbles] */
__m128i first_len = _mm_shuffle_epi8(first_len_tbl, high_nibbles);
第零步,获取 first length。观察到 UTF-8 字符的长度取决于第一个字节的高半字节(nibble),我们可以使用查表法做 0xc 映射到 2,0xf 映射到 4 等操作。注释中提到 high_nibbles = input >> 4
就是这种思路,但是实际上的做法有偏差,因为 x86 SIMD 只有最少按 16 位操作的 SIMD srli_epi16 指令,所以还需要做 (u16>>4)&0x0f0f
的妥协设计。而且也从实际取值得知,first length 为实际长度减去一,并且不区分单字节字符和非法字符。
/* Map "First Byte" to 8-th item of range table (0xC2 ~ 0xF4) */
const __m128i first_range_tbl = _mm_setr_epi8(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8);
/* First Byte: set range index to 8 for bytes within 0xC0 ~ 0xFF */
/* range = first_range_tbl[high_nibbles] */
__m128i range = _mm_shuffle_epi8(first_range_tbl, high_nibbles);
第一步,构造 range 的第一个字节(first byte)。只有第一步中属于 [0xc, 0xf] 的高半字节才会映射到 8。注意文中说的「range 第 N 个字节」都是指代 input 中的某个 UTF-8 字符对应的字节(看表),因此 N 不可能超过 4;而 range 向量本身不管后续怎么处理,都是把数值通过 or 的形式放到同一个字节位置当中。
/* Second Byte: set range index to first_len */
/* 0 for 00~7F, 1 for C0~DF, 2 for E0~EF, 3 for F0~FF */
/* range |= (first_len, prev_first_len) << 1 byte */
range = _mm_or_si128(range, _mm_alignr_epi8(first_len, prev_first_len, 15));
higher
FFFF FFFF FFFF FFFF PPPP PPPP PPPP PPPP lower
第二步,构造 range 的第二个字节。prev_first_len 是备份上次循环 first length 数值的变量,因为 UTF-8 是变长的,有必要获取上一个字节的信息。将 first length 标记为 F, previous first length 标记为 P,并且左边是高位,右边是低位,可以看出 alignr 就是求出绿色的区域,而且每个标量的数值为 0/1/2/3。总之意思就是当前位置的上一个字节所对应的 first length,更进一步的意思是识别当前位置是否为第二个字节。比如数值 1 的前提是上一个字节求出其 first lenght 为 1,那么就能确定当前位置是一个长度为 2 的 UTF-8 字符的 second byte。数值 2 和 3 同理。
/* Third Byte: set range index to saturate_sub(first_len, 1) */
/* 0 for 00~7F, 0 for C0~DF, 1 for E0~EF, 2 for F0~FF */
__m128i tmp1;
__m128i tmp2;
/* tmp1 = saturate_sub(first_len, 1) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(1));
/* tmp2 = saturate_sub(prev_first_len, 1) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(1));
/* range |= (tmp1, tmp2) << 2 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 14));
/* Fourth Byte: set range index to saturate_sub(first_len, 2) */
/* 0 for 00~7F, 0 for C0~DF, 0 for E0~EF, 1 for F0~FF */
/* tmp1 = saturate_sub(first_len, 2) */
tmp1 = _mm_subs_epu8(first_len, _mm_set1_epi8(2));
/* tmp2 = saturate_sub(prev_first_len, 2) */
tmp2 = _mm_subs_epu8(prev_first_len, _mm_set1_epi8(2));
/* range |= (tmp1, tmp2) << 3 bytes */
range = _mm_or_si128(range, _mm_alignr_epi8(tmp1, tmp2, 13));
第三步和第四步,构造 range 的第三和第四个字节。差不多意思,但是用了 subs 饱和减法,长度都分别扣掉 1 和 2。由于是基于 first length 的减法,第三步绝对不会得到数值 3,第四步也绝对不会得到数值 2/3。
/*
* Now we have below range indices calculated
* Correct cases:
* - 8 for C0~FF
* - 3 for 1st byte after F0~FF
* - 2 for 1st byte after E0~EF or 2nd byte after F0~FF
* - 1 for 1st byte after C0~DF or 2nd byte after E0~EF or
* 3rd byte after F0~FF
* - 0 for others
* Error cases:
* 9,10,11 if non ascii First Byte overlaps
* E.g., F1 80 C2 90 --> 8 3 10 2, where 10 indicates error
*/
现在我们知道一个 range 向量中的每个标量(字节)可以是 0 | 8 | [1-3]
的运算组合。比如,数值 2 意味着它自身不是第一个字节(没有第一步的 8 参与运算),但是可以是第二个字节或者第三个字节(前者从第二步得知,后者从第三步得知,range 本身使用了 or)。又比如,数值 9 意味着它被判断为第一个字节的同时又是第二个字节,因此能做到错误检测。
/*
* Tables for fast handling of four special First Bytes(E0,ED,F0,F4), after
* which the Second Byte are not 80~BF. It contains "range index adjustment".
* +------------+---------------+------------------+----------------+
* | First Byte | original range| range adjustment | adjusted range |
* +------------+---------------+------------------+----------------+
* | E0 | 2 | 2 | 4 |
* +------------+---------------+------------------+----------------+
* | ED | 2 | 3 | 5 |
* +------------+---------------+------------------+----------------+
* | F0 | 3 | 3 | 6 |
* +------------+---------------+------------------+----------------+
* | F4 | 4 | 4 | 8 |
* +------------+---------------+------------------+----------------+
*/
/* index1 -> E0, index14 -> ED */
const __m128i df_ee_tbl = _mm_setr_epi8(0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3, 0);
/* index1 -> F0, index5 -> F4 */
const __m128i ef_fe_tbl = _mm_setr_epi8(0, 3, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
/* Adjust Second Byte range for special First Bytes(E0,ED,F0,F4) */
/* Overlaps lead to index 9~15, which are illegal in range table */
__m128i shift1;
__m128i pos;
__m128i range2;
/* shift1 = (input, prev_input) << 1 byte */
shift1 = _mm_alignr_epi8(input, prev_input, 15);
pos = _mm_sub_epi8(shift1, _mm_set1_epi8(0xEF));
/*
* shift1: | EF F0 ... FE | FF 00 ... ... DE | DF E0 ... EE |
* pos: | 0 1 15 | 16 17 239| 240 241 255|
* pos-240: | 0 0 0 | 0 0 0 | 0 1 15 |
* pos+112: | 112 113 127| >= 128 | >= 128 |
*/
tmp1 = _mm_subs_epu8(pos, _mm_set1_epi8(0xF0));
range2 = _mm_shuffle_epi8(df_ee_tbl, tmp1);
tmp2 = _mm_adds_epu8(pos, _mm_set1_epi8(112));
range2 = _mm_add_epi8(range2, _mm_shuffle_epi8(ef_fe_tbl, tmp2));
range = _mm_add_epi8(range, range2);
第五步,adjust 操作。前面步骤存在的问题是粒度不够细。比如搞一个 0x80E0 的非法字符,range 是认不出第二个字节有问题的,因为 E0 场景的第二个字节不得小于 0xA0。这里的解决方案依然是 alignr 和饱和运算,借由上一个字节为 0xE_ 或 0xF_ 的前提定位到第二个字节,然后 shuffle 筛选出 0xE0、0xED、0xF0 和 0xF4,从而提供一个 adjusted range 进行修正,修正结果详见上方表格。
/*
* Range table, map range index to min and max values
*/
const __m128i range_min_tbl
= _mm_setr_epi8(0x00, 0x80, 0x80, 0x80, 0xA0, 0x80, 0x90, 0x80,
0xC2, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F, 0x7F);
const __m128i range_max_tbl
= _mm_setr_epi8(0x7F, 0xBF, 0xBF, 0xBF, 0xBF, 0x9F, 0xBF, 0x8F,
0xF4, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80, 0x80);
/* Load min and max values per calculated range index */
__m128i minv = _mm_shuffle_epi8(range_min_tbl, range);
__m128i maxv = _mm_shuffle_epi8(range_max_tbl, range);
/* Check value range */
error = _mm_or_si128(error, _mm_cmplt_epi8(input, minv));
error = _mm_or_si128(error, _mm_cmpgt_epi8(input, maxv));
第六步,也是最后一步。每个数值都作为(识别不同场景的)索引送到 range min/max table,使用 SIMD cmp 指令就能水平地得知有无错误(超出规范范围)产生。比如第四步后算出索引 8 for C0~FF,同时 range_min_tbl[8] = 0xC2,那么 first byte 就不允许存在 0xC0 的非法字节;又比如 adjusted 后索引 5 的特殊场景(对应于 first byte 为 0xED 前提的 second byte),range_max_tbl[5] = 0x9F,从而区分常规 second byte 最大允许 0xBF 的情况。
总之,通过非常精细的表驱动设计和计算技巧,UTF-8 验证过程中所有的步骤都不需要任何分支。
THE END
ClickHouse 的 SIMD UTF-8 验证实现算是少见的别出心裁的设计。因为就其他内部实现来说,ClickHouse 大部分的 SIMD 操作都只是应用于可以直接替代的场景(比如,for … max 改为 mm_max)。
不要质疑,我真的找过全部 mm/mm256/mm512 使用代码。
图文无关
按照惯例,文末随机附有内容无关的图。