muduo源碼分析之TcpServer模塊詳細(xì)介紹
這次我們開(kāi)始muduo
源代碼的實(shí)際編寫,首先我們知道muduo
是LT
模式,Reactor
模式,下圖為Reactor
模式的流程圖[來(lái)源1]
然后我們來(lái)看下muduo
的整體架構(gòu)[來(lái)源1]
首先muduo
有一個(gè)主反應(yīng)堆mainReactor
以及幾個(gè)子反應(yīng)堆subReactor
,其中子反應(yīng)堆的個(gè)數(shù)由用戶使用setThreadNum
函數(shù)設(shè)置,mainReactor
中主要有一個(gè)Acceptor
,當(dāng)用戶建立新的連接的時(shí)候,Acceptor
會(huì)將connfd
和對(duì)應(yīng)的事件打包為一個(gè)channel
然后采用輪詢的算法,指定將該channel
給所選擇的subReactor
,以后該subReactor
就負(fù)責(zé)該channel
的所有工作。
TcpServer類
我們按照從上到下的思路進(jìn)行講解,以下內(nèi)容我們按照一個(gè)簡(jiǎn)單的EchoServer
的實(shí)現(xiàn)思路來(lái)講解,我們知道當(dāng)我們自己實(shí)現(xiàn)一個(gè)Server
的時(shí)候,會(huì)在構(gòu)造函數(shù)中實(shí)例化一個(gè)TcpServer
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name) : server_(loop, addr, name) , loop_(loop) { // 注冊(cè)回調(diào)函數(shù) server_.setConnectionCallback( std::bind(&EchoServer::onConnection, this, std::placeholders::_1) ); server_.setMessageCallback( std::bind(&EchoServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3) // 設(shè)置合適的loop線程數(shù)量 loopthread 不包括baseloop server_.setThreadNum(3); }
于是我們?nèi)タ聪?code>TcpServer的構(gòu)造函數(shù)是在干什么
TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option) : loop_(CheckLoopNotNull(loop)) , ipPort_(listenAddr.toIpPort()) , name_(nameArg) , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) , threadPool_(new EventLoopThreadPool(loop, name_)) , connectionCallback_() , messageCallback_() , nextConnId_(1) , started_(0) { // 當(dāng)有新用戶連接時(shí)候,會(huì)執(zhí)行該回調(diào)函數(shù) acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }
我們只需要關(guān)注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點(diǎn),構(gòu)造了一個(gè)Acceptor
,我們首先要知道Acceptor
主要就是連接新用戶并打包為一個(gè)Channel
,所以我們就應(yīng)該知道Acceptor
按道理應(yīng)該實(shí)現(xiàn)socket
,bind
,listen
,accept
這四個(gè)函數(shù)。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_(loop), acceptSocket_(createNonblocking()) // socket , acceptChannel_(loop, acceptSocket_.fd()), listenning_(false) { acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(true); acceptSocket_.bindAddress(listenAddr); // 綁定套接字 // 有新用戶的連接,執(zhí)行一個(gè)回調(diào)(打包為channel) acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this)); }
其中Acceptor
中有個(gè)acceptSocket_
,其實(shí)就是我們平時(shí)所用的listenfd
,構(gòu)造函數(shù)中實(shí)現(xiàn)了socket
,bind
,而其余的兩個(gè)函數(shù)的使用在其余代碼
// 開(kāi)啟服務(wù)器監(jiān)聽(tīng) void TcpServer::start() { // 防止一個(gè)TcpServer被start多次 if (started_++ == 0) { threadPool_->start(threadInitCallback_); // 啟動(dòng)底層的loop線程池,這里會(huì)按照設(shè)定了threadnum設(shè)置pool的數(shù)量 loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())); } }
我們知道,當(dāng)我們?cè)O(shè)置了threadnum
之后,就會(huì)有一個(gè)mainloop
,那么這個(gè)loop_
就是那個(gè)mainloop
,其中可以看見(jiàn)這個(gè)loop_
就只做一個(gè)事情Acceptor::listen
。
void Acceptor::listen() { listenning_ = true; acceptSocket_.listen(); // listen acceptChannel_.enableReading(); // acceptChannel_ => Poller }
這里就實(shí)現(xiàn)了listen
函數(shù),還有最后一個(gè)函數(shù)accept
,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()
之后就會(huì)使得這個(gè)listenfd
所在的channel
對(duì)讀事件感興趣,那什么時(shí)候會(huì)有讀事件呢,就是當(dāng)用戶建立新連接的時(shí)候,那么我們應(yīng)該想一下,那當(dāng)感興趣的事件發(fā)生之后,listenfd
應(yīng)該干什么呢,應(yīng)該執(zhí)行一個(gè)回調(diào)函數(shù)呀。注意Acceptor
構(gòu)造函數(shù)中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
這就是那個(gè)回調(diào),我們?nèi)タ聪?code>handleRead在干嘛。
// listenfd有事件發(fā)生了,就是有新用戶連接了 void Acceptor::handleRead() { InetAddress peerAddr; int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { // 若用戶實(shí)現(xiàn)定義了,則執(zhí)行,否則說(shuō)明用戶對(duì)新到來(lái)的連接沒(méi)有需要執(zhí)行的,所以直接關(guān)閉 if (newConnectionCallback_) { newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發(fā)當(dāng)前的新客戶端的Channel } else { ::close(connfd); } } ... }
這里是不是就實(shí)現(xiàn)了accept
函數(shù),至此當(dāng)用戶建立一個(gè)新的連接時(shí)候,Acceptor
就會(huì)得到一個(gè)connfd
和其對(duì)應(yīng)的peerAddr
返回給mainloop
,這時(shí)候我們?cè)谧⒁獾?code>TcpServer構(gòu)造函數(shù)中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));
我們給acceptor_
設(shè)置了一個(gè)newConnectionCallback_
,于是由上面的代碼就可以知道,if (newConnectionCallback_)
為真,就會(huì)執(zhí)行這個(gè)回調(diào)函數(shù),于是就會(huì)執(zhí)行TcpServer::newConnection
,我們?nèi)タ聪逻@個(gè)函數(shù)是在干嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { // 輪詢算法選擇一個(gè)subloop來(lái)管理對(duì)應(yīng)的這個(gè)新連接 EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // 通過(guò)sockfd獲取其綁定的本地ip和端口 sockaddr_in local; ::bzero(&local, sizeof local); socklen_t addrlen = sizeof local; if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0) { LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); // 根據(jù)連接成功的sockfd,創(chuàng)建TcpConnection TcpConnectionPtr conn(new TcpConnection( ioLoop, connName, sockfd, // Socket Channel localAddr, peerAddr)); connections_[connName] = conn; // 下面的回調(diào)時(shí)用戶設(shè)置給TcpServer,TcpServer又設(shè)置給TcpConnection,TcpConnetion又設(shè)置給Channel,Channel又設(shè)置給Poller,Poller通知channel調(diào)用這個(gè)回調(diào) conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // 設(shè)置了如何關(guān)閉連接的回調(diào) conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, std::placeholders::_1) ); // 直接調(diào)用connectEstablished ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); }
這里就比較長(zhǎng)了,我先說(shuō)下大概他干了啥事情:首先通過(guò)輪詢找到下一個(gè)subloop
,然后將剛剛返回的connfd
和對(duì)應(yīng)的peerAddr
以及localAddr
構(gòu)造為一個(gè)TcpConnection
給subloop
,然后給這個(gè)conn
設(shè)置了一系列的回調(diào)函數(shù),比如讀回調(diào),寫回調(diào),斷開(kāi)回調(diào)等等。下一章我們來(lái)說(shuō)下上面的代碼最后幾行在干嘛。
到此這篇關(guān)于muduo源碼分析之TcpServer模塊的文章就介紹到這了,更多相關(guān)muduo TcpServer模塊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis監(jiān)控工具RedisInsight安裝與使用
這篇文章主要為大家介紹了Redis監(jiān)控工具RedisInsight的安裝步驟與使用方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03Redis調(diào)用Lua腳本及使用場(chǎng)景快速掌握
Redis?是一種非常流行的內(nèi)存數(shù)據(jù)庫(kù),常用于數(shù)據(jù)緩存與高頻數(shù)據(jù)存儲(chǔ)。大多數(shù)開(kāi)發(fā)人員可能聽(tīng)說(shuō)過(guò)redis可以運(yùn)行?Lua?腳本,但是可能不知道redis在什么情況下需要使用到Lua腳本2022-03-03Redis配置文件redis.conf詳細(xì)配置說(shuō)明
本文列出了Redis的配置文件redis.conf的各配置項(xiàng)的詳細(xì)說(shuō)明,簡(jiǎn)單易懂2018-03-03淺談redis的過(guò)期時(shí)間設(shè)置和過(guò)期刪除機(jī)制
本文主要介紹了redis的過(guò)期時(shí)間設(shè)置和過(guò)期刪除機(jī)制,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟
本文主要介紹了Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01redis使用watch秒殺搶購(gòu)實(shí)現(xiàn)思路
這篇文章主要為大家詳細(xì)介紹了redis使用watch秒殺搶購(gòu)的實(shí)現(xiàn)思路,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02Redis中緩存和數(shù)據(jù)庫(kù)雙寫數(shù)據(jù)不一致的原因及解決方案
這篇文章主要介紹了Redis中緩存和數(shù)據(jù)庫(kù)雙寫數(shù)據(jù)不一致的原因及解決方案,文中通過(guò)圖文結(jié)合的方式講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-03-03