Python網(wǎng)絡(luò)編程之ZeroMQ知識(shí)總結(jié)
一、ZeroMQ概述
- ZeroMQ(又名ØMQ,MQ,或zmq)像一個(gè)可嵌入的網(wǎng)絡(luò)庫(kù),但其作用就像一個(gè)并發(fā)框架。
- ZeroMQ類似于標(biāo)準(zhǔn)Berkeley套接字,其提供了各種傳輸工具,如進(jìn)程內(nèi)、進(jìn)程間、TCP和組播中進(jìn)行原子消息傳送的套接字
- 可以使用各種模式實(shí)現(xiàn)N對(duì)N的套接字連接,這些模式包括:發(fā)布-訂閱、任務(wù)分配、請(qǐng)求-應(yīng)答。
- ZeroMQ的速度足夠快,因此可充當(dāng)集群產(chǎn)品的結(jié)構(gòu)。
- ZeroMQ的異步I/O模型提供了可擴(kuò)展的多核應(yīng)用程序,用異步消息來(lái)處理任務(wù)
- ZeroMQ核心由C語(yǔ)言編寫,支持C、C++、java、python等多種編程語(yǔ)言的API,并可運(yùn)行在大多數(shù)操作系統(tǒng)上
總結(jié)以下:ØMQ (ZeroMQ) 是一個(gè)基于消息隊(duì)列的多線程網(wǎng)絡(luò)庫(kù),它封裝了網(wǎng)絡(luò)通信、消息隊(duì)列、線程調(diào)度等功能,向上層提供簡(jiǎn)潔的API,應(yīng)用程序通過(guò)加載庫(kù)文件,調(diào)用API函數(shù)來(lái)實(shí)現(xiàn)高性能網(wǎng)絡(luò)通信。
看起來(lái)有些抽象,下面我們結(jié)合ZeroMQ 的 Python 封裝———— pyzmp,用實(shí)例看一下ZeroMQ的三種最基本的工作模式。
二、安裝
安裝方法
pip install pyzmq
查看是否安裝成功
>>> import zmq >>> print(zmq.__version__) 22.0.3
三、Request-Reply (請(qǐng)求響應(yīng)模式)
3.1 Request-Reply模式概述:
- 消息雙向的,有來(lái)有往。
- Client請(qǐng)求的消息,Server必須答復(fù)給Client。
- Client在請(qǐng)求后,Server必須回響應(yīng),注意:Server不返回響應(yīng)會(huì)報(bào)錯(cuò)。
- Server和Client都可以是1:N的模型。通常把1認(rèn)為是Server,N認(rèn)為是Client。
- 更底層的端點(diǎn)地址是對(duì)上層隱藏的,每個(gè)請(qǐng)求都隱含回應(yīng)地址,而應(yīng)用則不關(guān)心它。
- ZMQ 可以很好的支持路由功能(實(shí)現(xiàn)路由功能的組件叫做 Device),把 1:N 擴(kuò)展為 N:M(只需要加入若干路由節(jié)點(diǎn))。
3.2 Client端python實(shí)現(xiàn)
#client.py import zmq context = zmq.Context() # Socket to talk to server print("Connecting to hello world server…") socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send(b"Hello") # Get the reply. message = socket.recv() print(f"Received reply [ {message} ]")
3.3 Server端python實(shí)現(xiàn)
#server.py import time import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: %s" % message) # Do some 'work' time.sleep(1) # Send reply back to client socket.send(b"World")
- 啟動(dòng)client.py 首先會(huì)打印Connecting to hello world server… 但不會(huì)受到任何消息。
- 然后啟動(dòng)server.py ,客戶端收到來(lái)自客戶端的request: b'Hello'
- 此時(shí)client端收到來(lái)自server端的 reply: [ b'World' ]
python client.py Connecting to hello world server… Received reply [ b'World' ]
python server.py Received request: b'Hello'
可以試一下,多運(yùn)行幾個(gè)client.py,看看情況是什么樣的。
四、Publish/Subscribe(訂閱-發(fā)布模式 )
4.1 Pub-Subs模式概述:
- 消息單向,有去無(wú)回
- 一個(gè)發(fā)布端,多個(gè)訂閱端;發(fā)布端只管產(chǎn)生數(shù)據(jù),發(fā)布端發(fā)布一條消息,可被多個(gè)訂閱端同時(shí)收到。
- 發(fā)布者不必關(guān)心訂閱者的加入和離開,消息會(huì)以 1:N 的方式擴(kuò)散到每個(gè)訂閱者。
- 廣播所有client,沒(méi)有隊(duì)列緩存,斷開連接數(shù)據(jù)將永遠(yuǎn)丟失。
- 如果Publish端開始發(fā)布信息時(shí),Subscribe端尚未連接進(jìn)來(lái),則這些信息會(huì)被直接丟棄。
- PUB和SUB誰(shuí)bind誰(shuí)connect并無(wú)嚴(yán)格要求(雖本質(zhì)并無(wú)區(qū)別),但仍建議PUB使用bind,SUB使用connect
- 使用SUB設(shè)置一個(gè)訂閱時(shí),必須使用zmq_setsockopt()對(duì)消息進(jìn)行過(guò)濾
這里直接引用官方文檔的例子:
發(fā)布者:類似于一個(gè)天氣更新服務(wù)器,向訂閱者發(fā)送天氣更新,內(nèi)容包括郵政編碼、溫度、濕度等信息
#Publisher.py import zmq from random import randrange context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5556") while True: zipcode = randrange(1, 100000) temperature = randrange(-80, 135) relhumidity = randrange(10, 60) socket.send_string("%i %i %i" % (zipcode, temperature, relhumidity))
訂閱者:它監(jiān)聽發(fā)布者更新的數(shù)據(jù)流,過(guò)濾只接收與特定郵政編碼相關(guān)的天氣信息,默認(rèn)接收接收10條數(shù)據(jù)
#Subscribe.py import sys import zmq # Socket to talk to server context = zmq.Context() socket = context.socket(zmq.SUB) print("Collecting updates from weather server...") socket.connect("tcp://localhost:5556") # Subscribe to zipcode, default is NYC, 10001 zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001" # Python 2 - ascii bytes to unicode str if isinstance(zip_filter, bytes): zip_filter = zip_filter.decode('ascii') socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter) # Process 5 updates total_temp = 0 for update_nbr in range(5): string = socket.recv_string() zipcode, temperature, relhumidity = string.split() total_temp += int(temperature) print( "Average temperature for zipcode '%s' was %dF" % (zip_filter, total_temp / (update_nbr + 1)) )
五、Push/Pull(流水線模式)
5.1 流水線模式概述:
- 主要用于多任務(wù)并行。
- 消息單向,有去無(wú)回。
- Push的任何一個(gè)消息,始終只會(huì)有一個(gè)Pull端收到消息。
- Push 端還是 Pull 端都可以做 server,bind 到某個(gè)地址等待對(duì)方訪問(wèn)。
- 如果有多個(gè)PULL端同時(shí)連接到PUSH端,則PUSH端會(huì)在內(nèi)部做一個(gè)負(fù)載均衡,采用平均分配的算法,將所有消息均衡發(fā)布到PULL端上。
- 由三部分組成,Push進(jìn)行數(shù)據(jù)推送,work進(jìn)行數(shù)據(jù)緩存,Pull進(jìn)行數(shù)據(jù)競(jìng)爭(zhēng)獲取處理。
- 存在一個(gè)數(shù)據(jù)緩存和處理負(fù)載,當(dāng)連接被斷開,數(shù)據(jù)不會(huì)丟失,重連后數(shù)據(jù)繼續(xù)發(fā)送到對(duì)端。
ventilator 使用的是 SOCKET_PUSH,將任務(wù)分發(fā)到 Worker 節(jié)點(diǎn)上。Worker 節(jié)點(diǎn)上,使用 SOCKET_PULL 從上游接受任務(wù),并使用 SOCKET_PUSH 將結(jié)果匯集到 Sink。值得注意的是,任務(wù)的分發(fā)的時(shí)候也同樣有一個(gè)負(fù)載均衡的路由功能,worker 可以隨時(shí)自由加入,ventilator 可以均衡將任務(wù)分發(fā)出去。
Push/Pull模式還是蠻常用的,這里我們主要測(cè)試一下它的負(fù)載均衡。
5.2 Ventilator
# ventilator.py import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557") while True: socket.send(b"test") print("已發(fā)送") time.sleep(1)
5.3 worker
# worker.py import zmq context = zmq.Context() recive = context.socket(zmq.PULL) recive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558') while True: data = recive.recv() print("work1 正在轉(zhuǎn)發(fā)...") sender.send(data)
5.4 sink
# sink.py import zmq import sys context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558") while True: response = socket.recv() print("response: %s" % response)
打開4個(gè)Terminal,分別運(yùn)行
python sink.py python worker.py python worker.py python ventilator.py
六、總結(jié)
消息模型可以根據(jù)需要組合使用,后續(xù)的代理模式和路由模式等都是在三種基本模式上面的擴(kuò)展或變異。
到此這篇關(guān)于Python網(wǎng)絡(luò)編程之ZeroMQ知識(shí)總結(jié)的文章就介紹到這了,更多相關(guān)Python ZeroMQ知識(shí)總結(jié)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- python神經(jīng)網(wǎng)絡(luò)編程之手寫數(shù)字識(shí)別
- python Socket網(wǎng)絡(luò)編程實(shí)現(xiàn)C/S模式和P2P
- python神經(jīng)網(wǎng)絡(luò)編程實(shí)現(xiàn)手寫數(shù)字識(shí)別
- python網(wǎng)絡(luò)編程:socketserver的基本使用方法實(shí)例分析
- python網(wǎng)絡(luò)編程socket實(shí)現(xiàn)服務(wù)端、客戶端操作詳解
- Python網(wǎng)絡(luò)編程之使用TCP方式傳輸文件操作示例
- Python 網(wǎng)絡(luò)編程之UDP發(fā)送接收數(shù)據(jù)功能示例【基于socket套接字】
- python網(wǎng)絡(luò)編程之多線程同時(shí)接受和發(fā)送
- python socket網(wǎng)絡(luò)編程之粘包問(wèn)題詳解
- python 網(wǎng)絡(luò)編程要點(diǎn)總結(jié)
相關(guān)文章
18個(gè)幫你簡(jiǎn)化代碼的Python技巧分享
選擇學(xué)習(xí)?python?時(shí),最令我震驚的是它的簡(jiǎn)單性和可讀性。但是你知道還可以用更少的代碼行可以讓?Python?代碼變得更簡(jiǎn)單嗎?本文為大家總結(jié)了18個(gè)幫你簡(jiǎn)化代碼的Python技巧,感興趣的可以了解一下2022-07-07matplotlib實(shí)現(xiàn)顯示偽彩色圖像及色度條
今天小編就為大家分享一篇matplotlib實(shí)現(xiàn)顯示偽彩色圖像及色度條,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-12-12django 創(chuàng)建過(guò)濾器的實(shí)例詳解
這篇文章主要介紹了django 創(chuàng)建過(guò)濾器的實(shí)例詳解的相關(guān)資料,主要說(shuō)明django 創(chuàng)建過(guò)濾器來(lái)統(tǒng)一處理字符串,需要的朋友可以參考下2017-08-08conda換源安裝torch+vscode分布式訓(xùn)練調(diào)試的實(shí)現(xiàn)
本文主要介紹了conda換源安裝torch+vscode分布式訓(xùn)練調(diào)試的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06Python?虛擬環(huán)境遷移到其他電腦的實(shí)現(xiàn)
本文主要介紹了Python?虛擬環(huán)境遷移到其他電腦的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04詳解Python map函數(shù)及Python map()函數(shù)的用法
map() 會(huì)根據(jù)提供的函數(shù)對(duì)指定序列做映射。下面通過(guò)本文給大家介紹Python map函數(shù)及Python map()函數(shù)的用法,需要的朋友參考下吧2017-11-11OpenCV角點(diǎn)檢測(cè)的實(shí)現(xiàn)示例
角點(diǎn)通常被定義為兩條邊的交點(diǎn),本文主要介紹了OpenCV角點(diǎn)檢測(cè)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03解決python 3 urllib 沒(méi)有 urlencode 屬性的問(wèn)題
今天小編就為大家分享一篇解決python 3 urllib 沒(méi)有 urlencode 屬性的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-08-08