GoLang協(xié)程庫libtask學(xué)習(xí)筆記
協(xié)程解決了什么問題
我們先從一次網(wǎng)絡(luò)IO請(qǐng)求過程中的read操作為例,請(qǐng)求數(shù)據(jù)會(huì)先拷貝到系統(tǒng)內(nèi)核空間中,再從操作系統(tǒng)的內(nèi)核空間拷貝到應(yīng)用程序的用戶空間中。從內(nèi)核空間將數(shù)據(jù)拷貝到用戶空間過程中,會(huì)經(jīng)歷兩個(gè)階段:
- 等待數(shù)據(jù)準(zhǔn)備
- 拷貝數(shù)據(jù)
因?yàn)橛羞@兩個(gè)階段,所以就有了各種網(wǎng)絡(luò)IO的模型:
同步編程:應(yīng)用程序等待IO結(jié)果(比如等待打開一個(gè)大的文件,或者等待遠(yuǎn)端服務(wù)器的響應(yīng)),阻塞當(dāng)前線程。
- 優(yōu)點(diǎn):邏輯簡(jiǎn)單。
- 缺點(diǎn):效率太低,其他與IO無關(guān)的業(yè)務(wù)也要等待IO的響應(yīng)。
異步多線程/進(jìn)程:將IO操作頻繁的邏輯、或者單純的IO操作獨(dú)立到一/多個(gè)線程中,業(yè)務(wù)線程與IO線程間靠通信/全局變量來共享數(shù)據(jù)。
- 優(yōu)點(diǎn):充分利用CPU資源,防止阻塞資源。
- 缺點(diǎn):線程切換代價(jià)相對(duì)較高,異步邏輯代碼復(fù)雜。
異步消息+回調(diào)函數(shù):設(shè)計(jì)一個(gè)消息循環(huán)處理器,接收外部消息(包括系統(tǒng)通知和網(wǎng)絡(luò)報(bào)文等),收到消息時(shí)調(diào)用注冊(cè)的回調(diào)函數(shù)。
- 優(yōu)點(diǎn):充分利用CPU資源,防止阻塞資源。
- 缺點(diǎn):代碼邏輯復(fù)雜。
而協(xié)程,就是用同步的語義去解決異步問題,即業(yè)務(wù)邏輯看起來是同步的,但實(shí)際上并不阻塞當(dāng)前線程(一般是靠事件循環(huán)處理來分發(fā)消息)。所以協(xié)程實(shí)際上是在單線程的環(huán)境下實(shí)現(xiàn)的應(yīng)用程序級(jí)別的并發(fā),就是把本來由操作系統(tǒng)控制的切換+保存狀態(tài)在應(yīng)用程序里面實(shí)現(xiàn)了。
由于協(xié)程在應(yīng)用程序級(jí)別來處理任務(wù),所以協(xié)程更像是一個(gè)函數(shù),只是比普通的函數(shù)多了兩個(gè)動(dòng)作:yield()和resume(),即讓出和恢復(fù)。讓出的時(shí)候我們需要將寄存器的協(xié)程上下文保存起來,恢復(fù)的時(shí)候再將上下文重新壓入寄存器,繼續(xù)執(zhí)行。
簡(jiǎn)介
Libtask 是一個(gè)簡(jiǎn)單的協(xié)程庫,它展示了最簡(jiǎn)單的一種協(xié)程實(shí)現(xiàn)方式。操作系統(tǒng)只能看見一個(gè)內(nèi)核線程,無法感知到客戶端協(xié)程的存在。
Libtask中的協(xié)程是協(xié)作式的,也就是說,使用的不是時(shí)間片輪轉(zhuǎn)算法,調(diào)度器根據(jù)先來先服務(wù)的策略來執(zhí)行就緒隊(duì)列中的協(xié)程,只有當(dāng)每個(gè)協(xié)程主動(dòng)退出時(shí)調(diào)度器才會(huì)把CPU分配給下一個(gè)協(xié)程。
對(duì)協(xié)程的抽象
在libtask中,協(xié)程被抽象成一個(gè)Task結(jié)構(gòu)體,結(jié)構(gòu)體中的字段用于描述協(xié)程的相關(guān)信息:
// 一個(gè)Task可以看成是一個(gè)需要異步執(zhí)行的任務(wù),coroutine的抽象描述
struct Task
{
char name[256];
char state[256];
// 前后指針
Task *next;
Task *prev;
Task *allnext;
Task *allprev;
// 執(zhí)行上下文
Context context;
// 睡眠時(shí)間
uvlong alarmtime;
uint id;
// 協(xié)程棧指針
uchar *stk;
// 協(xié)程棧大小
uint stksize;
// 協(xié)程是否退出了
int exiting;
// 在在alltask的中的索引下標(biāo)
int alltaskslot;
// 是否是系統(tǒng)協(xié)程
int system;
// 是否在就緒狀態(tài)
int ready;
// Task需要執(zhí)行的函數(shù)
void (*startfn)(void*);
// startfn的參數(shù)
void *startarg;
// 自定義數(shù)據(jù)
void *udata;
};
創(chuàng)建協(xié)程
int taskcreate(void (*fn)(void*), void *arg, uint stack)
{
int id;
Task *t;
// 分配task和stack的空間
t = taskalloc(fn, arg, stack);
// 協(xié)程的數(shù)量+1
taskcount++;
id = t->id;
if(nalltask%64 == 0){
alltask = realloc(alltask, (nalltask+64)*sizeof(alltask[0]));
if(alltask == nil){
fprint(2, "out of memory\n");
abort();
}
}
// 記錄位置
t->alltaskslot = nalltask;
// 保存到alltask中
alltask[nalltask++] = t;
// 修改狀態(tài)為就緒,可以被調(diào)度,并且加入到就緒隊(duì)列
taskready(t);
return id;
}
我們可以使用taskcreate函數(shù)來創(chuàng)建協(xié)程,在taskcreate函數(shù)中,首先會(huì)調(diào)用taskalloc函數(shù)為Task和執(zhí)行棧分配內(nèi)存,然后初始化協(xié)程的上下文信息,在此之后,一個(gè)協(xié)程就被創(chuàng)建成功了。
/*
* taskalloc函數(shù)的主要邏輯是申請(qǐng)Task結(jié)構(gòu)體所需的內(nèi)存和執(zhí)行時(shí)棧的內(nèi)存,
* 然后初始化各個(gè)字段。在此之后,一個(gè)協(xié)程就被創(chuàng)建成功了,接著執(zhí)行taskready
* 把協(xié)程加入就緒隊(duì)列中。
* */
static Task*
taskalloc(void (*fn)(void*), void *arg, uint stack)
{
Task *t;
sigset_t zero;
uint x, y;
ulong z;
/* allocate the task and stack together */
// 結(jié)構(gòu)體本身的大小和棧大小
// 協(xié)程棧大小是256*1024
t = malloc(sizeof *t+stack);
if(t == nil){
fprint(2, "taskalloc malloc: %r\n");
abort();
}
memset(t, 0, sizeof *t);
// 棧的內(nèi)存位置
t->stk = (uchar*)(t+1);
// 棧大小
t->stksize = stack;
// 協(xié)程id
t->id = ++taskidgen;
// 協(xié)程工作函數(shù)和參數(shù)
t->startfn = fn;
t->startarg = arg;
/* do a reasonable initialization */
memset(&t->context.uc, 0, sizeof t->context.uc);
sigemptyset(&zero);
// 初始化uc_sigmask字段為空,即不阻塞信號(hào)
sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
/* must initialize with current context */
// 初始化uc字段
if(getcontext(&t->context.uc) < 0){
fprint(2, "getcontext: %r\n");
abort();
}
/* call makecontext to do the real work. */
/* leave a few words open on both ends */
// 設(shè)置協(xié)程執(zhí)行時(shí)的棧位置和大小
t->context.uc.uc_stack.ss_sp = t->stk+8;
t->context.uc.uc_stack.ss_size = t->stksize-64;
#if defined(__sun__) && !defined(__MAKECONTEXT_V2_SOURCE) /* sigh */
#warning "doing sun thing"
/* can avoid this with __MAKECONTEXT_V2_SOURCE but only on SunOS 5.9 */
t->context.uc.uc_stack.ss_sp =
(char*)t->context.uc.uc_stack.ss_sp
+t->context.uc.uc_stack.ss_size;
#endif
/*
* All this magic is because you have to pass makecontext a
* function that takes some number of word-sized variables,
* and on 64-bit machines pointers are bigger than words.
*/
//print("make %p\n", t);
z = (ulong)t;
y = z;
z >>= 16; /* hide undefined 32-bit shift from 32-bit compilers */
x = z>>16;
// 保存信息到uc字段
makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
return t;
}
創(chuàng)建好一個(gè)協(xié)程之后,taskcreate函數(shù)會(huì)調(diào)用taskready()函數(shù)把協(xié)程的狀態(tài)修改為就緒態(tài),并加入到就緒隊(duì)列中。
/*
* 修改協(xié)程的狀態(tài)并加入就緒隊(duì)列
* */
void taskready(Task *t)
{
t->ready = 1;
addtask(&taskrunqueue, t);
}
如何保存上下文信息
我們可以發(fā)現(xiàn),在調(diào)用taskalloc函數(shù)初始化協(xié)程的時(shí)候,我們還會(huì)對(duì)協(xié)程的上下文進(jìn)行初始化,以下代碼的流程是將當(dāng)前CPU寄存器的上下文信息保存到當(dāng)前Task的上下文中,同時(shí)將當(dāng)前Task的棧位置和大小保存進(jìn)上下文中,最后將協(xié)程的工作函數(shù)保存到上下文信息中。
// 將上下文置為零值
memset(&t->context.uc, 0, sizeof t->context.uc);
// 將信號(hào)集zero清空
sigemptyset(&zero);
// 初始化uc_sigmask字段為空,即不阻塞信號(hào)
sigprocmask(SIG_BLOCK, &zero, &t->context.uc.uc_sigmask);
// 將當(dāng)前上下文信息保存到t-context.uc結(jié)構(gòu)體中
if(getcontext(&t->context.uc) < 0){
fprint(2, "getcontext: %r\n");
abort();
}
// 設(shè)置協(xié)程執(zhí)行時(shí)的棧位置和大小
t->context.uc.uc_stack.ss_sp = t->stk+8;
t->context.uc.uc_stack.ss_size = t->stksize-64;
z = (ulong)t;
y = z;
z >>= 16;
x = z>>16;
// 設(shè)置協(xié)程的工作函數(shù)到上下文信息中
makecontext(&t->context.uc, (void(*)())taskstart, 2, y, x);
ucontext族函數(shù)
其實(shí)對(duì)協(xié)程上下文的初始化以及保存是通過linux下的ucontext族函數(shù)來實(shí)現(xiàn)的。
ucontext_t結(jié)構(gòu)體
我們發(fā)現(xiàn)Task結(jié)構(gòu)體中有一個(gè)Context字段,這個(gè)字段其實(shí)就是對(duì)ucontext_t結(jié)構(gòu)體的封裝,ucontext_t結(jié)構(gòu)體用于保存當(dāng)前的上下文信息,它的結(jié)構(gòu)是這樣的:
typedef struct ucontext
{
unsigned long int uc_flags;
struct ucontext *uc_link;//后序上下文
__sigset_t uc_sigmask;// 信號(hào)屏蔽字掩碼
stack_t uc_stack;// 上下文所使用的棧
// 保存的上下文的寄存器信息
// 比如pc、sp、bp
// pc程序計(jì)數(shù)器:記錄下一條指令的地址
// sp堆棧指針:指向函數(shù)調(diào)用棧棧頂?shù)闹羔?,所以新?shù)據(jù)入棧將存入sp+1的地址
// bp基址指針:指向函數(shù)調(diào)用棧的首地址
mcontext_t uc_mcontext;
long int uc_filler[5];
} ucontext_t;
//其中mcontext_t 定義如下
typedef struct
{
gregset_t __ctx(gregs);//所裝載寄存器
fpregset_t __ctx(fpregs);//寄存器的類型
} mcontext_t;
//其中g(shù)regset_t 定義如下
typedef greg_t gregset_t[NGREG];//包括了所有的寄存器的信息
getcontext()函數(shù)
函數(shù)原型:
int getcontext(ucontext_t* ucp)
getcontext()函數(shù)的底層是通過匯編來實(shí)現(xiàn)的,其主要的功能是將當(dāng)前運(yùn)行到的寄存器信息保存到參數(shù)ucp中。
setcontext()函數(shù)
函數(shù)原型:
int setcontext(const ucontext_t *ucp)
setcontext()函數(shù)的作用是將ucontext_t結(jié)構(gòu)體變量ucp中的上下文信息重新恢復(fù)到cpu中并執(zhí)行。
makecontext()函數(shù)
函數(shù)原型:
void makecontext(ucontext_t *ucp, void (*func)(), int argc, ...)
makecontext()函數(shù)的主要功能是設(shè)置協(xié)程的工作函數(shù)到上下文(ucontext_t)中,同時(shí)在用戶設(shè)置的棧上保存一些信息,并且設(shè)置棧頂指針的值到上下文信息中。
argc是入口函數(shù)的參數(shù)個(gè)數(shù),后面的...是具體的入口函數(shù)參數(shù),該參數(shù)必須是整形值。
swapcontext()函數(shù)
函數(shù)原型:
int swapcontext(ucontext_t *oucp, ucontext_t *ucp)
該函數(shù)可以將當(dāng)前cpu中的上下文信息保存到oucp結(jié)構(gòu)體變量中,然后將ucp結(jié)構(gòu)體的上下文信息恢復(fù)到cpu中。
這里可以理解為調(diào)用了兩個(gè)函數(shù),第一次是調(diào)用了getcontext(oucp)然后再調(diào)用setcontext(ucp)。
協(xié)程的調(diào)度
在使用taskcreate創(chuàng)建協(xié)程的時(shí)候,這個(gè)函數(shù)內(nèi)部會(huì)調(diào)用taskready函數(shù)修改新建協(xié)程的狀態(tài)并加入就緒隊(duì)列taskrunqueue中,taskrunqueue中的協(xié)程需要一個(gè)調(diào)度器來調(diào)度執(zhí)行。
tasklib庫中實(shí)現(xiàn)了一個(gè)協(xié)程調(diào)度中心的函數(shù)。調(diào)度中心會(huì)不斷的從就緒隊(duì)列中取出協(xié)程來執(zhí)行,它的核心邏輯是這樣的:
- 從就緒隊(duì)列中拿出一個(gè)協(xié)程t,并把t移出就緒隊(duì)列。
- 通過
contextswitch函數(shù)將協(xié)程t的上下文信息切換到taskschedcontext中執(zhí)行。 - 將協(xié)程t切換回調(diào)度中心,如果t已經(jīng)退出,修改數(shù)據(jù)結(jié)構(gòu),然后回收他的內(nèi)存,然后繼續(xù)調(diào)度其它的協(xié)程執(zhí)行。這里的調(diào)度機(jī)制比較簡(jiǎn)單,是非搶占式的協(xié)作式調(diào)度,沒有時(shí)間片的概念,一個(gè)協(xié)程的執(zhí)行時(shí)間由自己決定,放棄執(zhí)行的權(quán)力也是自己控制的,當(dāng)協(xié)程不想執(zhí)行了可以調(diào)用
taskyield()函數(shù)讓出cpu。
static void taskscheduler(void)
{
int i;
Task *t;
taskdebug("scheduler enter");
for(;;){
// 如果沒有就緒態(tài)協(xié)程了,就退出
if(taskcount == 0)
exit(taskexitval);
// 從就緒隊(duì)列中拿出一個(gè)協(xié)程
t = taskrunqueue.head;
if(t == nil){
fprint(2, "no runnable tasks! %d tasks stalled\n", taskcount);
exit(1);
}
// 從就緒隊(duì)列中刪除這個(gè)協(xié)程
deltask(&taskrunqueue, t);
// 將協(xié)程狀態(tài)改為非就緒態(tài)
t->ready = 0;
// 保存正在執(zhí)行的協(xié)程
taskrunning = t;
// 切換次數(shù)+1
tasknswitch++;
taskdebug("run %d (%s)", t->id, t->name);
// 切換到t執(zhí)行,將當(dāng)前cpu中的上下文信息保存到taskschedcontext中
// 然后將t->context中的上下文信息恢復(fù)到cpu中執(zhí)行
contextswitch(&taskschedcontext, &t->context);
// 執(zhí)行結(jié)束
taskrunning = nil;
// 剛才執(zhí)行的協(xié)程t退出了
if(t->exiting){
// 如果不是系統(tǒng)協(xié)程,協(xié)程個(gè)數(shù)減一
if(!t->system)
taskcount--;
// 保存當(dāng)前協(xié)程在alltask的索引
i = t->alltaskslot;
// 將最后一個(gè)協(xié)程切換到當(dāng)前協(xié)程的位置,因?yàn)楫?dāng)前協(xié)程要退出了
alltask[i] = alltask[--nalltask];
// 更新被置換協(xié)程的索引
alltask[i]->alltaskslot = i;
// 釋放堆內(nèi)存
free(t);
}
}
}
從上述調(diào)度器執(zhí)行的代碼中可以發(fā)現(xiàn),我們使用contextswitch函數(shù)實(shí)現(xiàn)了協(xié)程間的上下文切換,這個(gè)函數(shù)的內(nèi)部調(diào)用了swapcontext(&from->uc, &to->uc)函數(shù),這個(gè)函數(shù)將當(dāng)前cpu中的上下文信息保存到from結(jié)構(gòu)體變量中,然后將to結(jié)構(gòu)體的上下文信息恢復(fù)到cpu中執(zhí)行。
執(zhí)行結(jié)束之后,會(huì)重新將上下文切換回調(diào)度中心。
static void contextswitch(Context *from, Context *to)
{
if(swapcontext(&from->uc, &to->uc) < 0){
fprint(2, "swapcontext failed: %r\n");
assert(0);
}
}
其實(shí)我們還可以通過調(diào)用taskyield函數(shù)來控制協(xié)程在沒有執(zhí)行完就主動(dòng)讓出,那么當(dāng)前正在執(zhí)行的task會(huì)被 插入就緒隊(duì)列的尾部,等待后續(xù)的調(diào)度,然后調(diào)度器會(huì)從就緒隊(duì)列的頭部重新取出一個(gè)task來執(zhí)行。
/*
* 協(xié)程主動(dòng)讓出CPU
* 1.將主動(dòng)讓出的協(xié)程重新加入到就緒隊(duì)列
* 2.將當(dāng)前協(xié)程的狀態(tài)標(biāo)記為讓出
* 3.執(zhí)行協(xié)程切換的邏輯
* */
int taskyield(void)
{
int n;
// 協(xié)程的讓出次數(shù)
n = tasknswitch;
// 將當(dāng)前主動(dòng)讓出的協(xié)程放進(jìn)等待隊(duì)列
taskready(taskrunning);
// 標(biāo)記當(dāng)前協(xié)程的狀態(tài)為“讓出”
taskstate("yield");
// 切換協(xié)程
taskswitch();
// 等于0說明當(dāng)前只有自己一個(gè)協(xié)程,調(diào)度的時(shí)候taskswitch加1,所以這里要減1
return tasknswitch - n - 1;
}
我們可以發(fā)現(xiàn),切換流程的時(shí)候?qū)嶋H上是調(diào)用了taskswitch()函數(shù),這個(gè)函數(shù)內(nèi)部會(huì)調(diào)用contextswitch函數(shù)來切換上下文。
/*
* 切換協(xié)程
* */
void taskswitch(void)
{
needstack(0);
// 將當(dāng)前CPU中的上下文信息保存到taskrunning->context結(jié)構(gòu)體中
// 然后將調(diào)度中心上下文恢復(fù)到CPU中執(zhí)行
contextswitch(&taskrunning->context, &taskschedcontext);
}
總結(jié)
所以整個(gè)調(diào)度流程是這樣的:
每一個(gè)協(xié)程對(duì)應(yīng)一個(gè)Task結(jié)構(gòu)體。然后調(diào)度中心不斷地按照先進(jìn)先出的方式去調(diào)度協(xié)程的執(zhí)行就可以。因?yàn)闆]有搶占機(jī)制,所以調(diào)度中心是依賴協(xié)程本身去驅(qū)動(dòng)的,協(xié)程需要主動(dòng)讓出cpu,把上下文切換回調(diào)度中心,調(diào)度中心才能進(jìn)行下一輪的調(diào)度。
當(dāng)然我們也可以調(diào)用taskyield函數(shù)主動(dòng)讓出CPU,他會(huì)將當(dāng)前正在執(zhí)行的task插入就緒隊(duì)列的尾部,等待后續(xù)的調(diào)度,然后調(diào)度器會(huì)從就緒隊(duì)列的頭部重新取出一個(gè)task來執(zhí)行。
到此這篇關(guān)于GoLang協(xié)程庫libtask學(xué)習(xí)筆記的文章就介紹到這了,更多相關(guān)GoLang libtask內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言學(xué)習(xí)之結(jié)構(gòu)體和方法使用詳解
這篇文章主要為大家詳細(xì)介紹了Go語言中結(jié)構(gòu)體和方法的使用,文中的示例代碼講解詳細(xì),對(duì)我們學(xué)習(xí)Go語言有一定的幫助,需要的可以參考一下2022-04-04
go語言reflect.Type?和?reflect.Value?應(yīng)用示例詳解
這篇文章主要為大家介紹了go語言reflect.Type?和?reflect.Value?應(yīng)用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
Go語言內(nèi)建函數(shù)cap的實(shí)現(xiàn)示例
cap 是一個(gè)常用的內(nèi)建函數(shù),它用于獲取某些數(shù)據(jù)結(jié)構(gòu)的容量,本文主要介紹了Go語言內(nèi)建函數(shù)cap的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-08-08

