剖析Python的Twisted框架的核心特性
一. reactor
twisted的核心是reactor,而提到reactor不可避免的是同步/異步,阻塞/非阻塞,在Dave的第一章概念性介紹中,對同步/異步的界限有點模糊,關(guān)于同步/異步,阻塞/非阻塞可參見知乎討論。而關(guān)于proactor(主動器)和reactor(反應(yīng)堆),這里有一篇推薦博客有比較詳細(xì)的介紹。
就reactor模式的網(wǎng)絡(luò)IO而言,應(yīng)該是同步IO而不是異步IO。而Dave第一章中提到的異步,核心在于:顯式地放棄對任務(wù)的控制權(quán)而不是被操作系統(tǒng)隨機(jī)地停止,程序員必須將任務(wù)組織成序列來交替的小步完成。因此,若其中一個任務(wù)用到另外一個任務(wù)的輸出,則依賴的任務(wù)(即接收輸出的任務(wù))需要被設(shè)計成為要接收系列比特或分片而不是一下全部接收。
顯式主動地放棄任務(wù)的控制權(quán)有點類似協(xié)程的思考方式,reactor可看作協(xié)程的調(diào)度器。reactor是一個事件循環(huán),我們可以向reactor注冊自己感興趣的事件(如套接字可讀/可寫)和處理器(如執(zhí)行讀寫操作),reactor會在事件發(fā)生時回調(diào)我們的處理器,處理器執(zhí)行完成之后,相當(dāng)于協(xié)程掛起(yield),回到reactor的事件循環(huán)中,等待下一個事件來臨并回調(diào)。reactor本身有一個同步事件多路分解器(Synchronous Event Demultiplexer),可用select/epoll等機(jī)制實現(xiàn),當(dāng)然twisted reactor的事件觸發(fā)不一定是基于IO,也可以由定時器等其它機(jī)制觸發(fā)。
twisted的reactor無需我們主動注冊事件和回調(diào)函數(shù),而是通過多態(tài)(繼承特定類,并實現(xiàn)所關(guān)心的事件接口,然后傳給twisted reactor)來實現(xiàn)。關(guān)于twisted的reactor,有幾個需要注意的地方:
twisted.internet.reactor是單例模式,每個程序只能有一個reactor;
盡量在reactor回調(diào)函數(shù)盡快完成操作,不要執(zhí)行阻塞任務(wù),reactor本質(zhì)是單線程,用戶回調(diào)代碼與twisted代碼運行在同一個上下文,某個回調(diào)函數(shù)中阻塞,會導(dǎo)致reactor整個事件循環(huán)阻塞;
reactor會一直運行,除非通過reactor.stop()顯示停止它,但一般調(diào)用reactor.stop(),也就意味著應(yīng)用程序結(jié)束;
二. twisted簡單使用
twisted的本質(zhì)是reactor,我們可以使用twisted的底層API(避開twisted便利的高層抽象)來使用reactor:
# 示例一 twisted底層API的使用 from twisted.internet import reacto from twisted.internet import main from twisted.internet.interfaces import IReadDescriptor import socket class MySocket(IReadDescriptor): def __init__(self, address): # 連接服務(wù)器 self.address = address self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.connect(address) self.sock.setblocking(0) # tell the Twisted reactor to monitor this socket for reading reactor.addReader(self) # 接口: 告訴reactor 監(jiān)聽的套接字描述符 def fileno(self): try: return self.sock.fileno() except socket.error: return -1 # 接口: 在連接斷開時的回調(diào) def connectionLost(self, reason): self.sock.close() reactor.removeReader(self) # 當(dāng)應(yīng)用程序需要終止時 調(diào)用: # reactor.stop() # 接口: 當(dāng)套接字描述符有數(shù)據(jù)可讀時 def doRead(self): bytes = '' # 盡可能多的讀取數(shù)據(jù) while True: try: bytesread = self.sock.recv(1024) if not bytesread: break else: bytes += bytesread except socket.error, e: if e.args[0] == errno.EWOULDBLOCK: break return main.CONNECTION_LOST if not bytes: return main.CONNECTION_DONE else: # 在這里解析協(xié)議并處理數(shù)據(jù) print bytes
示例一可以很清晰的看到twisted的reactor本質(zhì):添加監(jiān)聽描述符,監(jiān)聽可讀/可寫事件,當(dāng)事件來臨時回調(diào)函數(shù),回調(diào)完成之后繼續(xù)監(jiān)聽事件。
需要注意:
套接字為非阻塞,如果為阻塞則失去了reactor的意義
我們通過繼承IReadDescriptor來提供reactor所需要的接口
通過reactor.addReader將套接字類加入reactor的監(jiān)聽對象中
main.CONNECTION_LOST是twisted預(yù)定義的值,通過這些值它我們可以一定程度控制下一步回調(diào)(類似于模擬一個事件)
但是上面的MySocket類不夠好,主要有以下缺點:
需要我們自己去讀取數(shù)據(jù),而不是框架幫我們讀好,并處理異常
網(wǎng)絡(luò)IO和數(shù)據(jù)處理混為一塊,沒有剝離開來
三. twisted抽象
twisted在reactor的基礎(chǔ)上,建立了更高的抽象,對一個網(wǎng)絡(luò)連接而言,twisted建立了如下三個概念:
Transports:網(wǎng)絡(luò)連接層,僅負(fù)責(zé)網(wǎng)絡(luò)連接和讀/寫字節(jié)數(shù)據(jù)
Protocols: 協(xié)議層,服務(wù)業(yè)務(wù)相關(guān)的網(wǎng)絡(luò)協(xié)議,將字節(jié)流轉(zhuǎn)換成應(yīng)用所需數(shù)據(jù)
Protocol Factories:協(xié)議工廠,負(fù)責(zé)創(chuàng)建Protocols,每個網(wǎng)絡(luò)連接都有一個Protocols對象(因為要保存協(xié)議解析狀態(tài))
twisted的這些概念和erlang中的ranch網(wǎng)絡(luò)框架很像,ranch框架也抽象了Transports和Protocols概念,在有新的網(wǎng)絡(luò)連接時,ranch自動創(chuàng)建Transports和Protocols,其中Protocols由用戶在啟動ranch時傳入,是一個實現(xiàn)了ranch_protocol behaviour的模塊,Protocols初始化時,會收到該連接對應(yīng)的Transports,如此我們可以在Protocols中處理字節(jié)流數(shù)據(jù),按照我們的協(xié)議解析并處理數(shù)據(jù)。同時可通過Transports來發(fā)送數(shù)據(jù)(ranch已經(jīng)幫你讀取了字節(jié)流數(shù)據(jù)了)。
和ranch類似,twisted也會在新連接到達(dá)時創(chuàng)建Protocols并且將Transport傳入,twisted會幫我們讀取字節(jié)流數(shù)據(jù),我們只需在dataReceived(self, data)接口中處理字節(jié)流數(shù)據(jù)即可。此時的twisted在網(wǎng)絡(luò)IO上可以算是真正的異步了,它幫我們處理了網(wǎng)絡(luò)IO和可能遇到的異常,并且將網(wǎng)絡(luò)IO和數(shù)據(jù)處理剝離開來,抽象為Transports和Protocols,提高了程序的清晰性和健壯性。
# 示例二 twisted抽象的使用 from twisted.internet import reactor from twisted.internet.protocol import Protocol, ClientFactory class MyProtocol(Protocol): # 接口: Protocols初始化時調(diào)用,并傳入Transports # 另外 twisted會自動將Protocols的factory對象成員設(shè)為ProtocolsFactory實例的引用 # 如此就可以通過factory來與MyProtocolFactory交互 def makeConnection(self,trans): print 'make connection: get transport: ', trans print 'my factory is: ', self.factory # 接口: 有數(shù)據(jù)到達(dá) def dataReceived(self, data): self.poem += data msg = 'Task %d: got %d bytes of poetry from %s' print msg % (self.task_num, len(data), self.transport.getPeer()) # 接口: 連接斷開 def connectionLost(self, reason): # 連接斷開的處理 class MyProtocolFactory(ClientFactory): # 接口: 通過protocol類成員指出需要創(chuàng)建的Protocols protocol = PoetryProtocol # tell base class what proto to build def __init__(self, address): self.poetry_count = poetry_count self.poems = {} # task num -> poem # 接口: 在創(chuàng)建Protocols的回調(diào) def buildProtocol(self, address): proto = ClientFactory.buildProtocol(self, address) # 在這里對proto做一些初始化.... return proto # 接口: 連接Server失敗時的回調(diào) def clientConnectionFailed(self, connector, reason): print 'Failed to connect to:', connector.getDestination() def main(address): factory = MyClientFactory(address) host, port = address # 連接服務(wù)端時傳入ProtocolsFactory reactor.connectTCP(host, port, factory) reactor.run()
示例二要比示例一要簡單清晰很多,因為它無需處理網(wǎng)絡(luò)IO,并且邏輯上更為清晰,實際上ClientFactory和Protocol提供了更多的接口用于實現(xiàn)更靈活強(qiáng)大的邏輯控制,具體的接口可參見twisted源代碼。
四. twisted Deferred
twisted Deferred對象用于解決這樣的問題:有時候我們需要在ProtocolsFactory中嵌入自己的回調(diào),以便Protocols中發(fā)生某個事件(如所有Protocols都處理完成)時,回調(diào)我們指定的函數(shù)(如TaskFinished)。如果我們自己來實現(xiàn)回調(diào),需要處理幾個問題:
如何區(qū)分回調(diào)的正確返回和錯誤返回?(我們在使用異步調(diào)用時,要尤其注意錯誤返回的重要性)
如果我們的正確返回和錯誤返回都需要執(zhí)行一個公共函數(shù)(如關(guān)閉連接)呢?
如果保證該回調(diào)只被調(diào)用一次?
Deferred對象便用于解決這種問題,它提供兩個回調(diào)鏈,分別對應(yīng)于正確返回和錯誤返回,在正確返回或錯誤返回時,它會依次調(diào)用對應(yīng)鏈中的函數(shù),并且保證回調(diào)的唯一性。
d = Deferred() # 添加正確回調(diào)和錯誤回調(diào) d.addCallbacks(your_ok_callback, your_err_callback) # 添加公共回調(diào)函數(shù) d.addBoth(your_common_callback) # 正確返回 將依次調(diào)用 your_ok_callback(Res) -> common_callback(Res) d.callback(Res) # 錯誤返回 將依次調(diào)用 your_err_callback(Err) -> common_callback(Err) d.errback(Err) # 注意,對同一個Defered對象,只能返回一次,嘗試多次返回將會報錯
twisted的defer是異步的一種變現(xiàn)方式,可以這么理解,他和thread的區(qū)別是,他是基于時間event的。
有了deferred,即可對任務(wù)的執(zhí)行進(jìn)行管理控制。防止程序的運行,由于等待某項任務(wù)的完成而陷入阻塞停滯,提高整體運行的效率。
Deferred能幫助你編寫異步代碼,但并不是為自動生成異步或無阻塞的代碼!要想將一個同步函數(shù)編程異步函數(shù),必須在函數(shù)中返回Deferred并正確注冊回調(diào)。
五.綜合示例
下面的例子,你們自己跑跑,我上面說的都是一些個零散的例子,大家對照下面完整的,走一遍。 twisted理解其實卻是有點麻煩,大家只要知道他是基于事件的后,慢慢理解就行了。
#coding:utf-8 #xiaorui.cc from twisted.internet import reactor, defer from twisted.internet.threads import deferToThread import os,sys from twisted.python import threadable; threadable.init(1) deferred =deferToThread.__get__ import time def todoprint_(result): print result def running(): "Prints a few dots on stdout while the reactor is running." # sys.stdout.write("."); sys.stdout.flush() print '.' reactor.callLater(.1, running) @deferred def sleep(sec): "A blocking function magically converted in a non-blocking one." print 'start sleep %s'%sec time.sleep(sec) print '\nend sleep %s'%sec return "ok" def test(n,m): print "fun test() is start" m=m vals = [] keys = [] for i in xrange(m): vals.append(i) keys.append('a%s'%i) d = None for i in xrange(n): d = dict(zip(keys, vals)) print "fun test() is end" return d if __name__== "__main__": #one sleep(10).addBoth(todoprint_) reactor.callLater(.1, running) reactor.callLater(3, reactor.stop) print "go go !!!" reactor.run() #two aa=time.time() de = defer.Deferred() de.addCallback(test) reactor.callInThread(de.callback,10000000,100 ) print time.time()-aa print "我這里先做別的事情" print de print "go go end"
- python如何通過twisted搭建socket服務(wù)
- Python3.6中Twisted模塊安裝的問題與解決
- python安裝twisted的問題解析
- python如何通過twisted實現(xiàn)數(shù)據(jù)庫異步插入
- python基于twisted框架編寫簡單聊天室
- python 編程之twisted詳解及簡單實例
- Python 基于Twisted框架的文件夾網(wǎng)絡(luò)傳輸源碼
- 實例解析Python的Twisted框架中Deferred對象的用法
- 詳解Python的Twisted框架中reactor事件管理器的用法
- 使用Python的Twisted框架編寫非阻塞程序的代碼示例
- Python的Twisted框架中使用Deferred對象來管理回調(diào)函數(shù)
- 使用Python的Twisted框架構(gòu)建非阻塞下載程序的實例教程
- Python的Twisted框架上手前所必須了解的異步編程思想
- 使用Python的Treq on Twisted來進(jìn)行HTTP壓力測試
- 利用Python的Twisted框架實現(xiàn)webshell密碼掃描器的教程
- 使用Python的Twisted框架實現(xiàn)一個簡單的服務(wù)器
- 使用Python的Twisted框架編寫簡單的網(wǎng)絡(luò)客戶端
- python開發(fā)實例之Python的Twisted框架中Deferred對象的詳細(xì)用法與實例
相關(guān)文章
Python模擬登陸網(wǎng)頁的三種方法小結(jié)
如何使用Python模擬登陸網(wǎng)頁,尤其是在涉及到復(fù)雜的認(rèn)證機(jī)制時?這篇文章將詳細(xì)介紹Python模擬登陸網(wǎng)頁的三種方法,以及如何繞過一些常見的安全防護(hù)措施,需要的朋友可以參考下2024-01-01詳解Python中內(nèi)置的NotImplemented類型的用法
這篇文章主要介紹了詳解Python中內(nèi)置的NotImplemented類型的用法,包括對相關(guān)的__eq__()和__ne__()兩個方法使用的講解,需要的朋友可以參考下2015-03-03Keras實現(xiàn)DenseNet結(jié)構(gòu)操作
這篇文章主要介紹了Keras實現(xiàn)DenseNet結(jié)構(gòu)操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-07-07