muduo源碼分析之TcpServer模塊詳細(xì)介紹
這次我們開(kāi)始muduo源代碼的實(shí)際編寫(xiě),首先我們知道muduo是LT模式,Reactor模式,下圖為Reactor模式的流程圖[來(lái)源1]

然后我們來(lái)看下muduo的整體架構(gòu)[來(lái)源1]

首先muduo有一個(gè)主反應(yīng)堆mainReactor以及幾個(gè)子反應(yīng)堆subReactor,其中子反應(yīng)堆的個(gè)數(shù)由用戶使用setThreadNum函數(shù)設(shè)置,mainReactor中主要有一個(gè)Acceptor,當(dāng)用戶建立新的連接的時(shí)候,Acceptor會(huì)將connfd和對(duì)應(yīng)的事件打包為一個(gè)channel然后采用輪詢的算法,指定將該channel給所選擇的subReactor,以后該subReactor就負(fù)責(zé)該channel的所有工作。
TcpServer類(lèi)
我們按照從上到下的思路進(jìn)行講解,以下內(nèi)容我們按照一個(gè)簡(jiǎn)單的EchoServer的實(shí)現(xiàn)思路來(lái)講解,我們知道當(dāng)我們自己實(shí)現(xiàn)一個(gè)Server的時(shí)候,會(huì)在構(gòu)造函數(shù)中實(shí)例化一個(gè)TcpServer
EchoServer(EventLoop *loop,
const InetAddress &addr,
const std::string &name)
: server_(loop, addr, name)
, loop_(loop)
{
// 注冊(cè)回調(diào)函數(shù)
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
// 設(shè)置合適的loop線程數(shù)量 loopthread 不包括baseloop
server_.setThreadNum(3);
}于是我們?nèi)タ聪?code>TcpServer的構(gòu)造函數(shù)是在干什么
TcpServer::TcpServer(EventLoop *loop,
const InetAddress &listenAddr,
const std::string &nameArg,
Option option)
: loop_(CheckLoopNotNull(loop))
, ipPort_(listenAddr.toIpPort())
, name_(nameArg)
, acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))
, threadPool_(new EventLoopThreadPool(loop, name_))
, connectionCallback_()
, messageCallback_()
, nextConnId_(1)
, started_(0)
{
// 當(dāng)有新用戶連接時(shí)候,會(huì)執(zhí)行該回調(diào)函數(shù)
acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,
std::placeholders::_1, std::placeholders::_2));
}我們只需要關(guān)注acceptor_(new Acceptor(loop, listenAddr, option == kReusePort))和threadPool_(new EventLoopThreadPool(loop, name_))
首先很明確的一點(diǎn),構(gòu)造了一個(gè)Acceptor,我們首先要知道Acceptor主要就是連接新用戶并打包為一個(gè)Channel,所以我們就應(yīng)該知道Acceptor按道理應(yīng)該實(shí)現(xiàn)socket,bind,listen,accept這四個(gè)函數(shù)。
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport)
: loop_(loop), acceptSocket_(createNonblocking()) // socket
,
acceptChannel_(loop, acceptSocket_.fd()), listenning_(false)
{
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(true);
acceptSocket_.bindAddress(listenAddr); // 綁定套接字
// 有新用戶的連接,執(zhí)行一個(gè)回調(diào)(打包為channel)
acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
}其中Acceptor中有個(gè)acceptSocket_,其實(shí)就是我們平時(shí)所用的listenfd,構(gòu)造函數(shù)中實(shí)現(xiàn)了socket,bind,而其余的兩個(gè)函數(shù)的使用在其余代碼
// 開(kāi)啟服務(wù)器監(jiān)聽(tīng)
void TcpServer::start()
{
// 防止一個(gè)TcpServer被start多次
if (started_++ == 0)
{
threadPool_->start(threadInitCallback_); // 啟動(dòng)底層的loop線程池,這里會(huì)按照設(shè)定了threadnum設(shè)置pool的數(shù)量
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get()));
}
}我們知道,當(dāng)我們?cè)O(shè)置了threadnum之后,就會(huì)有一個(gè)mainloop,那么這個(gè)loop_就是那個(gè)mainloop,其中可以看見(jiàn)這個(gè)loop_就只做一個(gè)事情Acceptor::listen。
void Acceptor::listen()
{
listenning_ = true;
acceptSocket_.listen(); // listen
acceptChannel_.enableReading(); // acceptChannel_ => Poller
}
這里就實(shí)現(xiàn)了listen函數(shù),還有最后一個(gè)函數(shù)accept,我們慢慢向下分析,從代碼可以知道acceptChannel_.enableReading()之后就會(huì)使得這個(gè)listenfd所在的channel對(duì)讀事件感興趣,那什么時(shí)候會(huì)有讀事件呢,就是當(dāng)用戶建立新連接的時(shí)候,那么我們應(yīng)該想一下,那當(dāng)感興趣的事件發(fā)生之后,listenfd應(yīng)該干什么呢,應(yīng)該執(zhí)行一個(gè)回調(diào)函數(shù)呀。注意Acceptor構(gòu)造函數(shù)中有這樣一行代碼acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));這就是那個(gè)回調(diào),我們?nèi)タ聪?code>handleRead在干嘛。
// listenfd有事件發(fā)生了,就是有新用戶連接了
void Acceptor::handleRead()
{
InetAddress peerAddr;
int connfd = acceptSocket_.accept(&peerAddr);
if (connfd >= 0)
{
// 若用戶實(shí)現(xiàn)定義了,則執(zhí)行,否則說(shuō)明用戶對(duì)新到來(lái)的連接沒(méi)有需要執(zhí)行的,所以直接關(guān)閉
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); // 輪詢找到subLoop,喚醒,分發(fā)當(dāng)前的新客戶端的Channel
}
else
{
::close(connfd);
}
}
...
}這里是不是就實(shí)現(xiàn)了accept函數(shù),至此當(dāng)用戶建立一個(gè)新的連接時(shí)候,Acceptor就會(huì)得到一個(gè)connfd和其對(duì)應(yīng)的peerAddr返回給mainloop,這時(shí)候我們?cè)谧⒁獾?code>TcpServer構(gòu)造函數(shù)中有這樣一行代碼acceptor_->setNewConnectionCallback(std::bind(&TcpServer::newConnection, this,std::placeholders::_1, std::placeholders::_2));我們給acceptor_設(shè)置了一個(gè)newConnectionCallback_,于是由上面的代碼就可以知道,if (newConnectionCallback_)為真,就會(huì)執(zhí)行這個(gè)回調(diào)函數(shù),于是就會(huì)執(zhí)行TcpServer::newConnection,我們?nèi)タ聪逻@個(gè)函數(shù)是在干嘛。
void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
// 輪詢算法選擇一個(gè)subloop來(lái)管理對(duì)應(yīng)的這個(gè)新連接
EventLoop *ioLoop = threadPool_->getNextLoop();
char buf[64] = {0};
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
std::string connName = name_ + buf;
LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s \n",
name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str());
// 通過(guò)sockfd獲取其綁定的本地ip和端口
sockaddr_in local;
::bzero(&local, sizeof local);
socklen_t addrlen = sizeof local;
if (::getsockname(sockfd, (sockaddr*)&local, &addrlen) < 0)
{
LOG_ERROR("sockets::getLocalAddr");
}
InetAddress localAddr(local);
// 根據(jù)連接成功的sockfd,創(chuàng)建TcpConnection
TcpConnectionPtr conn(new TcpConnection(
ioLoop,
connName,
sockfd, // Socket Channel
localAddr,
peerAddr));
connections_[connName] = conn;
// 下面的回調(diào)時(shí)用戶設(shè)置給TcpServer,TcpServer又設(shè)置給TcpConnection,TcpConnetion又設(shè)置給Channel,Channel又設(shè)置給Poller,Poller通知channel調(diào)用這個(gè)回調(diào)
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
// 設(shè)置了如何關(guān)閉連接的回調(diào)
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)
);
// 直接調(diào)用connectEstablished
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}這里就比較長(zhǎng)了,我先說(shuō)下大概他干了啥事情:首先通過(guò)輪詢找到下一個(gè)subloop,然后將剛剛返回的connfd和對(duì)應(yīng)的peerAddr以及localAddr構(gòu)造為一個(gè)TcpConnection給subloop,然后給這個(gè)conn設(shè)置了一系列的回調(diào)函數(shù),比如讀回調(diào),寫(xiě)回調(diào),斷開(kāi)回調(diào)等等。下一章我們來(lái)說(shuō)下上面的代碼最后幾行在干嘛。
到此這篇關(guān)于muduo源碼分析之TcpServer模塊的文章就介紹到這了,更多相關(guān)muduo TcpServer模塊內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis監(jiān)控工具RedisInsight安裝與使用
這篇文章主要為大家介紹了Redis監(jiān)控工具RedisInsight的安裝步驟與使用方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03
Redis調(diào)用Lua腳本及使用場(chǎng)景快速掌握
Redis?是一種非常流行的內(nèi)存數(shù)據(jù)庫(kù),常用于數(shù)據(jù)緩存與高頻數(shù)據(jù)存儲(chǔ)。大多數(shù)開(kāi)發(fā)人員可能聽(tīng)說(shuō)過(guò)redis可以運(yùn)行?Lua?腳本,但是可能不知道redis在什么情況下需要使用到Lua腳本2022-03-03
Redis配置文件redis.conf詳細(xì)配置說(shuō)明
本文列出了Redis的配置文件redis.conf的各配置項(xiàng)的詳細(xì)說(shuō)明,簡(jiǎn)單易懂2018-03-03
淺談redis的過(guò)期時(shí)間設(shè)置和過(guò)期刪除機(jī)制
本文主要介紹了redis的過(guò)期時(shí)間設(shè)置和過(guò)期刪除機(jī)制,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟
本文主要介紹了Deepin UOS編譯安裝Redis的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
redis使用watch秒殺搶購(gòu)實(shí)現(xiàn)思路
這篇文章主要為大家詳細(xì)介紹了redis使用watch秒殺搶購(gòu)的實(shí)現(xiàn)思路,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02
Redis中緩存和數(shù)據(jù)庫(kù)雙寫(xiě)數(shù)據(jù)不一致的原因及解決方案
這篇文章主要介紹了Redis中緩存和數(shù)據(jù)庫(kù)雙寫(xiě)數(shù)據(jù)不一致的原因及解決方案,文中通過(guò)圖文結(jié)合的方式講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-03-03

