欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

C++中的Reactor原理與實現(xiàn)

 更新時間:2022年07月01日 12:18:54   作者:恒者走天下  
reactor設(shè)計模式是event-driven?architecture的一種實現(xiàn)方式,處理多個客戶端并發(fā)的向服務(wù)端請求服務(wù)的場景,每種服務(wù)在服務(wù)端可能由多個方法組成,這篇文章主要介紹了Reactor原理與實現(xiàn),需要的朋友可以參考下

一、Reactor介紹

reactor設(shè)計模式是event-driven architecture的一種實現(xiàn)方式,處理多個客戶端并發(fā)的向服務(wù)端請求服務(wù)的場景。每種服務(wù)在服務(wù)端可能由多個方法組成。reactor會解耦并發(fā)請求的服務(wù)并分發(fā)給對應(yīng)的事件處理器來處理。

中心思想是將所有要處理的I/o事件注冊到一個中心I/o多路復(fù)用器上,同時主線程/進程阻塞在多路復(fù)用器上;一旦有I/o事件到來或是準備就緒(文件描述符或socket可讀、寫),多路復(fù)用器返回并將事先注冊的相應(yīng)l/o事件分發(fā)到對應(yīng)的處理器中。

處理機制為:主程序?qū)⑹录约皩?yīng)事件處理的方法在Reactor上進行注冊, 如果相應(yīng)的事件發(fā)生,Reactor將會主動調(diào)用事件注冊的接口,即 回調(diào)函數(shù).

二、代碼實現(xiàn)

前提準備:1單例模式:單例模式(Singleton Pattern,也稱為單件模式),使用最廣泛的設(shè)計模式之一。其意圖是保證一個類(結(jié)構(gòu)體)僅有一個實例,并提供一個訪問它的全局訪問點,該實例被所有程序模塊共享。
2.回調(diào)函數(shù):把一段可執(zhí)行的代碼像參數(shù)傳遞那樣傳給其他代碼,而這段代碼會在某個時刻被調(diào)用執(zhí)行,這就叫做回調(diào)。

對epoll反應(yīng)堆中結(jié)構(gòu)體定義

/*fd包含的屬性*/
struct nitem { // fd

	int fd;		//要監(jiān)聽的文件描述符

	int status;	//是否在監(jiān)聽:1->在紅黑樹上(監(jiān)聽),0->不在(不監(jiān)聽)
	int events;	//對應(yīng)的監(jiān)聽事件,	EPOLLIN和EPOLLOUT(不同的事件,走不同的回調(diào)函數(shù))
	void *arg;	//指向自己結(jié)構(gòu)體指針
#if 0
	NCALLBACK callback;
#else
	NCALLBACK *readcb;   // epollin
	NCALLBACK *writecb;  // epollout
	NCALLBACK *acceptcb; // epollin
#endif
	unsigned char sbuffer[BUFFER_LENGTH]; //
	int slength;

	unsigned char rbuffer[BUFFER_LENGTH];
	int rlength;
	
};

/*分塊存儲*/
struct itemblock {

	struct itemblock *next;
	struct nitem *items;

};
/*epoll反應(yīng)堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {

	int epfd;
	struct itemblock *head; 

};

單例模式,創(chuàng)建reactor的一個實例

/*單例模式*/
struct reactor *instance = NULL;
int init_reactor(struct reactor *r) {

	if (r == NULL) return -1;

	int epfd = epoll_create(1); //int size
	r->epfd = epfd;

	// fd --> item
	r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
	if (r->head == NULL) {
		close(epfd);
		return -2;
	} 
	memset(r->head, 0, sizeof(struct itemblock));

	r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
	if (r->head->items == NULL) {
		free(r->head);
		close(epfd);
		return -2;
	}
	memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
	
	r->head->next = NULL;
	
	return 0;
}
struct reactor *getInstance(void) { //singleton

	if (instance == NULL) {

		instance = (struct reactor *)malloc(sizeof(struct reactor));
		if (instance == NULL) return NULL;
		memset(instance, 0, sizeof(struct reactor));

		if (0 > init_reactor(instance)) {
			free(instance);
			return NULL;
		}

	}

	return instance;
}

事件注冊

/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到對應(yīng)事件*/
/*驅(qū)動注冊*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {

	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	//1
	if (event == READ_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].readcb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
		
	}
	//2
	else if (event == WRITE_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].writecb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLOUT;
	} 
	//3
	else if (event == ACCEPT_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].acceptcb = cb;	//回調(diào)函數(shù)
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
	}

	ev.data.ptr = &r->head->items[fd];

	/*NOSET_CB 0*/
	if (r->head->items[fd].events == NOSET_CB) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
			return -1;
		}
		r->head->items[fd].events = event;
	} else if (r->head->items[fd].events != event) {

		if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_MOD failed\n");
			return -1;
		}
		r->head->items[fd].events = event;
	}
	
	return 0;
}

回調(diào)函數(shù)書寫

int write_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	
	unsigned char *sbuffer = R->head->items[fd].sbuffer;
	int length = R->head->items[fd].slength;
	int ret = send(fd, sbuffer, length, 0);
	if (ret < length) {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	} else {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	}
	return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	unsigned char *buffer = R->head->items[fd].rbuffer;
	
#if 0 //ET
	int idx = 0, ret = 0;
	while (idx < BUFFER_LENGTH) {
		ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
		if (ret == -1) { 
			break;
		} else if (ret > 0) {
			idx += ret;
		} else {// == 0
			break;
		}
	}
	if (idx == BUFFER_LENGTH && ret != -1) {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	} else if (ret == 0) {
		nreactor_set_event
		//close(fd);
	} else {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
	
#else //LT
	int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
	if (ret == 0) { // fin
		
		nreactor_del_event(fd, NULL, 0, NULL);
		close(fd);
		
	} else if (ret > 0) {
		unsigned char *sbuffer = R->head->items[fd].sbuffer;
		memcpy(sbuffer, buffer, ret);
		R->head->items[fd].slength = ret;
		printf("readcb: %s\n", sbuffer);
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
		
#endif
	
}
// web server 
// ET / LT
int accept_callback(int fd, int event, void *arg) {
	int connfd;
	struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}

監(jiān)聽描述符變化

// accept --> EPOLL
/*epoll_wait監(jiān)聽0*/
int reactor_loop(int listenfd) {

	struct reactor *R = getInstance();	
	
	struct epoll_event events[POLL_SIZE] = {0};
	while (1) {
		int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
		if (nready == -1) {
			continue;
		}

		int i = 0;
		for (i = 0;i < nready;i ++) {
			
			struct nitem *item = (struct nitem *)events[i].data.ptr;
			int connfd = item->fd;

			if (connfd == listenfd) { //
				item->acceptcb(listenfd, 0, NULL);
			} else {
			
				if (events[i].events & EPOLLIN) { //
					item->readcb(connfd, 0, NULL);
				
				} 
				if (events[i].events & EPOLLOUT) {
					item->writecb(connfd, 0, NULL);
		
				}
			}
		}

	}
	return 0;
}

完整代碼實現(xiàn)

#define MAXLNE  4096
#define POLL_SIZE	1024
#define BUFFER_LENGTH		1024
#define MAX_EPOLL_EVENT		1024
#define NOSET_CB	0
#define READ_CB		1
#define WRITE_CB	2
#define ACCEPT_CB	3
/*單例模式*/
typedef int NCALLBACK(int fd, int event, void *arg);
/*fd包含的屬性*/
struct nitem { // fd
	int fd;		//要監(jiān)聽的文件描述符
	int status;	//是否在監(jiān)聽:1->在紅黑樹上(監(jiān)聽),0->不在(不監(jiān)聽)
	int events;	//對應(yīng)的監(jiān)聽事件,	EPOLLIN和EPOLLOUT(不同的事件,走不同的回調(diào)函數(shù))
	void *arg;	//指向自己結(jié)構(gòu)體指針
#if 0
	NCALLBACK callback;
#else
	NCALLBACK *readcb;   // epollin
	NCALLBACK *writecb;  // epollout
	NCALLBACK *acceptcb; // epollin
#endif
	unsigned char sbuffer[BUFFER_LENGTH]; //
	int slength;
	unsigned char rbuffer[BUFFER_LENGTH];
	int rlength;
	
};
/*分塊存儲*/
struct itemblock {
	struct itemblock *next;
	struct nitem *items;
};
/*epoll反應(yīng)堆中包括通信的fd以及epoll的(epfd)*/
struct reactor {
	int epfd;
	struct itemblock *head; 
};
/*初始化結(jié)構(gòu)體*/
int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);
/*單例模式*/
struct reactor *instance = NULL;
struct reactor *getInstance(void) { //singleton
	if (instance == NULL) {
		instance = (struct reactor *)malloc(sizeof(struct reactor));
		if (instance == NULL) return NULL;
		memset(instance, 0, sizeof(struct reactor));
		if (0 > init_reactor(instance)) {
			free(instance);
			return NULL;
		}
	}
	return instance;
}
/*nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);*/
/*nreactor_set_event(fd, read_callback, READ_CB, NULL);*/
/*fd找到對應(yīng)事件*/
/*驅(qū)動注冊*/
int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	//1
	if (event == READ_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].readcb = cb;
		r->head->items[fd].arg = arg;
		ev.events = EPOLLIN;
		
	}
	//2
	else if (event == WRITE_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].writecb = cb;
		r->head->items[fd].arg = arg;
		ev.events = EPOLLOUT;
	} 
	//3
	else if (event == ACCEPT_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].acceptcb = cb;	//回調(diào)函數(shù)
		r->head->items[fd].arg = arg;
		ev.events = EPOLLIN;
	}
	ev.data.ptr = &r->head->items[fd];
	/*NOSET_CB 0*/
	if (r->head->items[fd].events == NOSET_CB) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
			return -1;
		}
		r->head->items[fd].events = event;
	} else if (r->head->items[fd].events != event) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_MOD failed\n");
			return -1;
		}
		r->head->items[fd].events = event;
	}
	
	return 0;
}
/*nreactor_del_event(fd, NULL, 0, NULL);*/
/*下樹*/
/*nreactor_del_event(fd, NULL, 0, NULL);*/
int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	ev.data.ptr = arg;
	epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
	r->head->items[fd].events = 0;
	return 0;
}
int write_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	
	unsigned char *sbuffer = R->head->items[fd].sbuffer;
	int length = R->head->items[fd].slength;
	int ret = send(fd, sbuffer, length, 0);
	if (ret < length) {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	} else {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	}
	return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	unsigned char *buffer = R->head->items[fd].rbuffer;
	
#if 0 //ET
	int idx = 0, ret = 0;
	while (idx < BUFFER_LENGTH) {
		ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
		if (ret == -1) { 
			break;
		} else if (ret > 0) {
			idx += ret;
		} else {// == 0
			break;
		}
	}
	if (idx == BUFFER_LENGTH && ret != -1) {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	} else if (ret == 0) {
		nreactor_set_event
		//close(fd);
	} else {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
	
#else //LT
	int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
	if (ret == 0) { // fin
		
		nreactor_del_event(fd, NULL, 0, NULL);
		close(fd);
		
	} else if (ret > 0) {
		unsigned char *sbuffer = R->head->items[fd].sbuffer;
		memcpy(sbuffer, buffer, ret);
		R->head->items[fd].slength = ret;
		printf("readcb: %s\n", sbuffer);
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
		
#endif
	
}
// web server 
// ET / LT
int accept_callback(int fd, int event, void *arg) {
	int connfd;
	struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
int init_server(int port) {
	int listenfd;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	return listenfd;
}
int init_reactor(struct reactor *r) {
	if (r == NULL) return -1;
	int epfd = epoll_create(1); //int size
	r->epfd = epfd;
	// fd --> item
	r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
	if (r->head == NULL) {
		close(epfd);
		return -2;
	} 
	memset(r->head, 0, sizeof(struct itemblock));
	r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
	if (r->head->items == NULL) {
		free(r->head);
		close(epfd);
		return -2;
	}
	memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
	
	r->head->next = NULL;
	
	return 0;
}
// accept --> EPOLL
/*epoll_wait監(jiān)聽0*/
int reactor_loop(int listenfd) {
	struct reactor *R = getInstance();	
	
	struct epoll_event events[POLL_SIZE] = {0};
	while (1) {
		int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
		if (nready == -1) {
			continue;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) {
			
			struct nitem *item = (struct nitem *)events[i].data.ptr;
			int connfd = item->fd;
			if (connfd == listenfd) { //
				item->acceptcb(listenfd, 0, NULL);
			} else {
			
				if (events[i].events & EPOLLIN) { //
					item->readcb(connfd, 0, NULL);
				
				} 
				if (events[i].events & EPOLLOUT) {
					item->writecb(connfd, 0, NULL);
		
				}
			}
		}
	}
	return 0;
}
int main(int argc, char **argv) 
{
    
 	int  connfd, n;
	int listenfd = init_server(9999);
	nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
	//nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
	
	reactor_loop(listenfd);
	 
    return 0;
}

到此這篇關(guān)于Reactor原理與實現(xiàn)的文章就介紹到這了,更多相關(guān)Reactor原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • C++詳細實現(xiàn)完整圖書管理功能

    C++詳細實現(xiàn)完整圖書管理功能

    隨著網(wǎng)絡(luò)技術(shù)的高速發(fā)展,計算機應(yīng)用的普及,利用計算機對圖書館的日常工作進行管理勢在必行,本篇文章涵蓋一個圖書管理系統(tǒng)的全部實現(xiàn)代碼,大家可以查缺補漏,提升水平
    2022-05-05
  • C語言打印輸出楊輝三角

    C語言打印輸出楊輝三角

    這篇文章主要為大家詳細介紹了C語言打印輸出楊輝三角,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-12-12
  • 深入解析設(shè)計模式中的適配器模式在C++中的運用

    深入解析設(shè)計模式中的適配器模式在C++中的運用

    這篇文章主要介紹了設(shè)計模式中的適配器模式在C++中的運用,通常適配器模式可以細分為類適配器和對象適配器兩種情況,需要的朋友可以參考下
    2016-03-03
  • C++實現(xiàn)Date類各種運算符重載的示例代碼

    C++實現(xiàn)Date類各種運算符重載的示例代碼

    這篇文章主要為大家詳細介紹了C++實現(xiàn)Date類各種運算符重載的相關(guān)知識,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2024-02-02
  • 淺析C++中cout的運行機制

    淺析C++中cout的運行機制

    關(guān)于C++中cout的使用,相信大家再熟悉不過了,然而對于cout是如何輸出的?輸出的機制是啥,需要進一步的了解。本章娓娓道來。前幾天在網(wǎng)上看到這么一個題目
    2013-10-10
  • C++實現(xiàn)掃雷程序開發(fā)

    C++實現(xiàn)掃雷程序開發(fā)

    這篇文章主要為大家詳細介紹了C++實現(xiàn)掃雷程序開發(fā),文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2020-07-07
  • C語言實現(xiàn)消消樂游戲的代碼分享

    C語言實現(xiàn)消消樂游戲的代碼分享

    本章我們將編寫十字消除游戲,用戶點擊空白方塊,沿其上下左右方向?qū)ふ业谝粋€彩色方塊,如果有兩個或兩個以上顏色一致,就將其消除,感興趣的可以了解一下
    2023-02-02
  • C語言中設(shè)置用戶識別碼的相關(guān)函數(shù)的簡單講解

    C語言中設(shè)置用戶識別碼的相關(guān)函數(shù)的簡單講解

    這篇文章主要介紹了C語言中設(shè)置用戶識別碼的相關(guān)函數(shù)的簡單講解,包括setuid()函數(shù)和setreuid()函數(shù)以及setfsuid()函數(shù),需要的朋友可以參考下
    2015-08-08
  • 基于Qt的TCP實現(xiàn)通信

    基于Qt的TCP實現(xiàn)通信

    這篇文章主要為大家詳細介紹了基于Qt的TCP實現(xiàn)通信,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-08-08
  • C++移動操作,RVO和NRVO詳細

    C++移動操作,RVO和NRVO詳細

    本文將討論了何時C++會自動進行移動操作,并且說明了復(fù)制消除,RVO和NRVO優(yōu)的化等香瓜吧資料,需要的小伙伴可以參考一下
    2021-09-09

最新評論