Tornado源碼分析之HTTP服務請求解析
listen fd的讀事件回調
代碼版本 tornado1.2版本下httpserver.py
Tornado定義類HTTPServer來表示一個HTTP服務器,該類在構造函數中會傳入事件循環(huán)ioloop,和Application對象。同時該HTTPServer提供了如下幾種方法:
- listen() 表示該Server的監(jiān)聽方法,調用該方法時,通過調用bind已經將套接字設置成non-blocking,并使用socket.SO_REUSEADDR。
- bind() 方法表示Server綁定端口,并設置socket 為non-blocking.
- start() 方法則是在ioloop中啟動該服務器。在不給start()方法傳入任何參數的情況下,使用單進程模型的IOLoop。
在start()方法啟動服務器時,要向IOLoop中注冊對listen fd的可讀事件的回調,listen fd可讀,表示有新的客戶接入到HTTPServer中。我們來看看接入HTTPServer的事件回調_handle_events
def _handle_events(self, fd, events): while True: try: connection, address = self._socket.accept() except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise if self.ssl_options is not None: assert ssl, "Python 2.6+ and OpenSSL required for SSL" try: connection = ssl.wrap_socket(connection, server_side=True, do_handshake_on_connect=False, **self.ssl_options) except ssl.SSLError, err: if err.args[0] == ssl.SSL_ERROR_EOF: return connection.close() else: raise except socket.error, err: if err.args[0] == errno.ECONNABORTED: return connection.close() else: raise try: if self.ssl_options is not None: stream = iostream.SSLIOStream(connection, io_loop=self.io_loop) else: stream = iostream.IOStream(connection, io_loop=self.io_loop) HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders) except: logging.error("Error in connection callback", exc_info=True)
不看ssl處理部分。首先基于listen fd調用accept函數,獲取connection和address,注意到tornado處理了spurious wakeup的情況:
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return
也就是說,當前l(fā)isten fd,沒有數據可以讀了,表示沒有新的客戶連接過來,那么就應該返回?;卣{函數_handle_events,創(chuàng)建了IOStream對象stream,
stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
注意到創(chuàng)建IOStream對象會給單進程的IOLoop添加新的回調函數,該函數是用來處理accept fd的讀事件,此時,在有客戶接入過來,那么IOLoop中的handler有如下幾個:
其中,
- 描述符4是listen fd 讀事件的處理回調
- 描述符7 是用來喚醒IOLoop _read_waker
- 描述符9 是新accept文件描述符讀事件的回調.
accept fd 回調函數
def _handle_events(self, fd, events): # 確保該連接還存在 if not self.socket: logging.warning("Got events for closed stream %d", fd) return try: # 如果該連接的讀事件產生了,調用讀回調 if events & self.io_loop.READ: self._handle_read() if not self.socket: return # 如果是寫事件 if events & self.io_loop.WRITE: # 如果該socket是客戶端創(chuàng)建的socket, 其已經被服務器處理 if self._connecting: # 連接服務器端 self._handle_connect() # 普通的寫事件回調 self._handle_write() if not self.socket: return # 錯誤 if events & self.io_loop.ERROR: # IOLoop中刪除該文件描述符對應的handler # 并關閉連接. self.close() return state = self.io_loop.ERROR # 如果正在讀 if self.reading(): # 添加狀態(tài)繼續(xù)讀 state |= self.io_loop.READ if self.writing(): # 如果正在寫,添加狀態(tài)寫 state |= self.io_loop.WRITE if state != self._state: self._state = state # 更新對應fd對應的關注狀態(tài) self.io_loop.update_handler(self.socket.fileno(), self._state) except: logging.error("Uncaught exception, closing connection.", exc_info=True) self.close() raise
HTTPConnection對象的創(chuàng)建
接著我們將思路放回到listen fd 的回調函數_handle_events中,在IOStream對象stream創(chuàng)建后,_handle_events將創(chuàng)建HTTPConnection
HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders)
在HTTPConnnection構造函數中,可以看到起調用了
self.stream.read_until("\r\n\r\n", self._header_callback)
此時,HTTPConnection中的成員stream,表示的是accept fd中的stream,read_until函數將會從accept fd中一直讀到\r\n\n,然后調用_header_callback來解析HTTP的頭部字段。
read_until函數
read_until從套接字中就是讀到指定的分隔符為止
def read_until(self, delimiter, callback): """Call callback when we read the given delimiter.""" assert not self._read_callback, "Already reading" self._read_delimiter = delimiter self._read_callback = stack_context.wrap(callback) while True: # See if we've already got the data from a previous read if self._read_from_buffer(): return self._check_closed() if self._read_to_buffer() == 0: break self._add_io_state(self.io_loop.READ)
readl_until有兩種退出方式:一種是從buffer中讀到數據后,直接返回,另一種是將數據讀到buffer,_\read_to_buffer就是將數據讀到buffer,如果沒有數據,則將該socket的讀事件添加進來。
accept fd獲取后,顯然該fd對應的IOStream中的緩沖區(qū)為0,所以_read_from_buffer返回False,流程將執(zhí)行_read_to_buffer。
從緩沖區(qū)讀_read_from_buffer
def _read_from_buffer(self): """Attempts to complete the currently-pending read from the buffer. Returns True if the read was completed. """ if self._read_bytes: if self._read_buffer_size() >= self._read_bytes: num_bytes = self._read_bytes callback = self._read_callback self._read_callback = None self._read_bytes = None self._run_callback(callback, self._consume(num_bytes)) return True elif self._read_delimiter: _merge_prefix(self._read_buffer, sys.maxint) loc = self._read_buffer[0].find(self._read_delimiter) if loc != -1: callback = self._read_callback delimiter_len = len(self._read_delimiter) self._read_callback = None self._read_delimiter = None self._run_callback(callback, self._consume(loc + delimiter_len)) return True return False
read_from_buffer有兩種形式的讀:
- 一種是讀指定字節(jié)的數據
- 一種是讀到指定的分隔符
應該注意的是_read_from_buffer中指定了回調函數,意思是從socket中讀的數據后,使用回調函數來消費。應該注意的是_read_from_buffer并跟socket打交道,其假設所有的數據已經在buffer中了。
將數據讀到緩沖區(qū)
從socket中讀數據到緩沖區(qū),使用的是_read_to_buffer。read_to_buffer實際調用的是_read_from_socket,其從non-blocking中讀一次,最多讀4096個字節(jié)。注意錯誤的處理,當我們發(fā)現(xiàn)從socket中讀,發(fā)生了其他的錯誤(除了EAGAIN)的時候,就應該關閉連接。
def _read_to_buffer(self): """Reads from the socket and appends the result to the read buffer. Returns the number of bytes read. Returns 0 if there is nothing to read (i.e. the read returns EWOULDBLOCK or equivalent). On error closes the socket and raises an exception. """ try: chunk = self._read_from_socket() except socket.error, e: # ssl.SSLError is a subclass of socket.error logging.warning("Read error on %d: %s", self.socket.fileno(), e) self.close() raise if chunk is None: return 0 self._read_buffer.append(chunk) if self._read_buffer_size() >= self.max_buffer_size: logging.error("Reached maximum read buffer size") self.close() raise IOError("Reached maximum read buffer size") return len(chunk)
該函數總體來說就是講將據讀到bufferIOStream中的_read_buffer中,并返回實際讀的大小。注意錯誤的處理:如果所讀的數據過大,那么也應該關閉連接。
從socket讀一次數據_read_from_socket
def _read_from_socket(self): """Attempts to read from the socket Returns the data read or None if there is nothing to read. May be overridden in subclasses. """ try: # 盡可能讀得多 chunk = self.socket.recv(self.read_chunk_size) except socket.error, e: if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN): # 沒有數據 return None else: raise if not chunk: # 客戶端數據沒有了,那么表示客戶端關閉了連接 self.close() return None return chunk
LT模式下從socket中讀數據_handle_read()
def _handle_read(self): while True: try: # Read from the socket until we get EWOULDBLOCK or equivalent. # SSL sockets do some internal buffering, and if the data is # sitting in the SSL object's buffer select() and friends # can't see it; the only way to find out if it's there is to # try to read it. result = self._read_to_buffer() except Exception: self.close() return if result == 0: break else: if self._read_from_buffer(): return
在LT模式下從socket中讀分成了兩步:
- 從socket中讀數據讀到_read_to_buffer
- 然后從buffer中讀。
從socket讀,要一直讀到出現(xiàn)EWOULDBLOCK或者是EAGAIN。注意到while True
,就是不停的讀,讀到沒有數據。
以上就是Tornado源碼分析之HTTP服務請求解析的詳細內容,更多關于Tornado HTTP服務請求的資料請關注腳本之家其它相關文章!
相關文章
詳解基于Transformer實現(xiàn)電影評論星級分類任務
這篇文章主要為大家介紹了詳解基于Transformer實現(xiàn)電影評論星級分類任務過程解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04利用python模擬實現(xiàn)POST請求提交圖片的方法
最近在利用python做接口測試,其中有個上傳圖片的接口,在網上各種搜索,各種嘗試。下面這篇文章主要給大家介紹了關于利用python模擬實現(xiàn)POST請求提交圖片的相關資料,需要的朋友可以參考借鑒,下面來一起看看吧。2017-07-07Python操作excel的方法總結(xlrd、xlwt、openpyxl)
這篇文章主要給大家介紹了關于Python操作excel的一些方法,其中包括xlrd、xlwt、openpyxl的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用Python具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧2019-09-09Pyspider進行API接口抓取和數據采集的實現(xiàn)
Pyspider是一個基于Python的強大的網絡爬蟲框架,它提供了豐富的功能和靈活的擴展性,使我們可以輕松地進行數據的抓取和處理,本文主要介紹了Pyspider進行API接口抓取和數據采集的實現(xiàn),感興趣的可以了解一下2023-09-09Pycharm中安裝wordcloud等庫失敗問題及終端通過pip安裝的Python庫如何添加到Pycharm解釋器中(
這篇文章主要介紹了Pycharm中安裝wordcloud等庫失敗問題及終端通過pip安裝的Python庫如何添加到Pycharm解釋器中,本文給大家介紹的非常詳細,需要的朋友可以參考下2020-05-05