Python如何快速實(shí)現(xiàn)分布式任務(wù)
深入讀了讀python的官方文檔,發(fā)覺Python自帶的multiprocessing模塊有很多預(yù)制的接口可以方便的實(shí)現(xiàn)多個(gè)主機(jī)之間的通訊,進(jìn)而實(shí)現(xiàn)典型的生產(chǎn)者-消費(fèi)者模式的分布式任務(wù)架構(gòu)。
之前,為了在Python中實(shí)現(xiàn)生產(chǎn)者-消費(fèi)者模式,往往就會選擇一個(gè)額外的隊(duì)列系統(tǒng),比如rabbitMQ之類。此外,你有可能還要設(shè)計(jì)一套任務(wù)對象的序列化方式以便塞入隊(duì)列。如果沒有隊(duì)列的支持,那不排除有些同學(xué)不得不從socket服務(wù)器做起,直接跟TCP/IP打起交道來。
其實(shí)multiprocessing.managers中有個(gè)BaseManager就為開發(fā)者提供了這樣一個(gè)快速接口。
我們假定的場景是1個(gè)生產(chǎn)者(producer.py)+8個(gè)消費(fèi)者(worker.py)的系統(tǒng),還有一個(gè)中央節(jié)點(diǎn)負(fù)責(zé)協(xié)調(diào)(server.py)實(shí)現(xiàn)如下:
server.py
from multiprocessing.managers import BaseManager import Queue queue = Queue.Queue() #初始化一個(gè)Q,用于消息傳遞 class QueueManager(BaseManager): pass QueueManager.register('get_queue', callable=lambda:queue) # 在系統(tǒng)中發(fā)布get_queue這個(gè)業(yè)務(wù) if __name__ == '__main__': m = QueueManager(address=('10.239.85.193', 50000),authkey='abr' ) # 監(jiān)聽所有10.239.85.193的50000口 s = m.get_server() s.serve_forever()
worker.py
from multiprocessing.managers import BaseManager from multiprocessing import Pool class QueueManager(BaseManager): pass QueueManager.register('get_queue') def feb(i): #經(jīng)典的'山羊增殖' if i < 2: return 1 if i < 5 : return feb(i-1) + feb(i-2) return feb(i-1) + feb(i-2) - feb(i-5) def worker(i): m = QueueManager(address=('10.239.85.193', 50000), authkey='abr') #連接server m.connect() while True: queue = m.get_queue() # 獲取Q c = queue.get() print feb(c) if __name__ == '__main__': p = Pool(8) # 分進(jìn)程啟動8個(gè)worker p.map(worker, range(8)) producer.py from multiprocessing.managers import BaseManager class QueueManager(BaseManager): pass QueueManager.register('get_queue') if __name__ == '__main__': m = QueueManager(address=('10.239.85.193', 50000), authkey='abr') m.connect() i = 0 while True: queue = m.get_queue() queue.put(48) i+=1
系統(tǒng)會直接將Queue() 對象中的數(shù)據(jù)直接封裝后通過TCP 50000端口在主機(jī)之間傳遞。不過需要注意的是,由于authkey的緣故,各個(gè)節(jié)點(diǎn)要求python的版本一致。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
vim自動補(bǔ)全插件YouCompleteMe(YCM)安裝過程解析
這篇文章主要介紹了vim自動補(bǔ)全插件YouCompleteMe(YCM)安裝過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-10-10Python整型運(yùn)算之布爾型、標(biāo)準(zhǔn)整型、長整型操作示例
這篇文章主要介紹了Python整型運(yùn)算之布爾型、標(biāo)準(zhǔn)整型、長整型操作,結(jié)合具體實(shí)例形式分析了Python中布爾型、標(biāo)準(zhǔn)整型、長整型等相關(guān)運(yùn)算技巧,代碼備有詳盡注釋,需要的朋友可以參考下2017-07-07Python Pyqt5多線程更新UI代碼實(shí)例(防止界面卡死)
這篇文章通過代碼實(shí)例給大家介紹了Python Pyqt5多線程更新UI防止界面卡死的問題,代碼簡單易懂,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-12-12PyCharm2020.1.2社區(qū)版安裝,配置及使用教程詳解(Windows)
這篇文章主要介紹了PyCharm2020.1.2社區(qū)版安裝,配置及使用教程(Windows),本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08Tensorflow tf.tile()的用法實(shí)例分析
這篇文章主要介紹了Tensorflow tf.tile()的用法實(shí)例分析,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05