python grpc實現(xiàn)異步調(diào)用(不用grpc異步接口)
grpc同步調(diào)用更簡單,但是在處理復(fù)雜任務(wù)時,會導(dǎo)致請求阻塞,影響吞吐。當(dāng)然,可以采用grpc異步接口解決,本方采用另一種方式:服務(wù)端收到請求后放入請求隊列,由獨立的線程處理各個請求,之后調(diào)用客戶端的服務(wù),回復(fù)處理結(jié)果。即客戶端也是服務(wù)端。
以下DEMO實現(xiàn)的功能:
- 客戶端與服務(wù)端之間通過mmap tmpfs文件,實現(xiàn)圖像的傳輸
- 推理服務(wù)有Request和Response二個接口
- 服務(wù)端實現(xiàn)Request接口,客戶端實現(xiàn)Response接口,這二個接口只用于發(fā)送消息
- 服務(wù)端的消息處理線程處理完客戶端的請求之后,調(diào)用客戶端的Response接口
1.infer_session.proto
syntax = "proto3"; service Inference { rpc Request (InferMessage) returns (Status) {} //服務(wù)端實現(xiàn) rpc Response (InferMessage) returns (Status) {} //客戶端實現(xiàn) } message InferMessage { int32 frame_id = 1; //幀號 int32 client_port=2; //客戶端端口 int32 shm_id=3; //共享內(nèi)存塊id int32 width=4; //圖像寬度 int32 height=5; //圖像高度 int32 channels=6; //圖像通道數(shù) string session_id=7; //會話uuid } message Status { int32 status = 1; //狀態(tài)碼 string error_message=2; //錯誤信息 }
2.生成Python庫函數(shù)
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto
3.infer_session_server.py
from concurrent import futures import logging import threading import grpc import infer_session_pb2 import infer_session_pb2_grpc import queue import traceback import time from common import SharedMemory,ThreadSafeDict import numpy as np class InferenceServer(infer_session_pb2_grpc.InferenceServicer): def __init__(self) -> None: super().__init__() self.server=None self.black_list=set() def Request(self, request, context): self.request_queue.put(request) return infer_session_pb2.Status(status=0,error_message="") def Open(self,port=50051): self.process_running=True self.bind_addr="localhost:{}".format(port) self.client_session = ThreadSafeDict() self.request_queue= queue.Queue() self.process_thread = threading.Thread(target=self.Process) self.process_thread.start() self.service_ready_semaphore = threading.Semaphore(0) self.server_thread = threading.Thread(target=self.Run) self.server_thread.start() self.service_ready_semaphore.acquire() return True def Run(self): self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server) self.server.add_insecure_port(self.bind_addr) self.server.start() print("Server started, listening on " + self.bind_addr) self.service_ready_semaphore.release() self.server.wait_for_termination() def Process(self): while self.process_running: if not self.request_queue.empty(): request=self.request_queue.get(False,2) if request.session_id in self.black_list: if request.session_id in self.client_session: del self.client_session[request.session_id] continue try: if request.session_id not in self.client_session: record={} print("connect:",request.client_port) record['channel']=grpc.insecure_channel("localhost:{}".format(request.client_port)) record['stub']=infer_session_pb2_grpc.InferenceStub(record['channel']) grpc.channel_ready_future(record['channel']).result(timeout=5) self.client_session[request.session_id]=record shm=SharedMemory(request.width,request.height,request.channels, request.client_port,request.shm_id) data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=shm.get()) data+=1 #修改數(shù)據(jù) shm.close() ret=self.client_session[request.session_id]['stub'].Response(request,timeout=5) if ret.status!=0: print("Response Error:{} {}".format(ret.status,ret.error_message)) except: traceback.print_exc() self.black_list.add(request.session_id) if request.session_id in self.client_session: del self.client_session[request.session_id] else: time.sleep(0.001) def Stop(self): print("Stop") self.server.stop(3) self.process_running=False self.process_thread.join() self.server_thread.join() if __name__ == "__main__": logging.basicConfig() server=InferenceServer() server.Open() input() server.Stop()
4.infer_session_client.py
from __future__ import print_function from concurrent import futures import logging import grpc import infer_session_pb2 import infer_session_pb2_grpc import threading import numpy as np import os import queue from common import SharedMemory import time import argparse import uuid class InferenceClient(infer_session_pb2_grpc.InferenceServicer): def __init__(self) -> None: super().__init__() self.send_index=0 self.recv_index=None self.uuid=uuid.uuid4() print(self.uuid) def Response(self, response, context): request=self.busy_q.get() pred_data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=request.get()) golden=np.ones(pred_data.shape,dtype=np.uint8) golden.fill(response.frame_id+1) result=(golden==pred_data).all() if not result: print("ID:{} ShmId:{} Pass:{}".format(response.frame_id,response.shm_id,result)) self.free_q.put(request) self.recv_index=response.frame_id return infer_session_pb2.Status(status=0,error_message="") def WaitFinish(self): while True: if self.send_index==self.recv_index: return time.sleep(0.001) def Open(self,client_port,width,height,channel,qsize,remote_addr="localhost:50051"): try: self.client_port=client_port self.bind_addr="localhost:{}".format(client_port) self.free_q= queue.Queue(qsize*2) self.busy_q= queue.Queue(qsize*2) for shm_id in range(qsize): self.free_q.put(SharedMemory(width,height,channel,self.client_port,shm_id)) self.channel=grpc.insecure_channel(remote_addr) grpc.channel_ready_future(self.channel).result(timeout=5) self.stub = infer_session_pb2_grpc.InferenceStub(self.channel) self.server_ready=False self.service_ready_semaphore = threading.Semaphore(0) self.server_thread = threading.Thread(target=self.Run) self.server_thread.start() self.service_ready_semaphore.acquire() return self.server_ready except: return False def Stop(self): print("Stop") self.server.stop(3) self.server_thread.join() def Request(self,frame_index): request=self.free_q.get() data = np.ndarray((request.height,request.width,request.channels), dtype=np.uint8, buffer=request.get()) data.fill(frame_index) response = self.stub.Request(infer_session_pb2.InferMessage(frame_id=frame_index, client_port=self.client_port, shm_id=request.shm_id, width=request.width, height=request.height, channels=request.channels, session_id="{}".format(self.uuid) )) self.busy_q.put(request) self.send_index=frame_index return response.status==0 def Run(self): try: self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server) self.server.add_insecure_port(self.bind_addr) self.server.start() self.server_ready=True print("Server started, listening on " + self.bind_addr) self.service_ready_semaphore.release() self.server.wait_for_termination() except: self.service_ready_semaphore.release() if __name__ == "__main__": parser = argparse.ArgumentParser(description="Demo of argparse") parser.add_argument('--port', type=int, default=50000) parser.add_argument('--remote_addr', type=str, default="localhost:50051") args = parser.parse_args() logging.basicConfig() client=InferenceClient() client.Open(client_port=args.port,width=320,height=240,channel=1,qsize=10,remote_addr=args.remote_addr) while True: t0=time.time() count=128 for i in range(count): client.Request(i) client.WaitFinish() t1=time.time() print("{} FPS:{:.3f}".format(args.port,count/(t1-t0))) client.Stop()
5.common.py
import mmap import numpy as np import os import threading # 定義一個SharedMemory類,用于在共享內(nèi)存中讀取和寫入數(shù)據(jù) class SharedMemory(object): def __init__(self,width,height,channels,port,shm_id) -> None: self.width=width self.height=height self.channels=channels self.shm_id=shm_id self.filepath="/sys/fs/cgroup/{}_{}".format(port,shm_id) self.size=width*height*channels if not os.path.exists(self.filepath): os.mknod(self.filepath) self.fd=os.open(self.filepath,os.O_RDWR|os.O_CREAT) os.ftruncate(self.fd,self.size) else: self.fd=os.open(self.filepath,os.O_RDWR) self.mm=mmap.mmap(self.fd,self.size,access=mmap.ACCESS_WRITE) self.mm.seek(0) # 獲取共享內(nèi)存中的數(shù)據(jù) def get(self): return self.mm # 關(guān)閉共享內(nèi)存 def close(self): self.mm.close() os.close(self.fd) # 定義一個ThreadSafeDict類,用于在多線程中安全地操作字典 class ThreadSafeDict: def __init__(self, initial_dict=None): self._dict = {} if initial_dict is None else initial_dict.copy() self.lock = threading.Lock() # 獲取字典中的值 def __getitem__(self, key): with self.lock: return self._dict[key] # 設(shè)置字典中的值 def __setitem__(self, key, value): with self.lock: self._dict[key] = value # 刪除字典中的值 def __delitem__(self, key): with self.lock: del self._dict[key] # 檢查字典中是否存在某個鍵 def __contains__(self, item): with self.lock: return item in self._dict # 獲取字典中的值,如果不存在則返回默認(rèn)值 def get(self, key, default=None): with self.lock: return self._dict.get(key, default) # 設(shè)置字典中的值,如果鍵已經(jīng)存在則不改變值 def setdefault(self, key, default): with self.lock: return self._dict.setdefault(key, default) # 更新字典 def update(self, other_dict): with self.lock: self._dict.update(other_dict)
6.運行
python3 infer_session_server.py & python3 infer_session_client.py --port 50001
7.輸出
50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001
到此這篇關(guān)于python grpc實現(xiàn)異步調(diào)用(不用grpc異步接口)的文章就介紹到這了,更多相關(guān)python grpc異步調(diào)用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
30分鐘搭建Python的Flask框架并在上面編寫第一個應(yīng)用
這篇文章主要介紹了如何搭建Python的Flask框架并在上面編寫一個簡單的登錄模版應(yīng)用,代碼數(shù)量少、充分體現(xiàn)了Flask框架的輕量與開發(fā)高效的特點,需要的朋友可以參考下2015-03-03python人工智能使用RepVgg實現(xiàn)圖像分類示例詳解
這篇文章主要介紹了python人工智能使用RepVgg實現(xiàn)圖像分類示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10Python實現(xiàn)Smtplib發(fā)送帶有各種附件的郵件實例
本篇文章主要介紹了Python實現(xiàn)Smtplib發(fā)送帶有各種附件的郵件實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-06-06python詞云庫wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02