詳解Python 實現(xiàn) ZeroMQ 的三種基本工作模式
簡介
引用官方說法:ZMQ(以下 ZeroMQ 簡稱 ZMQ)是一個簡單好用的傳輸層,像框架一樣的一個 socket library,他使得 Socket 編程更加簡單、簡潔和性能更高。
是一個消息處理隊列庫,可在多個線程、內核和主機盒之間彈性伸縮。
ZMQ 的明確目標是“成為標準網絡協(xié)議棧的一部分,之后進入 Linux 內核”?,F(xiàn)在還未看到它們的成功。但是,它無疑是極具前景的、并且是人們更加需要的“傳統(tǒng)” BSD 套接字之上的一 層封裝。ZMQ 讓編寫高性能網絡應用程序極為簡單和有趣。
它跟 RabbitMQ,ActiveMQ 之類有著相當本質的區(qū)別,ZeroMQ 根本就不是一個消息隊列服務器,更像是一組底層網絡通訊庫,對原有的 Socket API 加上一層封裝,使我們操作更簡便。
三種工作模式
Request-Reply 模式:
說到“請求-應答”模式,不得不說的就是它的消息流動模型。消息流動模型指的是該模式下,必須嚴格遵守“一問一答”的方式。
發(fā)出消息后,若沒有收到回復,再發(fā)出第二條消息時就會拋出異常。同樣的,對于 Rep 也是,在沒有接收到消息前,不允許發(fā)出消息。
基于此構成“一問一答”的響應模式。
server:
# -*- coding=utf-8 -*- import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() print("Received: %s" % message) socket.send("I am OK!")
client:
# -*- coding=utf-8 -*- import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send('Are you OK?') response = socket.recv() print("response: %s" % response)
Publish-Subscribe 模式:
“發(fā)布-訂閱”模式下,“發(fā)布者”綁定一個指定的地址,例如“192.168.10.1:5500”,“訂閱者”連接到該地址。該模式下消息流是單向的,只允許從“發(fā)布者”流向“訂閱者”。且“發(fā)布者”只管發(fā)消息,不理會是否存在“訂閱者”。一個“發(fā)布者”可以擁有多個訂閱者,同樣的,一個“訂閱者”也可訂閱多個發(fā)布者。
雖然我們知道“發(fā)布者”在發(fā)送消息時是不關心“訂閱者”的存在于否,所以先啟動“發(fā)布者”,再啟動“訂閱者”是很容易導致部分消息丟失的。那么可能會提出一個說法“我先啟動‘訂閱者',再啟動‘發(fā)布者',就能解決這個問題了?”
對于 ZeroMQ 而言,這種做法也并不能保證 100% 的可靠性。在 ZeroMQ 領域中,有一個叫做“慢木匠”的術語,就是說即使我是先啟動了“訂閱者”,再啟動“發(fā)布者”,“訂閱者”總是會丟失第一批數(shù)據(jù)。因為在“訂閱者”與端點建立 TCP 連接時,會包含幾毫秒的握手時間,雖然時間短,但是是存在的。再加上 ZeroMQ 后臺 IO 是以一部方式執(zhí)行的,所以若不在雙方之間施加同步策略,消息丟失是不可避免的。
關于“發(fā)布-訂閱”模式在 ZeroMQ 中的一些其他特點:
- 公平排隊,一個“訂閱者”連接到多個發(fā)布者時,會均衡的從每個“發(fā)布者”讀取消息,不會出現(xiàn)一個“發(fā)布者”淹沒其他“發(fā)布者”的情況。
- ZMQ3.0 以上的版本,過濾規(guī)則發(fā)生在“發(fā)布方”。 ZMQ3.0 以下的版本,過濾規(guī)則發(fā)生在“訂閱方”。其實也就是處理消息的位置。
server:
# -*- coding=utf-8 -*- import zmq import time context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5555") for i in range(10): print('send message...' + str(i)) socket.send('message' + str(i)) time.sleep(1)
client:
# -*- coding=utf-8 -*- import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") socket.setsockopt(zmq.SUBSCRIBE, '') while True: response = socket.recv() print("response: %s" % response)
Parallel Pipeline 模式:
在說明“管道模式”前,需要明確的是在 ZeroMQ 中并沒有絕對的服務端與客戶端之分,所有的數(shù)據(jù)接收與發(fā)送都是以連接為單位的,只區(qū)分 ZeroMQ 定義的類型。就像套接字綁定地址時,可以使用 bind ,也可以使用 connect ,只是通常我們將理解中的服務端 bind 到一個地址,而理解中的客戶端 connec 到該地址。
“管道模式”一般用于任務分發(fā)與結果收集,由一個任務發(fā)生器來產生任務,“公平”的派發(fā)到其管轄下的所有 worker,完成后再由結果收集器來回收任務的執(zhí)行結果。
整體流程比較好理解,worker 連接到任務發(fā)生器上,等待任務的產生,完成后將結果發(fā)送至結果收集器。如果要以客戶端服務端的概念來區(qū)分,這里的任務發(fā)生器與結果收集器是服務端,而 worker 是客戶端。
前面說到了這里任務的派發(fā)是“公平的”,因為內部采用了 LRU 的算法來找到最近最久未工作的閑置 worker。但是公平在這里是相對的,當任務發(fā)生器啟動后,第一個連接到它的 worker 會在一瞬間承受整個任務發(fā)生器產生的 tasks。
總結來說由三部分組成,push 進行數(shù)據(jù)推送,work 進行數(shù)據(jù)緩存,pull 進行數(shù)據(jù)競爭獲取處理。區(qū)別于 Publish-Subscribe 存在一個數(shù)據(jù)緩存和處理負載。
當連接被斷開,數(shù)據(jù)不會丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對端。
server:
# -*- coding=utf-8 -*- import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557") for i in range(10): socket.send('message' + str(i)) # 沒啟 worker 時不會發(fā)消息 print('send message...' + str(i)) time.sleep(1)
work:
# -*- coding=utf-8 -*- import zmq context = zmq.Context() receive = context.socket(zmq.PULL) receive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558') while True: data = receive.recv() print('transform...' + data) sender.send(data)
client:
# -*- coding=utf-8 -*- import zmq context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558") while True: response = socket.recv() print("response: %s" % response)
以上。
參考文檔:
http://www.dbjr.com.cn/article/177043.htm
總結
到此這篇關于詳解Python 實現(xiàn) ZeroMQ 的三種基本工作模式的文章就介紹到這了,更多相關python ZeroMQ工作模式內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
vscode搭建之python?Django環(huán)境配置方式
這篇文章主要介紹了vscode搭建之python?Django環(huán)境配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-01-01python [:3] 實現(xiàn)提取數(shù)組中的數(shù)
今天小編就為大家分享一篇python [:3] 實現(xiàn)提取數(shù)組中的數(shù),具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11Python調用ChatGPT的API實現(xiàn)文章生成
最近ChatGPT大火,在3.5版本后開放了接口API,所以很多人開始進行實操,這里我就用python來為大家實現(xiàn)一下,如何調用API并提問返回文章的說明2023-03-03Python數(shù)據(jù)分析23種Pandas核心操作方法總結
在本文中,作者從基本數(shù)據(jù)集讀寫、數(shù)據(jù)處理和?DataFrame?操作三個角度展示了?23?個?Pandas?核心方法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-05-05Python數(shù)據(jù)結構與算法之圖的最短路徑(Dijkstra算法)完整實例
這篇文章主要介紹了Python數(shù)據(jù)結構與算法之圖的最短路徑(Dijkstra算法),結合完整實例形式分析了Python圖的最短路徑算法相關原理與實現(xiàn)技巧,需要的朋友可以參考下2017-12-12python實現(xiàn)統(tǒng)計代碼行數(shù)的方法
這篇文章主要介紹了python實現(xiàn)統(tǒng)計代碼行數(shù)的方法,涉及Python中os模塊及codecs模塊的相關使用技巧,需要的朋友可以參考下2015-05-05