詳解c++20協(xié)程如何使用
什么是協(xié)程
新接觸的人看了網(wǎng)上很多人的見解都是一頭霧水,本人的理解,協(xié)程就是可中斷的函數(shù),這個函數(shù)在執(zhí)行到某一時刻可以暫停,保存當前的上下文(比如當前作用域的變量,函數(shù)參數(shù)等等),在后來某一時刻可以手動恢復這個中斷的函數(shù),把保存的上下文恢復并從中斷的地方繼續(xù)執(zhí)行。簡而言之,協(xié)程就是可中斷的函數(shù),協(xié)程如何實現(xiàn):保存上下文和恢復上下文。
你可能會說協(xié)程不會這么簡單的吧,我這里來舉例一下啊,如python的協(xié)程
def test():
print('begin')
yield
print('hello world')
yield
print('end')
t = test()
next(t)
以上就是一個協(xié)程,怎么調(diào)用它呢,如果直接使用test(),它不是調(diào)用,而是返回一個句柄(python中叫生成器),通過這個句柄就可以啟動這個協(xié)程,以下是調(diào)用結果

很顯然,這個函數(shù)只執(zhí)行了一部分,繼續(xù)執(zhí)行下去只要繼續(xù)調(diào)用next就可以,如上的test函數(shù)只有兩次“中斷”,調(diào)用三次next就會執(zhí)行完畢(由于是主講c++20協(xié)程,python協(xié)程的細節(jié)不會去講)

調(diào)度器
如果是上面的這種協(xié)程是沒有什么實際用途的,協(xié)程和調(diào)度器結合起來才是真正發(fā)揮作用的時候。調(diào)度器就是處理好協(xié)程之間的調(diào)用,知道所有協(xié)程調(diào)用的時機,通過調(diào)度器可以實現(xiàn)更多的功能,如定時協(xié)程,io協(xié)程,以下依舊拿python的協(xié)程來舉例(各位請勿著急,實在是python太好舉例了,前面先說明白,后面c++20的協(xié)程才好講)
還是定義一個協(xié)程
async def test():
print('begin')
print('end')
python為了區(qū)分迭代器生成器和協(xié)程,加入了新關鍵字async和await,并且在里面不能使用yield關鍵字,不過原理都是一樣的,以上的協(xié)程中途沒有中斷(沒有上下文的切換),一次便可以執(zhí)行完畢。
好,現(xiàn)在開始說調(diào)度器,調(diào)度器簡單理解為一個隊列,將一個協(xié)程扔進調(diào)度器,調(diào)度器根據(jù)來執(zhí)行所有的協(xié)程,那么調(diào)度器如何執(zhí)行呢,簡單來說就是使用一個循環(huán),從隊列中取出協(xié)程,然后“復蘇”這個協(xié)程,如下

首先看main函數(shù),asyncio.ensure_future(test())就是將main這個協(xié)程扔進調(diào)度器的隊列中
asyncio.get_event_loop()就是獲得這個循環(huán),
loop.run_forever()就是開始這個循環(huán),在循環(huán)中,會從隊列中取出協(xié)程執(zhí)行,先看執(zhí)行結果

因為test協(xié)程沒有進行上下文的切換,當循環(huán)直接復蘇一次test協(xié)程后,test協(xié)程就直接執(zhí)行完畢了,前面所講,基于這個調(diào)度器可以實現(xiàn)很多額外的功能,如果說在這個循環(huán)中我加入一個睡眠的協(xié)程,用一個鍵值對(鍵為超時的時間戳,值為協(xié)程句柄),在循環(huán)中不停的獲取當前的時間戳,然后從這個隊列中比對時間戳,當時間戳相等后就表明這個協(xié)程就已經(jīng)可以執(zhí)行了,直接取出協(xié)程并復蘇執(zhí)行(當前可以這樣理解,調(diào)度器肯定不是這樣的步驟,還有很多很復雜的步驟,不過我們并不需要知道(一般來講))
看如下的改造

在test協(xié)程中增加了一句await asyncio.sleep(1),這樣就發(fā)生了一次上下文的切換,在循環(huán)中,開始從隊列中取出這個test協(xié)程執(zhí)行,執(zhí)行途中遇到了asyncio.sleep(1),test協(xié)程就保存當前的上下文,然后“中斷”,中斷后,程序流程又回到了循環(huán)中,然后在隊列中又增加一個鍵為時間戳值是test協(xié)程句柄的一項,下一次訓換開始直接獲取當前時間戳,然后比對,如果超時了,就繼續(xù)拿出test協(xié)程進行執(zhí)行(暫時這樣理解),
所以執(zhí)行結果如下

先打印begin,然后等待一秒中,然后再打印end,然后test協(xié)程執(zhí)行完畢,從代碼上看這個邏輯是這樣的,如果調(diào)度器中有多個協(xié)程,在這等待的一秒時間又會上下文切換去執(zhí)行別的協(xié)程,時間到了又會到test協(xié)程中從睡眠的地方恢復執(zhí)行。
c++20的協(xié)程
c++20的標準中,新增了協(xié)程的支持,也就是可以在c++中定義一個協(xié)程了,但是看過的小伙伴肯定是知道的,要定義一個協(xié)程只要定義一些必要的函數(shù),在這里,我推薦知乎的一篇文章,看一下要實現(xiàn)哪一些接口,C++20協(xié)程初探,然后有小伙伴肯定會說了,你這算什么意思,直接拿別人的結果,然后直接寫一個標題,直接套用。
不會的,我當然也不會做這樣的事情,首先我想說明的是,這些接口只是官方定義的,如果記這些簡直跟死背書沒有什么區(qū)別,我先表明的只是協(xié)程究竟是什么,以上python中講到了如何定義一個協(xié)程
async def test(): pass
前面的那個async就是一個協(xié)程要實現(xiàn)的接口,20的標準中支持的就是如何定義一個類似async的東西,好,繼續(xù)往下
如果在c++中能定義了這么一個協(xié)程,肯定也是沒有什么作用的,需要一個調(diào)度器才是協(xié)程的真正強大之處,很抱歉,20官方并沒有提供這樣的東西,以下是我本人寫的提供了類似這樣功能的一套代碼,有人肯定會說網(wǎng)上有那么多c++協(xié)程的代碼,都寫的亂七八糟,根本無法理解(可以這么說,不要噴我,反正我就是這么想的)。
先放鏈接吧libfuture,是的,沒錯,我把這個小工具庫叫做libfuture(感覺有點興奮,畢竟是自己真正意味上第一次寫小工具庫),下載0.0.6版本的就好(臉紅,因為還在完善,只能不停的修bug,0.0.6算是修改的比較完善了,雖然還有一點)

下載好了就是一個解壓包,直接打開libfuture/src/libfuture.sln(如果你是windows的話,當然,這個庫我作了跨平臺,因為涉及到了socket和數(shù)據(jù)收發(fā),使用了windows平臺的iocp和linux平臺的epoll),如果你不想自己編譯,那么可以使用我編譯好了的庫文件和動態(tài)鏈接庫
開始使用
使用之前我說明一下,使用vs2019,而且要支持20標準的,一般直接下載最新的是支持的,打開創(chuàng)建一個項目,我這里直接叫做testlibfuture了,

直接點擊添加現(xiàn)有項將lib文件添加進來

選擇lib文件,點擊直接添加,之后就是這樣

你也可以使用其他的方法,然后隨便寫一個main函數(shù)編譯一下,注意選擇32位debug版本的,如果你是使用我編譯的話

直接將dll文件放入和可執(zhí)行文件一層的目錄中


先說一下配置,語言標準要選擇c++20,

附加包含目錄直接把剛剛下載的libfuture源碼的include目錄包含進去就好了

然后重磅戲就來了,先來寫一個栗子
#include <libfuture.h>
#include <iostream>
using namespace std;
using namespace libfuture;
future_t<> task1()
{
cout << "task1 begin" << endl;
cout << "task1 end" << endl;
co_return;
}
int main(int argc, char** argv)
{
auto sche = current_scheduler();
//開啟一個協(xié)程
sche->ensure_future(task1());
sche->run_until_no_task();
return 0;
}
我用了一個libfuture的命名空間,直接引用頭文件加using namespace就可以,
首先定義一個協(xié)程,就是task1,前面的future_t<> 就是類似python的async的東西,auto sche = current_scheduler();就是得到調(diào)度器,
sche->ensure_future(task1());就是把task1協(xié)程丟進調(diào)度器,sche->run_until_no_task();就是啟動調(diào)度器??匆幌逻\行結果

好,也是上面一樣的思路,將task1協(xié)程扔進調(diào)度器中存放協(xié)程的隊列,調(diào)度器啟動一個循環(huán),直接得到這個循環(huán)執(zhí)行,這個協(xié)程中沒有進行上下文的切換,因此一下就執(zhí)行完畢了。
由于我特別喜歡go語言開啟協(xié)程的方式,我就定義了一個宏叫做cpp,跟ensure_future一樣的功能,所以直接改成以下的
#include <libfuture.h>
#include <iostream>
using namespace std;
using namespace libfuture;
future_t<> task1()
{
cout << "task1 begin" << endl;
cout << "task1 end" << endl;
co_return;
}
int main(int argc, char** argv)
{
auto sche = current_scheduler();
//開啟一個協(xié)程
cpp task1();
sche->run_until_no_task();
return 0;
}
執(zhí)行結果也是上面一樣,現(xiàn)在再來加上一個協(xié)程的睡眠,
#include <libfuture.h>
#include <iostream>
using namespace std;
using namespace libfuture;
future_t<> task1()
{
cout << "task1 begin" << endl;
co_await 1s;
cout << "task1 end" << endl;
co_return;
}
int main(int argc, char** argv)
{
auto sche = current_scheduler();
sche->init();
//開啟一個協(xié)程
cpp task1();
sche->run_until_no_task();
return 0;
}
說明一下,要使用睡眠功能要進行調(diào)度器要進行初始化,也就是init,在python中協(xié)程的睡眠是await asyncio.sleep(1),這樣就是睡眠一秒,這里直接就是co_await 1s就是睡眠一秒,libfuture的睡眠的時間基準是使用標準庫的chrono。執(zhí)行是這樣的

第一個的iocp。。。。。。。87可以先忽略哈,有些設計上的,和日志的打印還沒有具體完善,先打印task1 begin 在等待一秒,再打印task1 end ,好,如果還有其他的協(xié)程,在這一秒的上下文切換中就會去執(zhí)行其他的協(xié)程,當?shù)綍r間了,會回到當前執(zhí)行協(xié)程恢復執(zhí)行,
再來介紹libfuture內(nèi)置的非常重要的協(xié)程,
open_accept(接收一個客戶端),返回值是一個sockaddr_in指針,是客戶端的地址信息;
open_connection(連接一個服務端),返回值是是否連接上;
buffer_read(往一個socket中讀數(shù)據(jù)),返回值是是否接收數(shù)據(jù)是否超時,
buffer_write(往一個socket中寫數(shù)據(jù)),返回值是發(fā)送數(shù)據(jù)是否超時,
是的,我往其中添加的io的協(xié)程,為什么呢,像以上說的,一直判斷隊列中的時間戳,那么在時間沒到的途中一直判斷就會造成cpu的空轉,浪費cpu,所以要把等待的時間讓出去,讓cpu去執(zhí)行其他的程序。
先看一個客戶端栗子
#include "libfuture.h"
#include <string>
#include <iostream>
using namespace std;
using namespace libfuture;
#define BUF_LEN 10240
string send_str = "\
GET / HTTP/1.1\r\n\
Host: 42.192.165.127\r\n\
Connection: keep-alive\r\n\r\n";
future_t<> test_connect(const char* ip, unsigned short port)
{
//空間要大
buffer_t buffer(BUF_LEN + 1);
socket_t client_socket(AF_INET, SOCK_STREAM, 0);
bool has_c = co_await open_connection(&client_socket, ip, port);
if (!has_c)
{
cout << "連接失敗" << endl;
co_return;
}
cout << "連接成功" << endl;
buffer.push(send_str.c_str(), send_str.size());
bool is_timeout = co_await buffer_write(&buffer, &client_socket, 5s);
if (is_timeout)
{
cout << "超時未發(fā)送" << endl;
co_return;
}
cout << "發(fā)送消息成功" << endl;
buffer.clear();
//看看回了什么消息
is_timeout = co_await buffer_read(&buffer, &client_socket, 5s);
if (is_timeout)
{
cout << "超時未讀取到消息" << endl;
co_return;
}
if (buffer.has_data())
{
//防止燙燙或屯屯
int len = buffer.data_len();
if (len >= BUF_LEN)
len = BUF_LEN;
buffer.data()[len] = 0;
cout << buffer.data() << endl;
}
co_return;
}
int main(int argc, char** argv)
{
#ifdef _WIN32
WSADATA data;
WSAStartup(MAKEWORD(2, 2), &data);
#endif
auto sche = current_scheduler();
sche->init();
for (int i = 0; i < 10; ++i)
cpp test_connect("42.192.165.127", 80);
sche->run_until_no_task();
#ifdef _WIN32
WSACleanup();
#endif
return 0;
}
以上,因為windows的socket要先進行初始化才能用,所以有WsaStartup之類的函數(shù),首先
auto sche = current_scheduler(); sche->init();
兩行代碼是獲得調(diào)度器,并初始化調(diào)度器,
for (int i = 0; i < 10; ++i)
cpp test_connect(“42.192.165.127”, 80);
是往調(diào)度器中扔進10個連接的協(xié)程,說明一下,這個ip地址是我服務器的ip地址,我沒做防護,是的,沒有做防護,所以我拿來測試,大家不要搞我?。I目)
,先來看test_connect協(xié)程

buffer_t和socket_t均是libfuture中定義的,在libfuture.h頭文件中引入,
BUF_LEN是一個宏,被定義為10240,

open_connection是一個用于打開一個連接的協(xié)程,在連接成功之前會一直掛起當連接成功后會恢復執(zhí)行,返回值為是否連接成功,由于是模擬http的請求,要發(fā)送的字符串為下

然后開始發(fā)送

buffer.push,見名知義,往緩沖區(qū)中推入數(shù)據(jù),然后使用buffer_write發(fā)送數(shù)據(jù),返回值為是否超時,因為我加上了超時的機制,同樣,在消息發(fā)送出去前會一直掛起,

然后把緩沖區(qū)清空,然后再讀取,同樣流程,然后如何返回后有數(shù)據(jù),將緩沖區(qū)的最后一位設置為字符串結尾,這個大家都應該知道吧,然后打印出來,具體流程就是這樣,在上下的切換中回去執(zhí)行其他的協(xié)程,因為在調(diào)度器中我加入了10個協(xié)程,以下是結果

瞬間所有請求處理完畢,看右邊的拉條,10次的請求你全部打印出來了(畢竟是自己的服務器,沒有防護。。。),至于為什么亂碼,不用說,windows控制臺gbk編碼。看到這里,各位看官老爺是不是很有想法了呢,
繼續(xù),拿出一個服務器的栗子
#include "libfuture.h"
#include <iostream>
#include <sstream>
#include <string>
using namespace std;
using namespace libfuture;
#define BUF_LEN 10240
future_t<> test_send_and_recv(socket_t* client_socket, string addr)
{
buffer_t buffer(BUF_LEN + 1);
while (true)
{
buffer.clear();
//超時時間為5秒
bool is_timeout = co_await buffer_read(&buffer, client_socket, 5s);
if (is_timeout)
{
cout << "讀取超時" << endl;
break;
}
if (buffer.has_data())
{
//防止燙燙燙燙燙燙燙燙燙燙燙燙燙或屯屯屯屯屯屯屯屯屯屯屯屯屯屯
int len = buffer.data_len();
if (len > BUF_LEN)
len = BUF_LEN;
buffer.data()[len] = 0;
cout << "recv from " << addr << ":" << buffer.data() << endl;
//超時時間為5秒
bool is_timeout = co_await buffer_write(&buffer, client_socket, 5000ms);
if (is_timeout)
{
cout << "發(fā)送超時" << endl;
break;
}
}
else
{
client_socket->close();
break;
}
}
cout << "client leave" << endl;
delete client_socket;
co_return;
}
future_t<> test_accept()
{
socket_t* client_socket = nullptr;
while (true)
{
client_socket = new socket_t();
//在接收到客戶端之前會一直掛起
sockaddr_in* client_addr = co_await open_accept(client_socket);
stringstream ss;
ss << inet_ntoa(client_addr->sin_addr) << ":";
ss << ntohs(client_addr->sin_port);
cout << ss.str() << " join" << endl;
//開啟一個協(xié)程來處理這個socket的接收和發(fā)送數(shù)據(jù)
cpp test_send_and_recv(client_socket, ss.str());
}
co_return;
}
int main(int argc, char** argv)
{
#ifdef _WIN32
WSADATA _data;
WSAStartup(MAKEWORD(2, 2), &_data);
#endif
auto sche = current_scheduler();
socket_t* listen_socket = new socket_t(AF_INET, SOCK_STREAM, 0);
listen_socket->reuse_addr();
listen_socket->bind(8000, "127.0.0.1");
listen_socket->listen(128);
sche->set_init_sockfd(listen_socket->sockfd());
//要成為一個服務端必須要設置一個監(jiān)聽套接字進行初始化
sche->init();
//開啟一個協(xié)程
cpp test_accept();
sche->run_until_no_task();
#ifdef _WIN32
WSACleanup();
#endif
return 0;
}
流程我就不再講了,我直接運行走起,要說明的只有一點,要成為一個服務端,要設置一個監(jiān)聽套接字然后初始化,對,是因為坑*的windows,iocp簡直不是人
再后來我直接打開兩個telnet

用過windows的telnet的都知道windows的telnet每按一下就會發(fā)送出去,,,
都開始連接

都開始進入狀態(tài)了,現(xiàn)在我連個客戶端都可以發(fā)送信息,無堵塞,注意,我這個程序是單線程的,但使用協(xié)程方式的異步io就是
很強

后面我還沒寫到這里,直接就超時了,因為我代碼里寫的超時都是5秒,,,我直接改成100秒然后編譯運行開始連接

開始異步收發(fā)消息

關掉telnet后,也是會提示退出,同樣,buffer那一行提示可以忽視啊,我還沒完善錯誤打印

我給大家準備好了本地下載地址
testlibfuture的代碼
libfuture0.0.6代碼
解壓后的sample文件夾中有所有的栗子

最后
我覺得一個協(xié)程庫要具備一個簡單聲明協(xié)程的方式,還有就是要有一個處理所有協(xié)程的調(diào)度器,協(xié)程能夠直接調(diào)用另外一個協(xié)程,libfuture是能做到的,基于這個調(diào)度器要實現(xiàn)休眠協(xié)程,數(shù)據(jù)的協(xié)程,協(xié)程鎖,要讓用戶能將自己寫的協(xié)程嵌入這個調(diào)度器,實在是python的協(xié)程庫實實在在的做到了這一點,但是要在c++中實現(xiàn)這些,真的是無比困難,只有一步一步的探索。
文件夾下有所有的栗子代碼,可以一步一步調(diào)試,協(xié)程是如何創(chuàng)建的,調(diào)度器是怎么執(zhí)行的;說明,linux上也可以直接編譯使用的,我用的是gcc10.2.0,makefile直接在src下跟源代碼一個路徑,好了,到此為止,第一次寫文,亂七八糟,敬請見諒。
到此這篇關于詳解c++20協(xié)程如何使用的文章就介紹到這了,更多相關c++20協(xié)程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Matlab實現(xiàn)統(tǒng)計集合中各元素出現(xiàn)次數(shù)的示例代碼
統(tǒng)計數(shù)組中各個元素數(shù)量是一個很常用的功能,本文主要為大家介紹了如何利用Matlab優(yōu)雅的統(tǒng)計集合中各元素出現(xiàn)的次數(shù),感興趣的可以了解一下2022-05-05
Visual Studio Code (VSCode) 配置搭建 C/C++ 開發(fā)編譯環(huán)境的流程
記得N年前剛開始接觸編程時,使用的是Visual C++6.0,下面這個可愛的圖標很多人一定很熟悉。不過今天想嘗鮮新的工具 Visual Studio Code 來搭建C/C++開發(fā)環(huán)境,感興趣的朋友一起看看吧2021-09-09
C++中priority_queue模擬實現(xiàn)的代碼示例
在c++語言中數(shù)據(jù)結構中的堆結構可以通過STL庫中的priority_queue 優(yōu)先隊列來實現(xiàn),這樣做極大地簡化了我們的工作量,這篇文章主要給大家介紹了關于C++中priority_queue模擬實現(xiàn)的相關資料,需要的朋友可以參考下2021-08-08

