C++小練習(xí)之高性能實現(xiàn)字符串分割
字符串分割是日常工作中比較常見的基礎(chǔ)函數(shù),通常大家會使用現(xiàn)成的基礎(chǔ)庫,基礎(chǔ)庫的性能是否是最佳的?本文基于一個周末小練習(xí),研究如何最大限度的提升字符串分割的性能。
1、背景
字符串按照分隔符拆成多個子串在日常工作中很常見,譬如:后臺服務(wù)對配置的解析、模型服務(wù)對輸入特征的拆分、磁盤格式索引文件轉(zhuǎn)內(nèi)存格式等等,通常最簡單的實現(xiàn)是使用 boost 庫:
std::vector<std::string> output_tokens;<br>boost::split(output_tokens, input_sentence, boost::is_any_of(" "), boost::token_compress_off);
這是最簡單也是性能最差的寫法。在一些頻繁做字符串拆分的場景下,部分開發(fā)者會發(fā)現(xiàn)用 string 會觸發(fā)字符串拷貝的代價,于是改為自己使用 string_view,這會顯著提高性能,除此之外,在使用 string_view 的基礎(chǔ)上仍然有兩個點需要特別注意:
- 盡可能的減少分割時的計算量
- 使用 SIMD 指令
本文將對此做細致分析。
2、目標(biāo)
針對單字符分割和任意字符分割兩類場景,本文以代碼案例為主,講解如何寫出高性能的字符串分割函數(shù)。在考慮高性能的情況下,最基本的要求是字符串分割不應(yīng)觸發(fā)拷貝,因此本文實現(xiàn)的多個字符串分割函數(shù),都基于 string_view 消除了拷貝,在此基礎(chǔ)上再進行性能優(yōu)化分析。本文兩類場景的函數(shù)簽名如下:
std::vector<std::string_view> SplitString(std::string_view input, char delimiter); std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters);
3、高性能實現(xiàn)
單字符分割和任意字符分割這兩類場景分開介紹。
3.1、單字符分割字符串的5 個版本
下面提供 5 種單字符分割字符串的實現(xiàn),并對其性能做總結(jié)分析。
(1)版本1-簡單版本
單字符分割是最常見的場景,譬如用于日志解析、特征解析等,首先我們考慮基于遍歷自行實現(xiàn),實現(xiàn)如下:
std::vector<std::string_view> SplitStringV1(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; int start_pos = 0; int size = 0; for (size_t i = 0; i < input.size(); ++i) { if (input[i] == delimiter) { if (size != 0) { tokens.emplace_back(input.data() + start_pos, size); size = 0; } start_pos = i + 1; } else { ++size; } } if (size > 0) { tokens.emplace_back(input.data() + start_pos, size); } return tokens; }
這樣的實現(xiàn)很好理解:遍歷 input 字符串,發(fā)現(xiàn)有分隔符 delimiter 時,考慮生成結(jié)果子字符串,生成子字符串需要知道起點和長度,因此定義了 token_start 和 size 兩個臨時變量,并在遍歷過程中維護這兩臨時變量。但仔細觀察這個函數(shù)的實現(xiàn),可以發(fā)現(xiàn)有三個變量:token_start、size、i,但通常來說,定位一個子字符串只需要兩個變量(或者叫游標(biāo)):start、end 即可,這里存在性能優(yōu)化空間。
(2)版本2-優(yōu)化版本
基于兩個游標(biāo)的實現(xiàn)代碼:
std::vector<std::string_view> SplitStringV2(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; const char* token_start = input.data(); const char* p = token_start; const char* end_pos = input.data() + input.size(); for (; p != end_pos; ++p) { if (*p == delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
這里 token_start 作為子串的起始位置,p 作為遞增游標(biāo),上一份代碼的 size 可以通過 p - token_start 獲得。這個實現(xiàn)少維護 size 變量,因此性能更好。
(3)版本3-STL 版本
有時候,我們也會考慮使用標(biāo)準(zhǔn)庫的 find_first_of 函數(shù)實現(xiàn),代碼量更少,代碼如下:
std::vector<std::string_view> SplitStringV3(std::string_view input, char delimiter) { std::vector<std::string_view> tokens; size_t token_start = 0; while (token_start < input.size()) { auto token_end = input.find_first_of(delimiter, token_start); if (token_end > token_start) { tokens.emplace_back(input.substr(token_start, token_end - token_start)); } if (token_end == std::string_view::npos) { break; } token_start = token_end + 1; } return tokens; }
但上述實現(xiàn),性能比我們自己實現(xiàn)遍歷的 SplitStringV2 版本性能要差,畢竟 find_first_of 的每次查找都要重新初始化其實位置,相比自己實現(xiàn)遍歷有性能浪費。
(4)版本4-SIMD 最佳版本
字符串分割很重要的一步是逐個字符對字符串做比較,是否可以并行比較呢?是可以的,SIMD 指令可以加速這里的比較,當(dāng)前大多數(shù)機器都已支持 AVX2,但還未普遍支持 AVX512,下面以 AVX2 為例。代碼如下:
// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char std::vector<std::string_view> SplitStrintV4(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重復(fù)32次擴充到256bit const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一輪256bit(32個字符)處理后剩下的未拷貝進結(jié)果集的字符串個數(shù) while (p < end) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字符加載進內(nèi)存 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比較 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 記錄本次的頭部0個數(shù),注:mask的序和字符串序是相反的,所以這里頭部的0對應(yīng)字符串尾部的不匹配字符 uint32_t lead_zero = __builtin_clz(mask); // 補上一次未拷貝的字符串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 補完,繼續(xù)處理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32字節(jié)) 對齊之后剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { if (*pp == delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }
這里使用了 5 個關(guān)鍵的函數(shù):
- _mm256_loadu_si256,用于加載 256 位(32 字節(jié))數(shù)據(jù)
- _mm256_cmpeq_epi8,用于比較 256 位數(shù)據(jù)
- _mm256_movemask_epi8,用于將比較的結(jié)果壓縮到 32 位中,一位代表一個字節(jié)
- __builtin_clz,獲取頭部的 0 bit 個數(shù)
- __builtin_ctz,獲取尾部的 0 bit 個數(shù)
同時在代碼實現(xiàn)上需要考慮多個細節(jié)點:
- 待比較字符串小于 32 字節(jié),此時不需要用 SIMD 指令,直接逐個字符比較
- 在比較過程中,
- 在每一次比較結(jié)果的處理時,除了逐個判斷 32 個字符中的分隔符之外,還要考慮上一輪 32 字節(jié)比較的尾部有部分字符沒有生成結(jié)果子字符串,要和本輪次的頭部字符串拼成一個子字符串
- 多輪的 SIMD 指令比較都完成后,考慮:(1)末尾輪有部分字符沒有進入結(jié)果子字符串;(2)部分字符沒有對齊 32 字節(jié),有尾巴部分的數(shù)據(jù)需要逐個字符比較垂類
要考慮好這些細節(jié)點,代碼很復(fù)雜,考慮到 SIMD 指令是單條指令,而我們代碼中對 SIMD 比較完成后的 32 bit(4字節(jié)) 的逐個比較是由多個單條指令祖?zhèn)?,因此嘗試讓 SIMD 指令多算一些,而減少對 32 bit 的輪詢判斷。
(5)版本5- SIMD 較差版本
減少 SIMD 比較指令執(zhí)行完后的 32 bit 遍歷處理,改成每次 SIMD 比較指令完成后,只取頭一個分隔符的結(jié)果。代碼如下:
// 編譯:g++ split_string_by_char.cc -mavx2 -o split_string_by_char std::vector<std::string_view> SplitStrintV5(std::string_view input, char delimiter) { if (input.size() < 32) { return SplitStringV2(input, delimiter); } std::vector<std::string_view> tokens; __m256i cmp_a = _mm256_set1_epi8(delimiter); // 8bit的分隔重復(fù)32次擴充到256bit const char* p = input.data(); uint32_t last_lead_zero = 0; // 上一輪256bit(32個字符)處理后剩下的未拷貝進結(jié)果集的字符串個數(shù) while (p + 32 < input.data() + input.size()) { __m256i cmp_b = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字符加載進內(nèi)存 __m256i cmp = _mm256_cmpeq_epi8(cmp_a, cmp_b); // 256 bit 一次比較 uint32_t mask = _mm256_movemask_epi8(cmp); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 補上一次未拷貝的字符串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); last_lead_zero = 0; } p += tail_zero + 1; } // 不足 256 bit(32字節(jié))部分 const char* token_start = p - last_lead_zero; const char* sentence_end = input.data() + input.size(); for (; p != sentence_end; ++p) { if (*p == delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
在長文本下評測發(fā)現(xiàn)本代碼性能較差。
(6)性能對比
以上 5 個版本的代碼,再加上 google 開源的 absl 基礎(chǔ)庫 absl::StrSplit 函數(shù),對 2K+ 的長文本循環(huán) 10000 次做壓測,性能對比如下:
性能對比 | V1-簡單版 | V2-優(yōu)化版 | V3-stl版本 | V4-SIMD最佳版本 | V5-SIMD較差版本 | absl |
耗時(ms) | 552 | 426 | 508 | 413 | 501 | 709 |
可以看到,absl::StrSplit 性能最差,可能是因為其會返回空字符串導(dǎo)致。性能最佳的是需要復(fù)雜處理的 SIMD 版本,性能次之的是采用兩個游標(biāo)的非 SIMD 版本,兩者的差異非常小??紤]到 SIMD 實現(xiàn)的復(fù)雜性,且性能收益較小,在實際業(yè)務(wù)場景中,可以在復(fù)雜性和性能收益之間做個權(quán)衡。
3.2、任意字符分割字符串
任意字符分割字符串也很常見,譬如有些日志支持空格、tab、逗號任意一個作為分隔符。和 3.1 章的單字符分割類似,提供三種實現(xiàn),并對其性能做總結(jié)。
(1)STL 版
// 不使用 SIMD,使用標(biāo)準(zhǔn)庫的查找,是性能最差的方式 std::vector<std::string_view> SplitString(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } std::vector<std::string_view> tokens; std::string_view::size_type token_start = input.find_first_not_of(delimiters, 0); std::string_view::size_type token_end = input.find_first_of(delimiters, token_start); while (token_start != std::string_view::npos || token_end != std::string_view::npos) { tokens.emplace_back(input.substr(token_start, token_end - token_start)); token_start = input.find_first_not_of(delimiters, token_end); token_end = input.find_first_of(delimiters, token_start); } return tokens; }
這個實現(xiàn)和 3.1 里的 stl 版本類似,代碼量比較少,性能比較差。
(2)自行遍歷版
// 不使用 SIMD,使用遍歷,是非 SIMD 模式下性能最好的方式 std::vector<std::string_view> SplitStringV2(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } std::vector<std::string_view> tokens; const char* token_start = input.data(); const char* p = token_start; const char* end_pos = input.data() + input.size(); for (; p != end_pos; ++p) { bool match_delimiter = false; for (auto delimiter : delimiters) { if (*p == delimiter) { match_delimiter = true; break; } } if (match_delimiter) { if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } token_start = p + 1; continue; } } if (p > token_start) { tokens.emplace_back(token_start, p - token_start); } return tokens; }
自行遍歷版本,和 3.1 里的版本 2 代碼很像,性能也是非 SIMD 模式下最好的。
(3)SIMD 版本
std::vector<std::string_view> SplitStringWithSimd256(std::string_view input, std::string_view delimiters) { if (delimiters.empty()) { return {input}; } if (input.size() < 32) { return SplitStringV2(input, delimiters); } std::vector<std::string_view> tokens; uint32_t end_pos = input.size() >> 5 << 5; const char* p = input.data(); const char* end = p + end_pos; uint32_t last_lead_zero = 0; // 上一輪256bit(32個字符)處理后剩下的未拷貝進結(jié)果集的字符串個數(shù) while (p < end) { __m256i cmp_a = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(p)); // 32個字符加載進內(nèi)存 __m256i cmp_result_a = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[0])); for (int i = 1; i != delimiters.size(); ++i) { __m256i cmp_result_b = _mm256_cmpeq_epi8(cmp_a, _mm256_set1_epi8(delimiters[i])); cmp_result_a = _mm256_or_si256(cmp_result_a, cmp_result_b); } uint32_t mask = _mm256_movemask_epi8(cmp_result_a); if (mask == 0) { last_lead_zero += 32; p += 32; continue; } // 記錄本次的頭部0個數(shù),注:mask的序和字符串序是相反的,所以這里頭部的0對應(yīng)字符串尾部的不匹配字符 uint32_t lead_zero = __builtin_clz(mask); // 補上一次未拷貝的字符串 uint32_t tail_zero = __builtin_ctz(mask); if (last_lead_zero != 0 || tail_zero != 0) { tokens.emplace_back(p - last_lead_zero, last_lead_zero + tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; // 補完,繼續(xù)處理 while (mask != 0) { uint32_t tail_zero = __builtin_ctz(mask); if (tail_zero != 0) { tokens.emplace_back(p, tail_zero); } mask >>= (tail_zero + 1); p += tail_zero + 1; } last_lead_zero = lead_zero; p += lead_zero; } // 256 bit(32字節(jié)) 對齊之后剩下的部分 const char* token_start = input.data() + end_pos - last_lead_zero; const char* pp = token_start; const char* sentence_end = input.data() + input.size(); for (; pp != sentence_end; ++pp) { bool match_delimiter = false; for (auto delimiter : delimiters) { if (*pp == delimiter) { match_delimiter = true; break; } } if (match_delimiter) { if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } token_start = pp + 1; continue; } } if (pp > token_start) { tokens.emplace_back(token_start, pp - token_start); } return tokens; }
處理任意分隔符和處理單分隔符大部分代碼類似,只有兩部分有變化:
- 一個待比較字符串和多個分隔符比較得到的多個 mask 碼,使用 _mm256_or_si256 做合并
- 非 SIMD 部分使用 for 循環(huán)遍歷判斷多個分隔符
(4)性能對比
以上 3 個版本的代碼,再加上 google 開源的 absl 基礎(chǔ)庫 absl::StrSplit 函數(shù),對 2K+ 的長文本循環(huán) 10000 次做壓測,性能對比如下:
性能對比 | stl版 | 自行遍歷版 | SIMD版 | absl |
耗時(ms) | 768 | 658 | 480 | 1054 |
absl::StrSplit 性能最差,SIMD 版本有多個分隔符的場景下,性能提升非常明顯,SIMD 應(yīng)用代碼的高復(fù)雜度付出是值得的。
4、思考
字符串分割是很常見的功能,通常其實現(xiàn)代碼也很簡潔,這就使得開發(fā)者容易忽略其性能,寫出非最佳性能的代碼,譬如:沒有使用現(xiàn)代 C++ 中的 string_view、對遍歷過程沒有精細考慮。通過精細的控制計算量以及應(yīng)用 SIMD 指令可以獲得比較好的收益,特別是 SIMD 指令在任意多分隔符場景下性能優(yōu)化效果非常明顯。
以上就是C++小練習(xí)之高性能實現(xiàn)字符串分割的詳細內(nèi)容,更多關(guān)于C++字符串分割的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
C++?分割字符串?dāng)?shù)據(jù)的實現(xiàn)方法
這篇文章主要介紹了C++?分割字符串?dāng)?shù)據(jù)的實現(xiàn)方法,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-09-09