Python?gRPC流式通信協(xié)議詳細講解
ProtoBuf 協(xié)議
gRPC使用 protocol buffer 協(xié)議做為接口描述語言(IDL) ,來定義接口及傳遞的消息數(shù)據(jù)類型。
gRPC的message 與 service 均須在protobuf 文件中定義好,才能進行編譯。 該文件須按protobuf 語法編寫, 也比較簡單,當前只須學習gRPC用到的部分。
如下例:
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}說明:
- syntax = “proto3” protobuf的版本號
- package tutorial 主要是java用,python是用文件名做為module 名,可以不需要定義
- Service : 就是定義1個 gRPC service, 在service代碼塊內(nèi),定義rpc 方法,指定request, response類型。 gRPC支持4種rpc方法
service interface_name {
rpc api_name( request ) returns ( response );
}
Message: 相當于接口函數(shù)的參數(shù)類型與返回值類型 ,需要分開定義
詳細 protobuf 使用教程,請參考菜鳥教程tutorialspoint 的教程 https://www.tutorialspoint.com/protobuf/index.htm
gRPC 4種通信模式介紹
1. 單向RPC
gRPC的術(shù)語為unary RPC,這種方式與函數(shù)調(diào)用類似, client 發(fā)送1條請求,server回1條響應。
rpc SayHello(HelloRequest) returns (HelloResponse);
2. 服務器流式處理 RPC
客戶端向服務器發(fā)送1條請求,服務器以回應多條響應消息,客戶機從返回的流中讀取數(shù)據(jù),直至沒有更多消息。 這時要在響應類型前加1個 stream關(guān)鍵字修飾。
rpc LotsOfReplies(HelloRequest) returns (stream HelloResponse);
3. 客戶端流式處理 RPC
由客戶端寫入一系列消息并將其發(fā)送到服務。 客戶端完成消息寫入后,它將等待服務器讀取消息并返回其響應. 這種模式,要在request的類型前加 stream 修飾。
rpc LotsOfGreetings(stream HelloRequest) returns (HelloResponse);
4. 雙向流式處理 RPC
其中雙方使用讀寫流發(fā)送一系列消息。這兩個流獨立運行,因此客戶端和服務器可以按照它們喜歡的任何順序進行讀取和寫入:例如,服務器可以等待接收所有客戶端消息,然后再寫入響應,或者它可以交替讀取消息然后寫入消息,或者讀取和寫入的某種其他組合。將保留每個流中消息的順序。此模式下, request 與 response類型均需要用stream修飾。
rpc BidiHello(stream HelloRequest) returns (stream HelloResponse);
說明
- 流式處理中,服務器A向客戶機B發(fā)送數(shù)據(jù),必須是在protobuf 中 rpc方法中Response message包含的數(shù)據(jù)類型。而不是任意發(fā)送數(shù)據(jù),程序會報錯。不限制的是響應中可以回1條或多條消息。
- 如果需要A向B發(fā)送請求,則需要定義新的protobuf 文件,并編寫新的server, client python代碼,這時,兩臺機器的角色是倒過來,B為server, A為client。仍然符合gRPC的原則。
- 如果一定需要在1條連接中,雙方互發(fā)請求,socket 模塊的低階API接口函數(shù)編程可以滿足要求,但必須注意,管理socket雙向通信必須小心翼翼,否則會造成混亂,
流程處理實現(xiàn)過程
1. 用protobuf 定義接口
下面以實例說明: users.proto ,
syntax = "proto3";
package users;
message User {
string username = 1;
uint32 user_id = 2;
}
message CreateUserRequest {
string username = 1;
string password = 2;
string email = 3;
}
message CreateUserResult {
User user = 1;
}
message GetUsersRequest {
repeated User user = 1;
}
service Users {
rpc CreateUser (users.CreateUserRequest) returns (users.CreateUserResult);
rpc GetUsers (users.GetUsersRequest) returns (stream users.GetUsersResult);
}
message GetUsersResult {
User user = 1;
}2. 根據(jù).protobuf文件生成客戶方與服務方代碼
首先要安裝 grpcio-tools package:
pip install grpcio-tools
進入proto文件所在目錄,執(zhí)行如下命令
python -m grpc_tools.protoc
\ --proto_path=.
\ --python_out=.
\ --grpc_python_out=.
\ proto文件名
參數(shù)說明
- proto_path=proto文件路徑
- python_out=編譯生成的文件的路徑
- grpc_python_out=編譯生成的接口文件路徑
- ./route_guide.proto 是要編譯的協(xié)議文件
本例 :
python -m grpc_tools.protoc --proto_path=. --python_out=. --grpc_python_out=. users.proto
生成的文件有兩個: Users_pb2.py 與 Users_pb2_grpc.py,
3. 服務器端代碼
from concurrent import futures
import time
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
_ONE_DAY_IN_SECONDS = 60 * 60 * 24
class UsersService(users_service.UsersServicer):
def CreateUser(self, request, context):
metadata = dict(context.invocation_metadata())
print(metadata)
user = users_messages.User(username=request.username, user_id=1)
return users_messages.CreateUserResult(user=user)
def GetUsers(self, request, context):
for user in request.user:
user = users_messages.User(
username=user.username, user_id=user.user_id
)
yield users_messages.GetUsersResult(user=user)
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
users_service.add_UsersServicer_to_server(UsersService(), server)
server.add_insecure_port('0.0.0.0:50051')
server.start()
try:
while True:
time.sleep(_ONE_DAY_IN_SECONDS)
except KeyboardInterrupt:
server.stop(0)
if __name__ == '__main__':
serve()
4. 客戶端側(cè)代碼
import sys
import grpc
import users_pb2_grpc as users_service
import users_pb2 as users_messages
def run():
channel = grpc.insecure_channel('localhost:50051')
try:
grpc.channel_ready_future(channel).result(timeout=10)
except grpc.FutureTimeoutError:
sys.exit('Error connecting to server')
else:
stub = users_service.UsersStub(channel)
metadata = [('ip', '127.0.0.1')]
response = stub.CreateUser(
users_messages.CreateUserRequest(username='tom'),
metadata=metadata,
)
if response:
print("User created:", response.user.username)
request = users_messages.GetUsersRequest(
user=[users_messages.User(username="alexa", user_id=1),
users_messages.User(username="christie", user_id=1)]
)
response = stub.GetUsers(request)
for resp in response:
print(resp)
if __name__ == '__main__':
run()5. 測試代碼
打開兩個終端窗口,分別運行g(shù)rpc_server.py, grpc_client.py
可以看到client.py 窗口顯示
(enva) D:\workplace\python\enva\test1>py grpc_client.py
User created: tom
user {
username: "alexa"
user_id: 1
}
user {
username: "christie"
user_id: 1
}服務器窗口同時顯示
(enva) D:\workplace\python\enva\test1>py grpc_server.py
{'user-agent': 'grpc-python/1.50.0 grpc-c/28.0.0 (windows; chttp2)', 'ip': '127.0.0.1'}
學習小記
流式處理編程,其實比較簡單,只是流式處理一方要構(gòu)建多條mesage,接口方法會自動逐條發(fā)送,接收側(cè)也只須遍歷讀取即可。流式處理用來發(fā)送大文件,如圖片,視頻之類,比REST有明顯優(yōu)勢,而且有規(guī)范接口,也便于團隊合作。
到此這篇關(guān)于Python gRPC流式通信協(xié)議詳細講解的文章就介紹到這了,更多相關(guān)Python gRPC流式通信內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python實現(xiàn)Excel文件轉(zhuǎn)換為TXT文件
這篇文章主要為大家詳細介紹了python實現(xiàn)Excel文件轉(zhuǎn)換為TXT文件,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-04-04
Python編程之gui程序?qū)崿F(xiàn)簡單文件瀏覽器代碼
這篇文章主要介紹了Python編程之gui程序?qū)崿F(xiàn)簡單文件瀏覽器代碼,具有一定借鑒價值,需要的朋友可以了解下。2017-12-12
python實現(xiàn)將文件名批量命名為四位數(shù)or五位數(shù)
這篇文章主要介紹了python實現(xiàn)將文件名批量命名為四位數(shù)or五位數(shù)問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08

