一文帶你探索Python中的eventlet通信機(jī)制
一、源碼解析
對(duì)python原生文件打補(bǔ)丁:
import eventlet eventlet.monkey_patch()
跟蹤進(jìn)入該模塊方法:eventlet.patcher#monkey_patch
def monkey_patch(**on): ...... modules_to_patch = [] for name, modules_function in [ ('os', _green_os_modules), ('select', _green_select_modules), ('socket', _green_socket_modules), ('thread', _green_thread_modules), ('time', _green_time_modules), ('MySQLdb', _green_MySQLdb), ('builtins', _green_builtins), ('subprocess', _green_subprocess_modules), ]: if on[name] and not already_patched.get(name): modules_to_patch += modules_function() already_patched[name] = True ......
該方法對(duì)某些系統(tǒng)模塊進(jìn)行全局打補(bǔ)丁,使其對(duì)Greenthread友好。關(guān)鍵字參數(shù)用于指定哪些模塊需要打補(bǔ)丁,如果未提供關(guān)鍵字參數(shù),則會(huì)對(duì)所有默認(rèn)的模塊(如代碼所示)打補(bǔ)丁,例如: monkey_patch(socket = True,select = True)
僅對(duì)socket和select模塊打補(bǔ)丁。大多數(shù)參數(shù)都是對(duì)同名的單個(gè)模塊進(jìn)行打補(bǔ)丁,比如操作系統(tǒng),時(shí)間,選擇。但是socket例外,它也會(huì)對(duì)ssl模塊(如果存在)打補(bǔ)丁,thread用于對(duì)threading、thread、Queue打補(bǔ)丁。說(shuō)明:多次調(diào)用monkey_patch是安全的。
以socket為例:('socket', _green_socket_modules)
,進(jìn)入該方法:
def _green_socket_modules(): from eventlet.green import socket try: from eventlet.green import ssl return [('socket', socket), ('ssl', ssl)] except ImportError: return [('socket', socket)]
進(jìn)入socket模塊:eventlet.green.socket
__import__('eventlet.green._socket_nodns') __socket = sys.modules['eventlet.green._socket_nodns'] __all__ = __socket.__all__ __patched__ = __socket.__patched__ + [ 'create_connection', 'getaddrinfo', 'gethostbyname', 'gethostbyname_ex', 'getnameinfo', ]
在進(jìn)入eventlet.green._socket_nodns:
__socket = __import__('socket') __all__ = __socket.__all__ __patched__ = ['fromfd', 'socketpair', 'ssl', 'socket', 'timeout']
可以看到是對(duì)python的原生socket模塊進(jìn)行了打補(bǔ)?。簆ythonx.x/Lib/socket.py 以socket類為例:python原生的socket.socket()類并替換為了eventlet.greenio.base#GreenSocket類 該補(bǔ)丁類完全兼容原生socket類的API,它還可以識(shí)別關(guān)鍵字參數(shù)set_nonblocking = True
。用來(lái)設(shè)置socket為非阻塞模式。
class GreenSocket(object): # This placeholder is to prevent __getattr__ from creating an infinite call loop fd = None def __init__(self, family=socket.AF_INET, *args, **kwargs): should_set_nonblocking = kwargs.pop('set_nonblocking', True) if isinstance(family, six.integer_types): fd = _original_socket(family, *args, **kwargs) # Notify the hub that this is a newly-opened socket. notify_opened(fd.fileno()) else: fd = family # import timeout from other socket, if it was there try: self._timeout = fd.gettimeout() or socket.getdefaulttimeout() except AttributeError: self._timeout = socket.getdefaulttimeout() # Filter fd.fileno() != -1 so that won't call set non-blocking on # closed socket if should_set_nonblocking and fd.fileno() != -1: set_nonblocking(fd) self.fd = fd # when client calls setblocking(0) or settimeout(0) the socket must # act non-blocking self.act_non_blocking = False ......
我們?cè)賮?lái)看下ssl模塊。python原生的ssl模塊被替換為了evenlet.green.ssl模塊 該模塊提供了一個(gè)方法用來(lái)包裝socket:
def wrap_socket(sock, *a, **kw): return GreenSSLSocket(sock, *a, **kw)
直接進(jìn)入GreenSSLSocket類:
class GreenSSLSocket(_original_sslsocket): ......
可以看出該補(bǔ)丁模塊繼承了原生socket,將原生socket的api都重寫了,但是基本都是直接調(diào)用原生api。注:Python3.x版本中,如果socket的另一端已關(guān)閉時(shí),非阻塞模式的sslsocket對(duì)象不會(huì)再拋出錯(cuò)誤(雖然它們會(huì)在另一端關(guān)閉時(shí)發(fā)出通知)。如果另一端的socket已經(jīng)關(guān)閉,任何的寫/讀操作都會(huì)被簡(jiǎn)單地掛起。這個(gè)問(wèn)題目前沒(méi)有好的解決方案。它看起來(lái)是Python的sslsocket對(duì)象實(shí)現(xiàn)的一個(gè)限制。一個(gè)解決方法是使用命令settimeout()
在socket上設(shè)置合理的超時(shí)時(shí)間,并在超時(shí)時(shí)關(guān)閉/重新打開(kāi)連接。
下面看下原生ssl模塊:pythonx.x/Lib/ssl.py
def wrap_socket(sock, keyfile=None, certfile=None, server_side=False, cert_reqs=CERT_NONE, ssl_version=PROTOCOL_TLS, ca_certs=None, do_handshake_on_connect=True, suppress_ragged_eofs=True, ciphers=None): if server_side and not certfile: raise ValueError("certfile must be specified for server-side " "operations") if keyfile and not certfile: raise ValueError("certfile must be specified") context = SSLContext(ssl_version) context.verify_mode = cert_reqs if ca_certs: context.load_verify_locations(ca_certs) if certfile: context.load_cert_chain(certfile, keyfile) if ciphers: context.set_ciphers(ciphers) return context.wrap_socket( sock=sock, server_side=server_side, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs )
可以看到該調(diào)用了SSLContext.wrap_socket方法,進(jìn)入該方法:
class SSLContext(_SSLContext): ...... sslsocket_class = None # SSLSocket is assigned later. sslobject_class = None # SSLObject is assigned later. ...... def wrap_socket(self, sock, server_side=False, do_handshake_on_connect=True, suppress_ragged_eofs=True, server_hostname=None, session=None): # SSLSocket class handles server_hostname encoding before it calls # ctx._wrap_socket() return self.sslsocket_class._create( sock=sock, server_side=server_side, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, server_hostname=server_hostname, context=self, session=session )
該類中類屬性sslobject_class定義如下:
# Python does not support forward declaration of types. SSLContext.sslsocket_class = SSLSocket SSLContext.sslobject_class = SSLObject
進(jìn)入SSLSocket類:
class SSLSocket(socket): ...... @classmethod def _create(cls, sock, server_side=False, do_handshake_on_connect=True, suppress_ragged_eofs=True, server_hostname=None, context=None, session=None): ...... if connected: # create the SSL object try: self._sslobj = self._context._wrap_socket( self, server_side, self.server_hostname, owner=self, session=self._session, ) if do_handshake_on_connect: timeout = self.gettimeout() if timeout == 0.0: # non-blocking raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets") self.do_handshake() except (OSError, ValueError): self.close() raise return self
最終該self._sslobj實(shí)例就是cpython中定義的對(duì)象,所有后續(xù)的所有操作都是調(diào)用的cpython方法。
二、遺留問(wèn)題
問(wèn)題堆棧:
Traceback (most recent call last): File "test.py", line 40, in <module> main() File "test.py", line 35, in main srv(listener) File "test.py", line 10, in srv r.readline(1<<10) File "/usr/lib/python3.7/socket.py", line 589, in readinto return self._sock.recv_into(b) File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 241, in recv_into return self._base_recv(nbytes, flags, into=True, buffer_=buffer) File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 256, in _base_recv read = self.read(nbytes, buffer_) File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 176, in read super(GreenSSLSocket, self).read, *args, **kwargs) File "/usr/lib/python3.7/site-packages/eventlet/green/ssl.py", line 146, in _call_trampolining return func(*a, **kw) File "/usr/lib/python3.7/ssl.py", line 911, in read return self._sslobj.read(len, buffer) ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2488)
從這里我們可以看到系統(tǒng)調(diào)用的入口是python3.7/socket.py中的readinto方法,進(jìn)入該方法:
def readinto(self, b): self._checkClosed() self._checkReadable() if self._timeout_occurred: raise OSError("cannot read from timed out object") while True: try: return self._sock.recv_into(b) except timeout: self._timeout_occurred = True raise except error as e: if e.args[0] in _blocking_errnos: return None raise
最多將len(b)個(gè)字節(jié)讀入可寫緩沖區(qū)* b 并返回讀取的字節(jié)數(shù)。如果套接字是非阻塞的并且沒(méi)有字節(jié)可用,則返回None。如果 b *為非空,則返回值為0表示該連接在另一端被關(guān)閉。注:如果未設(shè)置默認(rèn)超時(shí)并且偵聽(tīng)套接字具有(非零)超時(shí),請(qǐng)強(qiáng)制新套接字處于阻塞模式,以覆蓋特定于平臺(tái)的套接字標(biāo)志繼承。
我們根據(jù)堆棧一步步進(jìn)入最終報(bào)錯(cuò)的地方:self._sslobj.read(len, buffer)
根據(jù)我們上面說(shuō)的,self._sslobj
實(shí)際上是cpython對(duì)象,那read方法是怎么就進(jìn)入到了cpython實(shí)際的方法里面的呢?通過(guò)python代用C代碼的機(jī)制可以找到如下代碼:
#define _SSL__SSLSOCKET_READ_METHODDEF \ {"read", (PyCFunction)_ssl__SSLSocket_read, METH_VARARGS, _ssl__SSLSocket_read__doc__}, static PyObject * _ssl__SSLSocket_read_impl(PySSLSocket *self, int len, int group_right_1, Py_buffer *buffer); static PyObject * _ssl__SSLSocket_read(PySSLSocket *self, PyObject *args) { PyObject *return_value = NULL; int len; int group_right_1 = 0; Py_buffer buffer = {NULL, NULL}; switch (PyTuple_GET_SIZE(args)) { case 1: if (!PyArg_ParseTuple(args, "i:read", &len)) { goto exit; } break; case 2: if (!PyArg_ParseTuple(args, "iw*:read", &len, &buffer)) { goto exit; } group_right_1 = 1; break; default: PyErr_SetString(PyExc_TypeError, "_ssl._SSLSocket.read requires 1 to 2 arguments"); goto exit; } return_value = _ssl__SSLSocket_read_impl(self, len, group_right_1, &buffer); exit: /* Cleanup for buffer */ if (buffer.obj) { PyBuffer_Release(&buffer); } return return_value; }
可以看出,read
是映射到了_ssl__SSLSocket_read
方法,而_ssl__SSLSocket_read
則調(diào)用了_ssl__SSLSocket_read_impl
方法。我們進(jìn)入_ssl__SSLSocket_read_impl
的實(shí)現(xiàn):
static PyObject * _ssl__SSLSocket_read_impl(PySSLSocket *self, int len, int group_right_1, Py_buffer *buffer) /*[clinic end generated code: output=00097776cec2a0af input=ff157eb918d0905b]*/ { ...... do { PySSL_BEGIN_ALLOW_THREADS count = SSL_read(self->ssl, mem, len); err = _PySSL_errno(count <= 0, self->ssl, count); PySSL_END_ALLOW_THREADS self->err = err; if (PyErr_CheckSignals()) goto error; if (has_timeout) timeout = deadline - _PyTime_GetMonotonicClock(); if (err.ssl == SSL_ERROR_WANT_READ) { sockstate = PySSL_select(sock, 0, timeout); } else if (err.ssl == SSL_ERROR_WANT_WRITE) { sockstate = PySSL_select(sock, 1, timeout); } else if (err.ssl == SSL_ERROR_ZERO_RETURN && SSL_get_shutdown(self->ssl) == SSL_RECEIVED_SHUTDOWN) { count = 0; goto done; } else sockstate = SOCKET_OPERATION_OK; if (sockstate == SOCKET_HAS_TIMED_OUT) { PyErr_SetString(PySocketModule.timeout_error, "The read operation timed out"); goto error; } else if (sockstate == SOCKET_IS_NONBLOCKING) { break; } } while (err.ssl == SSL_ERROR_WANT_READ || err.ssl == SSL_ERROR_WANT_WRITE); if (count <= 0) { PySSL_SetError(self, count, __FILE__, __LINE__); goto error; } if (self->exc_type != NULL) goto error; ...... }
從該模塊的include也可以看出,該模塊就是調(diào)用了系統(tǒng)的openssl庫(kù)進(jìn)行ssl通信
/* Include OpenSSL header files */ #include "openssl/rsa.h" #include "openssl/crypto.h" #include "openssl/x509.h" #include "openssl/x509v3.h" #include "openssl/pem.h" #include "openssl/ssl.h" #include "openssl/err.h" #include "openssl/rand.h" #include "openssl/bio.h" #include "openssl/dh.h"
進(jìn)入openssl頭文件,可以看到確實(shí)有定義這個(gè)錯(cuò)誤碼SSL_ERROR_WANT_READ
:
在openssl源碼中我們可以找到這個(gè)定義include.openssl.ssl.h
# define SSL_AD_NO_APPLICATION_PROTOCOL TLS1_AD_NO_APPLICATION_PROTOCOL # define SSL_ERROR_NONE 0 # define SSL_ERROR_SSL 1 # define SSL_ERROR_WANT_READ 2 # define SSL_ERROR_WANT_WRITE 3 # define SSL_ERROR_WANT_X509_LOOKUP 4 # define SSL_ERROR_SYSCALL 5/* look at error stack/return
下面我們來(lái)看下PySSL_SetError
方法:cpython->modules._ssl.c
static PyObject * PySSL_SetError(PySSLSocket *sslsock, int ret, const char *filename, int lineno) { PyObject *type = PySSLErrorObject; char *errstr = NULL; _PySSLError err; enum py_ssl_error p = PY_SSL_ERROR_NONE; unsigned long e = 0; assert(ret <= 0); e = ERR_peek_last_error(); if (sslsock->ssl != NULL) { err = sslsock->err; switch (err.ssl) { case SSL_ERROR_ZERO_RETURN: errstr = "TLS/SSL connection has been closed (EOF)"; type = PySSLZeroReturnErrorObject; p = PY_SSL_ERROR_ZERO_RETURN; break; case SSL_ERROR_WANT_READ: errstr = "The operation did not complete (read)"; type = PySSLWantReadErrorObject; p = PY_SSL_ERROR_WANT_READ; break; case SSL_ERROR_WANT_WRITE: p = PY_SSL_ERROR_WANT_WRITE; type = PySSLWantWriteErrorObject; errstr = "The operation did not complete (write)"; break;
經(jīng)過(guò)一步步跟進(jìn)去,確實(shí)會(huì)發(fā)現(xiàn)返回了一個(gè)SSLError類型的錯(cuò)誤。
到此這篇關(guān)于一文帶你探索Python中的eventlet通信機(jī)制的文章就介紹到這了,更多相關(guān)Python eventlet通信機(jī)制內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python編寫電話薄實(shí)現(xiàn)增刪改查功能
這篇文章主要為大家詳細(xì)介紹了Python編寫電話薄實(shí)現(xiàn)增刪改查功能的相關(guān)資料,感興趣的朋友可以參考一下2016-05-05Python的Scrapy框架中的CrawlSpider介紹和使用
這篇文章主要介紹了Python的Scrapy框架中的CrawlSpider介紹和使用,CrawlSpider其實(shí)是Spider的一個(gè)子類,除了繼承到Spider的特性和功能外,還派生除了其自己獨(dú)有的更加強(qiáng)大的特性和功能,其中最顯著的功能就是"LinkExtractors鏈接提取器",需要的朋友可以參考下2023-12-12python中plt.imshow與cv2.imshow顯示顏色問(wèn)題
這篇文章主要介紹了plt.imshow與cv2.imshow顯示顏色問(wèn)題,本文給大家介紹的非常詳細(xì),同時(shí)給大家提到了cv2.imshow()和plt.imshow()的區(qū)別講解,需要的朋友可以參考下2020-07-07新手該如何學(xué)python怎么學(xué)好python?
怎么學(xué)好python?怎么靈活應(yīng)用python?2008-10-10Python虛擬環(huán)境virtualenv是如何使用的
今天給大家?guī)?lái)的是關(guān)于Python虛擬環(huán)境的相關(guān)知識(shí),文章圍繞著Python虛擬環(huán)境virtualenv是如何使用的展開(kāi),文中有非常詳細(xì)的解釋及代碼示例,需要的朋友可以參考下2021-06-06tensorflow轉(zhuǎn)onnx的實(shí)現(xiàn)方法
本文主要介紹了tensorflow轉(zhuǎn)onnx的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03使用python實(shí)現(xiàn)兩數(shù)之和的畫解算法
這篇文章主要介紹了使用python實(shí)現(xiàn)兩數(shù)之和的畫解算法,采用實(shí)例問(wèn)題的描述來(lái)進(jìn)行問(wèn)題分析,并給出用暴力求解和哈希表兩種方法解決方案,有需要的朋友可以參考下2021-08-08