Python的Tornado框架的異步任務(wù)與AsyncHTTPClient
高性能服務(wù)器Tornado
Python的web框架名目繁多,各有千秋。正如光榮屬于希臘,偉大屬于羅馬。Python的優(yōu)雅結(jié)合WSGI的設(shè)計(jì),讓web框架接口實(shí)現(xiàn)千秋一統(tǒng)。WSGI 把應(yīng)用(Application)和服務(wù)器(Server)結(jié)合起來(lái)。Django 和 Flask 都可以結(jié)合 gunicon 搭建部署應(yīng)用。
與 django 和 flask 不一樣,tornado 既可以是 wsgi 應(yīng)用,也可以是 wsgi 服務(wù)。當(dāng)然,選擇tornado更多的考量源于其單進(jìn)程單線(xiàn)程異步IO的網(wǎng)絡(luò)模式。高性能往往吸引人,可是有不少朋友使用之后會(huì)提出疑問(wèn),tornado號(hào)稱(chēng)高性能,實(shí)際使用的時(shí)候卻怎么感受不到呢?
實(shí)際上,高性能源于Tornado基于Epoll(unix為kqueue)的異步網(wǎng)絡(luò)IO。因?yàn)閠ornado的單線(xiàn)程機(jī)制,一不小心就容易寫(xiě)出阻塞服務(wù)(block)的代碼。不但沒(méi)有性能提高,反而會(huì)讓性能急劇下降。因此,探索tornado的異步使用方式很有必要。
Tornado 異步使用方式
簡(jiǎn)而言之,Tornado的異步包括兩個(gè)方面,異步服務(wù)端和異步客戶(hù)端。無(wú)論服務(wù)端和客戶(hù)端,具體的異步模型又可以分為回調(diào)(callback)和協(xié)程(coroutine)。具體應(yīng)用場(chǎng)景,也沒(méi)有很明確的界限。往往一個(gè)請(qǐng)求服務(wù)里還包含對(duì)別的服務(wù)的客戶(hù)端異步請(qǐng)求。
服務(wù)端異步方式
服務(wù)端異步,可以理解為一個(gè)tornado請(qǐng)求之內(nèi),需要做一個(gè)耗時(shí)的任務(wù)。直接寫(xiě)在業(yè)務(wù)邏輯里可能會(huì)block整個(gè)服務(wù)。因此可以把這個(gè)任務(wù)放到異步處理,實(shí)現(xiàn)異步的方式就有兩種,一種是yield掛起函數(shù),另外一種就是使用類(lèi)線(xiàn)程池的方式。請(qǐng)看一個(gè)同步例子:
class SyncHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): # 耗時(shí)的代碼 os.system("ping -c 2 www.google.com") self.finish('It works')
使用ab測(cè)試一下:
ab -c 5 -n 5 http://127.0.0.1:5000/sync
Server Software: TornadoServer/4.3 Server Hostname: 127.0.0.1 Server Port: 5000 Document Path: /sync Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 5.076 seconds Complete requests: 5 Failed requests: 0 Total transferred: 985 bytes HTML transferred: 25 bytes Requests per second: 0.99 [#/sec] (mean) Time per request: 5076.015 [ms] (mean) Time per request: 1015.203 [ms] (mean, across all concurrent requests) Transfer rate: 0.19 [Kbytes/sec] received
qps 僅有可憐的 0.99,姑且當(dāng)成每秒處理一個(gè)請(qǐng)求吧。
下面祭出異步大法:
class AsyncHandler(tornado.web.RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): tornado.ioloop.IOLoop.instance().add_timeout(1, callback=functools.partial(self.ping, 'www.google.com')) # do something others self.finish('It works') @tornado.gen.coroutine def ping(self, url): os.system("ping -c 2 {}".format(url)) return 'after'
盡管在執(zhí)行異步任務(wù)的時(shí)候選擇了timeout 1秒,主線(xiàn)程的返回還是很快的。ab壓測(cè)如下:
Document Path: /async Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 0.009 seconds Complete requests: 5 Failed requests: 0 Total transferred: 985 bytes HTML transferred: 25 bytes Requests per second: 556.92 [#/sec] (mean) Time per request: 8.978 [ms] (mean) Time per request: 1.796 [ms] (mean, across all concurrent requests) Transfer rate: 107.14 [Kbytes/sec] received
上述的使用方式,通過(guò)tornado的IO循環(huán),把可以把耗時(shí)的任務(wù)放到后臺(tái)異步計(jì)算,請(qǐng)求可以接著做別的計(jì)算??墒牵?jīng)常有一些耗時(shí)的任務(wù)完成之后,我們需要其計(jì)算的結(jié)果。此時(shí)這種方式就不行了。車(chē)道山前必有路,只需要切換一異步方式即可。下面使用協(xié)程來(lái)改寫(xiě):
class AsyncTaskHandler(tornado.web.RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): # yield 結(jié)果 response = yield tornado.gen.Task(self.ping, ' www.google.com') print 'response', response self.finish('hello') @tornado.gen.coroutine def ping(self, url): os.system("ping -c 2 {}".format(url)) return 'after'
可以看到異步在處理,而結(jié)果值也被返回了。
Server Software: TornadoServer/4.3 Server Hostname: 127.0.0.1 Server Port: 5000 Document Path: /async/task Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 0.049 seconds Complete requests: 5 Failed requests: 0 Total transferred: 985 bytes HTML transferred: 25 bytes Requests per second: 101.39 [#/sec] (mean) Time per request: 49.314 [ms] (mean) Time per request: 9.863 [ms] (mean, across all concurrent requests) Transfer rate: 19.51 [Kbytes/sec] received
qps提升還是很明顯的。有時(shí)候這種協(xié)程處理,未必就比同步快。在并發(fā)量很小的情況下,IO本身拉開(kāi)的差距并不大。甚至協(xié)程和同步性能差不多。例如你跟博爾特跑100米肯定輸給他,可是如果跟他跑2米,鹿死誰(shuí)手還未定呢。
yield掛起函數(shù)協(xié)程,盡管沒(méi)有block主線(xiàn)程,因?yàn)樾枰幚矸祷刂?,掛起到響?yīng)執(zhí)行還是有時(shí)間等待,相對(duì)于單個(gè)請(qǐng)求而言。另外一種使用異步和協(xié)程的方式就是在主線(xiàn)程之外,使用線(xiàn)程池,線(xiàn)程池依賴(lài)于futures。Python2需要額外安裝。
下面使用線(xiàn)程池的方式修改為異步處理:
from concurrent.futures import ThreadPoolExecutor class FutureHandler(tornado.web.RequestHandler): executor = ThreadPoolExecutor(10) @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): url = 'www.google.com' tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url)) self.finish('It works') @tornado.concurrent.run_on_executor def ping(self, url): os.system("ping -c 2 {}".format(url))
再運(yùn)行ab測(cè)試:
Document Path: /future Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 0.003 seconds Complete requests: 5 Failed requests: 0 Total transferred: 995 bytes HTML transferred: 25 bytes Requests per second: 1912.78 [#/sec] (mean) Time per request: 2.614 [ms] (mean) Time per request: 0.523 [ms] (mean, across all concurrent requests) Transfer rate: 371.72 [Kbytes/sec] received
qps瞬間達(dá)到了1912.78。同時(shí),可以看到服務(wù)器的log還在不停的輸出ping的結(jié)果。
想要返回值也很容易。再切換一下使用方式接口。使用tornado的gen模塊下的with_timeout功能(這個(gè)功能必須在tornado>3.2的版本)。
class Executor(ThreadPoolExecutor): _instance = None def __new__(cls, *args, **kwargs): if not getattr(cls, '_instance', None): cls._instance = ThreadPoolExecutor(max_workers=10) return cls._instance class FutureResponseHandler(tornado.web.RequestHandler): executor = Executor() @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): future = Executor().submit(self.ping, 'www.google.com') response = yield tornado.gen.with_timeout(datetime.timedelta(10), future, quiet_exceptions=tornado.gen.TimeoutError) if response: print 'response', response.result() @tornado.concurrent.run_on_executor def ping(self, url): os.system("ping -c 1 {}".format(url)) return 'after'
線(xiàn)程池的方式也可以通過(guò)使用tornado的yield把函數(shù)掛起,實(shí)現(xiàn)了協(xié)程處理??梢缘贸龊臅r(shí)任務(wù)的result,同時(shí)不會(huì)block住主線(xiàn)程。
Concurrency Level: 5 Time taken for tests: 0.043 seconds Complete requests: 5 Failed requests: 0 Total transferred: 960 bytes HTML transferred: 0 bytes Requests per second: 116.38 [#/sec] (mean) Time per request: 42.961 [ms] (mean) Time per request: 8.592 [ms] (mean, across all concurrent requests) Transfer rate: 21.82 [Kbytes/sec] received
qps為116,使用yield協(xié)程的方式,僅為非reponse的十分之一左右??雌饋?lái)性能損失了很多,主要原因這個(gè)協(xié)程返回結(jié)果需要等執(zhí)行完畢任務(wù)。
好比打魚(yú),前一種方式是撒網(wǎng),然后就完事,不聞不問(wèn),時(shí)間當(dāng)然快,后一種方式則撒網(wǎng)之后,還得收網(wǎng),等待收網(wǎng)也是一段時(shí)間。當(dāng)然,相比同步的方式還是快了千百倍,畢竟撒網(wǎng)還是比一只只釣比較快。
具體使用何種方式,更多的依賴(lài)業(yè)務(wù),不需要返回值的往往需要處理callback,回調(diào)太多容易暈菜,當(dāng)然如果需要很多回調(diào)嵌套,首先優(yōu)化的應(yīng)該是業(yè)務(wù)或產(chǎn)品邏輯。yield的方式很優(yōu)雅,寫(xiě)法可以異步邏輯同步寫(xiě),爽是爽了,當(dāng)然也會(huì)損失一定的性能。
異步多樣化
Tornado異步服務(wù)的處理大抵如此?,F(xiàn)在異步處理的框架和庫(kù)也很多,借助redis或者celery等,也可以把tonrado中一些業(yè)務(wù)異步化,放到后臺(tái)執(zhí)行。
此外,Tornado還有客戶(hù)端異步功能。該特性主要是在于 AsyncHTTPClient的使用。此時(shí)的應(yīng)用場(chǎng)景往往是tornado服務(wù)內(nèi),需要針對(duì)另外的IO進(jìn)行請(qǐng)求和處理。順便提及,上述的例子中,調(diào)用ping其實(shí)也算是一種服務(wù)內(nèi)的IO處理。接下來(lái),將會(huì)探索一下AsyncHTTPClient的使用,尤其是使用AsyncHTTPClient上傳文件與轉(zhuǎn)發(fā)請(qǐng)求。
異步客戶(hù)端
前面了解Tornado的異步任務(wù)的常用做法,姑且歸結(jié)為異步服務(wù)。通常在我們的服務(wù)內(nèi),還需要異步的請(qǐng)求第三方服務(wù)。針對(duì)HTTP請(qǐng)求,Python的庫(kù)Requests是最好用的庫(kù),沒(méi)有之一。官網(wǎng)宣稱(chēng):HTTP for Human。然而,在tornado中直接使用requests將會(huì)是一場(chǎng)惡夢(mèng)。requests的請(qǐng)求會(huì)block整個(gè)服務(wù)進(jìn)程。
上帝關(guān)上門(mén)的時(shí)候,往往回打開(kāi)一扇窗。Tornado提供了一個(gè)基于框架本身的異步HTTP客戶(hù)端(當(dāng)然也有同步的客戶(hù)端)--- AsyncHTTPClient。
AsyncHTTPClient 基本用法
AsyncHTTPClient是 tornado.httpclinet 提供的一個(gè)異步http客戶(hù)端。使用也比較簡(jiǎn)單。與服務(wù)進(jìn)程一樣,AsyncHTTPClient也可以callback和yield兩種使用方式。前者不會(huì)返回結(jié)果,后者則會(huì)返回response。
如果請(qǐng)求第三方服務(wù)是同步方式,同樣會(huì)殺死性能。
class SyncHandler(tornado.web.RequestHandler): def get(self, *args, **kwargs): url = 'https://api.github.com/' resp = requests.get(url) print resp.status_code self.finish('It works')
使用ab測(cè)試大概如下:
Document Path: /sync Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 10.255 seconds Complete requests: 5 Failed requests: 0 Total transferred: 985 bytes HTML transferred: 25 bytes Requests per second: 0.49 [#/sec] (mean) Time per request: 10255.051 [ms] (mean) Time per request: 2051.010 [ms] (mean, across all concurrent requests) Transfer rate: 0.09 [Kbytes/sec] received
性能相當(dāng)慢了,換成AsyncHTTPClient再測(cè):
class AsyncHandler(tornado.web.RequestHandler): @tornado.web.asynchronous def get(self, *args, **kwargs): url = 'https://api.github.com/' http_client = tornado.httpclient.AsyncHTTPClient() http_client.fetch(url, self.on_response) self.finish('It works') @tornado.gen.coroutine def on_response(self, response): print response.code
qps 提高了很多
Document Path: /async Document Length: 5 bytes Concurrency Level: 5 Time taken for tests: 0.162 seconds Complete requests: 5 Failed requests: 0 Total transferred: 985 bytes HTML transferred: 25 bytes Requests per second: 30.92 [#/sec] (mean) Time per request: 161.714 [ms] (mean) Time per request: 32.343 [ms] (mean, across all concurrent requests) Transfer rate: 5.95 [Kbytes/sec] received
同樣,為了獲取response的結(jié)果,只需要yield函數(shù)。
class AsyncResponseHandler(tornado.web.RequestHandler): @tornado.web.asynchronous @tornado.gen.coroutine def get(self, *args, **kwargs): url = 'https://api.github.com/' http_client = tornado.httpclient.AsyncHTTPClient() response = yield tornado.gen.Task(http_client.fetch, url) print response.code print response.body
AsyncHTTPClient 轉(zhuǎn)發(fā)
使用Tornado經(jīng)常需要做一些轉(zhuǎn)發(fā)服務(wù),需要借助AsyncHTTPClient。既然是轉(zhuǎn)發(fā),就不可能只有g(shù)et方法,post,put,delete等方法也會(huì)有。此時(shí)涉及到一些 headers和body,甚至還有https的waring。
下面請(qǐng)看一個(gè)post的例子, yield結(jié)果,通常,使用yield的時(shí)候,handler是需要 tornado.gen.coroutine。
headers = self.request.headers body = json.dumps({'name': 'rsj217'}) http_client = tornado.httpclient.AsyncHTTPClient() resp = yield tornado.gen.Task( self.http_client.fetch, url, method="POST", headers=headers, body=body, validate_cert=False)
AsyncHTTPClient 構(gòu)造請(qǐng)求
如果業(yè)務(wù)處理并不是在handlers寫(xiě)的,而是在別的地方,當(dāng)無(wú)法直接使用tornado.gen.coroutine的時(shí)候,可以構(gòu)造請(qǐng)求,使用callback的方式。
body = urllib.urlencode(params) req = tornado.httpclient.HTTPRequest( url=url, method='POST', body=body, validate_cert=False) http_client.fetch(req, self.handler_response) def handler_response(self, response): print response.code
用法也比較簡(jiǎn)單,AsyncHTTPClient中的fetch方法,第一個(gè)參數(shù)其實(shí)是一個(gè)HTTPRequest實(shí)例對(duì)象,因此對(duì)于一些和http請(qǐng)求有關(guān)的參數(shù),例如method和body,可以使用HTTPRequest先構(gòu)造一個(gè)請(qǐng)求,再扔給fetch方法。通常在轉(zhuǎn)發(fā)服務(wù)的時(shí)候,如果開(kāi)起了validate_cert,有可能會(huì)返回599timeout之類(lèi),這是一個(gè)warning,官方卻認(rèn)為是合理的。
AsyncHTTPClient 上傳圖片
AsyncHTTPClient 更高級(jí)的用法就是上傳圖片。例如服務(wù)有一個(gè)功能就是請(qǐng)求第三方服務(wù)的圖片OCR服務(wù)。需要把用戶(hù)上傳的圖片,再轉(zhuǎn)發(fā)給第三方服務(wù)。
@router.Route('/api/v2/account/upload') class ApiAccountUploadHandler(helper.BaseHandler): @tornado.gen.coroutine @helper.token_require def post(self, *args, **kwargs): upload_type = self.get_argument('type', None) files_body = self.request.files['file'] new_file = 'upload/new_pic.jpg' new_file_name = 'new_pic.jpg' # 寫(xiě)入文件 with open(new_file, 'w') as w: w.write(file_['body']) logging.info('user {} upload {}'.format(user_id, new_file_name)) # 異步請(qǐng)求 上傳圖片 with open(new_file, 'rb') as f: files = [('image', new_file_name, f.read())] fields = (('api_key', KEY), ('api_secret', SECRET)) content_type, body = encode_multipart_formdata(fields, files) headers = {"Content-Type": content_type, 'content-length': str(len(body))} request = tornado.httpclient.HTTPRequest(config.OCR_HOST, method="POST", headers=headers, body=body, validate_cert=False) response = yield tornado.httpclient.AsyncHTTPClient().fetch(request) def encode_multipart_formdata(fields, files): """ fields is a sequence of (name, value) elements for regular form fields. files is a sequence of (name, filename, value) elements for data to be uploaded as files. Return (content_type, body) ready for httplib.HTTP instance """ boundary = '----------ThIs_Is_tHe_bouNdaRY_$' crlf = '\r\n' l = [] for (key, value) in fields: l.append('--' + boundary) l.append('Content-Disposition: form-data; name="%s"' % key) l.append('') l.append(value) for (key, filename, value) in files: filename = filename.encode("utf8") l.append('--' + boundary) l.append( 'Content-Disposition: form-data; name="%s"; filename="%s"' % ( key, filename ) ) l.append('Content-Type: %s' % get_content_type(filename)) l.append('') l.append(value) l.append('--' + boundary + '--') l.append('') body = crlf.join(l) content_type = 'multipart/form-data; boundary=%s' % boundary return content_type, body def get_content_type(filename): import mimetypes return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
對(duì)比上述的用法,上傳圖片僅僅是多了一個(gè)圖片的編碼。將圖片的二進(jìn)制數(shù)據(jù)按照multipart 方式編碼。編碼的同時(shí),還需要把傳遞的相關(guān)的字段處理好。相比之下,使用requests 的方式則非常簡(jiǎn)單:
files = {} f = open('/Users/ghost/Desktop/id.jpg') files['image'] = f data = dict(api_key='KEY', api_secret='SECRET') resp = requests.post(url, data=data, files=files) f.close() print resp.status_Code
總結(jié)
通過(guò)AsyncHTTPClient的使用方式,可以輕松的實(shí)現(xiàn)handler對(duì)第三方服務(wù)的請(qǐng)求。結(jié)合前面關(guān)于tornado異步的使用方式。無(wú)非還是兩個(gè)key。是否需要返回結(jié)果,來(lái)確定使用callback的方式還是yield的方式。當(dāng)然,如果不同的函數(shù)都yield,yield也可以一直傳遞。這個(gè)特性,tornado的中的tornado.auth 里面對(duì)oauth的認(rèn)證。
大致就是這樣的用法。
- 深入解析Python的Tornado框架中內(nèi)置的模板引擎
- 使用Python的Tornado框架實(shí)現(xiàn)一個(gè)Web端圖書(shū)展示頁(yè)面
- 為Python的Tornado框架配置使用Jinja2模板引擎的方法
- Python的Tornado框架實(shí)現(xiàn)異步非阻塞訪問(wèn)數(shù)據(jù)庫(kù)的示例
- Python的Tornado框架實(shí)現(xiàn)圖片上傳及圖片大小修改功能
- Python Web框架Tornado運(yùn)行和部署
- 剖析Python的Tornado框架中session支持的實(shí)現(xiàn)代碼
- python Tornado框架詳解
相關(guān)文章
Python切換pip源兩種方法(解決pip?install慢)
這篇文章主要給大家介紹了關(guān)于Python切換pip源兩種方法(解決pip?install慢),我總結(jié)的這幾種更換pip源的常用方式,希望可以幫助您成功配置國(guó)內(nèi)源,解決安裝Python包速度慢的問(wèn)題,需要的朋友可以參考下2023-11-11Python自動(dòng)化爬取天眼查數(shù)據(jù)的實(shí)現(xiàn)
本文將結(jié)合實(shí)例代碼,介紹Python自動(dòng)化爬取天眼查數(shù)據(jù)的實(shí)現(xiàn),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-06-06Python Nose框架編寫(xiě)測(cè)試用例方法
這篇文章主要介紹了Python Nose框架編寫(xiě)測(cè)試用例方法,需要的朋友可以參考下2017-10-10Python實(shí)現(xiàn)將SQLite中的數(shù)據(jù)直接輸出為CVS的方法示例
這篇文章主要介紹了Python實(shí)現(xiàn)將SQLite中的數(shù)據(jù)直接輸出為CVS的方法,涉及Python連接、讀取SQLite數(shù)據(jù)庫(kù)及轉(zhuǎn)換CVS格式數(shù)據(jù)的相關(guān)操作技巧,需要的朋友可以參考下2017-07-07PyTorch實(shí)現(xiàn)ResNet50、ResNet101和ResNet152示例
今天小編就為大家分享一篇PyTorch實(shí)現(xiàn)ResNet50、ResNet101和ResNet152示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-01-01Python while 循環(huán)使用的簡(jiǎn)單實(shí)例
下面小編就為大家?guī)?lái)一篇Python while 循環(huán)使用的簡(jiǎn)單實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-06-06