Python的Flask框架應(yīng)用調(diào)用Redis隊(duì)列數(shù)據(jù)的方法
任務(wù)異步化
打開(kāi)瀏覽器,輸入地址,按下回車(chē),打開(kāi)了頁(yè)面。于是一個(gè)HTTP請(qǐng)求(request)就由客戶(hù)端發(fā)送到服務(wù)器,服務(wù)器處理請(qǐng)求,返回響應(yīng)(response)內(nèi)容。
我們每天都在瀏覽網(wǎng)頁(yè),發(fā)送大大小小的請(qǐng)求給服務(wù)器。有時(shí)候,服務(wù)器接到了請(qǐng)求,會(huì)發(fā)現(xiàn)他也需要給另外的服務(wù)器發(fā)送請(qǐng)求,或者服務(wù)器也需要做另外一些事情,于是最初們發(fā)送的請(qǐng)求就被阻塞了,也就是要等待服務(wù)器完成其他的事情。
更多的時(shí)候,服務(wù)器做的額外事情,并不需要客戶(hù)端等待,這時(shí)候就可以把這些額外的事情異步去做。從事異步任務(wù)的工具有很多。主要原理還是處理通知消息,針對(duì)通知消息通常采取是隊(duì)列結(jié)構(gòu)。生產(chǎn)和消費(fèi)消息進(jìn)行通信和業(yè)務(wù)實(shí)現(xiàn)。
生產(chǎn)消費(fèi)與隊(duì)列
上述異步任務(wù)的實(shí)現(xiàn),可以抽象為生產(chǎn)者消費(fèi)模型。如同一個(gè)餐館,廚師在做飯,吃貨在吃飯。如果廚師做了很多,暫時(shí)賣(mài)不完,廚師就會(huì)休息;如果客戶(hù)很多,廚師馬不停蹄的忙碌,客戶(hù)則需要慢慢等待。實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者的方式用很多,下面使用Python標(biāo)準(zhǔn)庫(kù)Queue寫(xiě)個(gè)小例子:
import random import time from Queue import Queue from threading import Thread queue = Queue(10) class Producer(Thread): def run(self): while True: elem = random.randrange(9) queue.put(elem) print "廚師 {} 做了 {} 飯 --- 還剩 {} 飯沒(méi)賣(mài)完".format(self.name, elem, queue.qsize()) time.sleep(random.random()) class Consumer(Thread): def run(self): while True: elem = queue.get() print "吃貨{} 吃了 {} 飯 --- 還有 {} 飯可以吃".format(self.name, elem, queue.qsize()) time.sleep(random.random()) def main(): for i in range(3): p = Producer() p.start() for i in range(2): c = Consumer() c.start() if __name__ == '__main__': main()
大概輸出如下:
廚師 Thread-1 做了 1 飯 --- 還剩 1 飯沒(méi)賣(mài)完 廚師 Thread-2 做了 8 飯 --- 還剩 2 飯沒(méi)賣(mài)完 廚師 Thread-3 做了 3 飯 --- 還剩 3 飯沒(méi)賣(mài)完 吃貨Thread-4 吃了 1 飯 --- 還有 2 飯可以吃 吃貨Thread-5 吃了 8 飯 --- 還有 1 飯可以吃 吃貨Thread-4 吃了 3 飯 --- 還有 0 飯可以吃 廚師 Thread-1 做了 0 飯 --- 還剩 1 飯沒(méi)賣(mài)完 廚師 Thread-2 做了 0 飯 --- 還剩 2 飯沒(méi)賣(mài)完 廚師 Thread-1 做了 1 飯 --- 還剩 3 飯沒(méi)賣(mài)完 廚師 Thread-1 做了 1 飯 --- 還剩 4 飯沒(méi)賣(mài)完 吃貨Thread-4 吃了 0 飯 --- 還有 3 飯可以吃 廚師 Thread-3 做了 3 飯 --- 還剩 4 飯沒(méi)賣(mài)完 吃貨Thread-5 吃了 0 飯 --- 還有 3 飯可以吃 吃貨Thread-5 吃了 1 飯 --- 還有 2 飯可以吃 廚師 Thread-2 做了 8 飯 --- 還剩 3 飯沒(méi)賣(mài)完 廚師 Thread-2 做了 8 飯 --- 還剩 4 飯沒(méi)賣(mài)完
Redis 隊(duì)列
Python內(nèi)置了一個(gè)好用的隊(duì)列結(jié)構(gòu)。我們也可以是用redis實(shí)現(xiàn)類(lèi)似的操作。并做一個(gè)簡(jiǎn)單的異步任務(wù)。
Redis提供了兩種方式來(lái)作消息隊(duì)列。一個(gè)是使用生產(chǎn)者消費(fèi)模式模式,另外一個(gè)方法就是發(fā)布訂閱者模式。前者會(huì)讓一個(gè)或者多個(gè)客戶(hù)端監(jiān)聽(tīng)消息隊(duì)列,一旦消息到達(dá),消費(fèi)者馬上消費(fèi),誰(shuí)先搶到算誰(shuí)的,如果隊(duì)列里沒(méi)有消息,則消費(fèi)者繼續(xù)監(jiān)聽(tīng)。后者也是一個(gè)或多個(gè)客戶(hù)端訂閱消息頻道,只要發(fā)布者發(fā)布消息,所有訂閱者都能收到消息,訂閱者都是ping的。
生產(chǎn)消費(fèi)模式
主要使用了redis提供的blpop獲取隊(duì)列數(shù)據(jù),如果隊(duì)列沒(méi)有數(shù)據(jù)則阻塞等待,也就是監(jiān)聽(tīng)。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.queue = 'task:prodcons:queue' def listen_task(self): while True: task = self.rcon.blpop(self.queue, 0)[1] print "Task get", task if __name__ == '__main__': print 'listen task queue' Task().listen_task()
發(fā)布訂閱模式
使用redis的pubsub功能,訂閱者訂閱頻道,發(fā)布者發(fā)布消息到頻道了,頻道就是一個(gè)消息隊(duì)列。
import redis class Task(object): def __init__(self): self.rcon = redis.StrictRedis(host='localhost', db=5) self.ps = self.rcon.pubsub() self.ps.subscribe('task:pubsub:channel') def listen_task(self): for i in self.ps.listen(): if i['type'] == 'message': print "Task get", i['data'] if __name__ == '__main__': print 'listen task channel' Task().listen_task()
Flask 入口
我們分別實(shí)現(xiàn)了兩種異步任務(wù)的后端服務(wù),直接啟動(dòng)他們,就能監(jiān)聽(tīng)redis隊(duì)列或頻道的消息了。簡(jiǎn)單的測(cè)試如下:
import redis import random import logging from flask import Flask, redirect app = Flask(__name__) rcon = redis.StrictRedis(host='localhost', db=5) prodcons_queue = 'task:prodcons:queue' pubsub_channel = 'task:pubsub:channel' @app.route('/') def index(): html = """ <br> <center><h3>Redis Message Queue</h3> <br> <a href="/prodcons">生產(chǎn)消費(fèi)者模式</a> <br> <br> <a href="/pubsub">發(fā)布訂閱者模式</a> </center> """ return html @app.route('/prodcons') def prodcons(): elem = random.randrange(10) rcon.lpush(prodcons_queue, elem) logging.info("lpush {} -- {}".format(prodcons_queue, elem)) return redirect('/') @app.route('/pubsub') def pubsub(): ps = rcon.pubsub() ps.subscribe(pubsub_channel) elem = random.randrange(10) rcon.publish(pubsub_channel, elem) return redirect('/') if __name__ == '__main__': app.run(debug=True)
啟動(dòng)腳本,使用
siege -c10 -r 5 http://127.0.0.1:5000/prodcons siege -c10 -r 5 http://127.0.0.1:5000/pubsub
可以分別在監(jiān)聽(tīng)的腳本輸入中看到異步消息。在異步的任務(wù)中,可以執(zhí)行一些耗時(shí)間的操作,當(dāng)然目前這些做法并不知道異步的執(zhí)行結(jié)果,如果需要知道異步的執(zhí)行結(jié)果,可以考慮設(shè)計(jì)協(xié)程任務(wù)或者使用一些工具如RQ或者celery等。
相關(guān)文章
matplotlib實(shí)現(xiàn)顯示偽彩色圖像及色度條
今天小編就為大家分享一篇matplotlib實(shí)現(xiàn)顯示偽彩色圖像及色度條,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12Python 專(zhuān)題六 局部變量、全局變量global、導(dǎo)入模塊變量
本文主要講述python全局變量、局部變量和導(dǎo)入模塊變量的方法。具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧2017-03-03python3利用tcp實(shí)現(xiàn)文件夾遠(yuǎn)程傳輸
這篇文章主要為大家詳細(xì)介紹了python3利用tcp實(shí)現(xiàn)文件夾遠(yuǎn)程傳輸,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-07-07python 如何通過(guò)KNN來(lái)填充缺失值
這篇文章主要介紹了python 通過(guò)KNN來(lái)填充缺失值的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-05-05解決TensorFlow GPU版出現(xiàn)OOM錯(cuò)誤的問(wèn)題
今天小編就為大家分享一篇解決TensorFlow GPU版出現(xiàn)OOM錯(cuò)誤的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-02-02如何利用Python和OpenCV對(duì)圖像進(jìn)行加水印詳解
Python使用opencv是因?yàn)橛X(jué)得它足夠強(qiáng)大,很多圖像處理這塊都是用的它,最近就用opencv添加個(gè)水印,這篇文章主要給大家介紹了關(guān)于如何利用Python和OpenCV對(duì)圖像進(jìn)行加水印的相關(guān)資料,需要的朋友可以參考下2021-10-10python上下文管理的使用場(chǎng)景實(shí)例講解
在本篇文章里小編給大家整理的是一篇關(guān)于python上下文管理的使用場(chǎng)景實(shí)例講解內(nèi)容,有興趣的朋友們可以學(xué)習(xí)下。2021-03-03python如何通過(guò)twisted搭建socket服務(wù)
這篇文章主要介紹了python如何通過(guò)twisted搭建socket服務(wù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-02-02