python grpc實(shí)現(xiàn)異步調(diào)用(不用grpc異步接口)
grpc同步調(diào)用更簡(jiǎn)單,但是在處理復(fù)雜任務(wù)時(shí),會(huì)導(dǎo)致請(qǐng)求阻塞,影響吞吐。當(dāng)然,可以采用grpc異步接口解決,本方采用另一種方式:服務(wù)端收到請(qǐng)求后放入請(qǐng)求隊(duì)列,由獨(dú)立的線程處理各個(gè)請(qǐng)求,之后調(diào)用客戶端的服務(wù),回復(fù)處理結(jié)果。即客戶端也是服務(wù)端。
以下DEMO實(shí)現(xiàn)的功能:
- 客戶端與服務(wù)端之間通過(guò)mmap tmpfs文件,實(shí)現(xiàn)圖像的傳輸
- 推理服務(wù)有Request和Response二個(gè)接口
- 服務(wù)端實(shí)現(xiàn)Request接口,客戶端實(shí)現(xiàn)Response接口,這二個(gè)接口只用于發(fā)送消息
- 服務(wù)端的消息處理線程處理完客戶端的請(qǐng)求之后,調(diào)用客戶端的Response接口
1.infer_session.proto
syntax = "proto3";
service Inference {
rpc Request (InferMessage) returns (Status) {} //服務(wù)端實(shí)現(xiàn)
rpc Response (InferMessage) returns (Status) {} //客戶端實(shí)現(xiàn)
}
message InferMessage {
int32 frame_id = 1; //幀號(hào)
int32 client_port=2; //客戶端端口
int32 shm_id=3; //共享內(nèi)存塊id
int32 width=4; //圖像寬度
int32 height=5; //圖像高度
int32 channels=6; //圖像通道數(shù)
string session_id=7; //會(huì)話uuid
}
message Status {
int32 status = 1; //狀態(tài)碼
string error_message=2; //錯(cuò)誤信息
}
2.生成Python庫(kù)函數(shù)
python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./infer_session.proto
3.infer_session_server.py
from concurrent import futures
import logging
import threading
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import queue
import traceback
import time
from common import SharedMemory,ThreadSafeDict
import numpy as np
class InferenceServer(infer_session_pb2_grpc.InferenceServicer):
def __init__(self) -> None:
super().__init__()
self.server=None
self.black_list=set()
def Request(self, request, context):
self.request_queue.put(request)
return infer_session_pb2.Status(status=0,error_message="")
def Open(self,port=50051):
self.process_running=True
self.bind_addr="localhost:{}".format(port)
self.client_session = ThreadSafeDict()
self.request_queue= queue.Queue()
self.process_thread = threading.Thread(target=self.Process)
self.process_thread.start()
self.service_ready_semaphore = threading.Semaphore(0)
self.server_thread = threading.Thread(target=self.Run)
self.server_thread.start()
self.service_ready_semaphore.acquire()
return True
def Run(self):
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
self.server.add_insecure_port(self.bind_addr)
self.server.start()
print("Server started, listening on " + self.bind_addr)
self.service_ready_semaphore.release()
self.server.wait_for_termination()
def Process(self):
while self.process_running:
if not self.request_queue.empty():
request=self.request_queue.get(False,2)
if request.session_id in self.black_list:
if request.session_id in self.client_session:
del self.client_session[request.session_id]
continue
try:
if request.session_id not in self.client_session:
record={}
print("connect:",request.client_port)
record['channel']=grpc.insecure_channel("localhost:{}".format(request.client_port))
record['stub']=infer_session_pb2_grpc.InferenceStub(record['channel'])
grpc.channel_ready_future(record['channel']).result(timeout=5)
self.client_session[request.session_id]=record
shm=SharedMemory(request.width,request.height,request.channels,
request.client_port,request.shm_id)
data = np.ndarray((request.height,request.width,request.channels),
dtype=np.uint8, buffer=shm.get())
data+=1 #修改數(shù)據(jù)
shm.close()
ret=self.client_session[request.session_id]['stub'].Response(request,timeout=5)
if ret.status!=0:
print("Response Error:{} {}".format(ret.status,ret.error_message))
except:
traceback.print_exc()
self.black_list.add(request.session_id)
if request.session_id in self.client_session:
del self.client_session[request.session_id]
else:
time.sleep(0.001)
def Stop(self):
print("Stop")
self.server.stop(3)
self.process_running=False
self.process_thread.join()
self.server_thread.join()
if __name__ == "__main__":
logging.basicConfig()
server=InferenceServer()
server.Open()
input()
server.Stop()
4.infer_session_client.py
from __future__ import print_function
from concurrent import futures
import logging
import grpc
import infer_session_pb2
import infer_session_pb2_grpc
import threading
import numpy as np
import os
import queue
from common import SharedMemory
import time
import argparse
import uuid
class InferenceClient(infer_session_pb2_grpc.InferenceServicer):
def __init__(self) -> None:
super().__init__()
self.send_index=0
self.recv_index=None
self.uuid=uuid.uuid4()
print(self.uuid)
def Response(self, response, context):
request=self.busy_q.get()
pred_data = np.ndarray((request.height,request.width,request.channels),
dtype=np.uint8, buffer=request.get())
golden=np.ones(pred_data.shape,dtype=np.uint8)
golden.fill(response.frame_id+1)
result=(golden==pred_data).all()
if not result:
print("ID:{} ShmId:{} Pass:{}".format(response.frame_id,response.shm_id,result))
self.free_q.put(request)
self.recv_index=response.frame_id
return infer_session_pb2.Status(status=0,error_message="")
def WaitFinish(self):
while True:
if self.send_index==self.recv_index:
return
time.sleep(0.001)
def Open(self,client_port,width,height,channel,qsize,remote_addr="localhost:50051"):
try:
self.client_port=client_port
self.bind_addr="localhost:{}".format(client_port)
self.free_q= queue.Queue(qsize*2)
self.busy_q= queue.Queue(qsize*2)
for shm_id in range(qsize):
self.free_q.put(SharedMemory(width,height,channel,self.client_port,shm_id))
self.channel=grpc.insecure_channel(remote_addr)
grpc.channel_ready_future(self.channel).result(timeout=5)
self.stub = infer_session_pb2_grpc.InferenceStub(self.channel)
self.server_ready=False
self.service_ready_semaphore = threading.Semaphore(0)
self.server_thread = threading.Thread(target=self.Run)
self.server_thread.start()
self.service_ready_semaphore.acquire()
return self.server_ready
except:
return False
def Stop(self):
print("Stop")
self.server.stop(3)
self.server_thread.join()
def Request(self,frame_index):
request=self.free_q.get()
data = np.ndarray((request.height,request.width,request.channels),
dtype=np.uint8, buffer=request.get())
data.fill(frame_index)
response = self.stub.Request(infer_session_pb2.InferMessage(frame_id=frame_index,
client_port=self.client_port,
shm_id=request.shm_id,
width=request.width,
height=request.height,
channels=request.channels,
session_id="{}".format(self.uuid)
))
self.busy_q.put(request)
self.send_index=frame_index
return response.status==0
def Run(self):
try:
self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=2))
infer_session_pb2_grpc.add_InferenceServicer_to_server(self, self.server)
self.server.add_insecure_port(self.bind_addr)
self.server.start()
self.server_ready=True
print("Server started, listening on " + self.bind_addr)
self.service_ready_semaphore.release()
self.server.wait_for_termination()
except:
self.service_ready_semaphore.release()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Demo of argparse")
parser.add_argument('--port', type=int, default=50000)
parser.add_argument('--remote_addr', type=str, default="localhost:50051")
args = parser.parse_args()
logging.basicConfig()
client=InferenceClient()
client.Open(client_port=args.port,width=320,height=240,channel=1,qsize=10,remote_addr=args.remote_addr)
while True:
t0=time.time()
count=128
for i in range(count):
client.Request(i)
client.WaitFinish()
t1=time.time()
print("{} FPS:{:.3f}".format(args.port,count/(t1-t0)))
client.Stop()
5.common.py
import mmap
import numpy as np
import os
import threading
# 定義一個(gè)SharedMemory類,用于在共享內(nèi)存中讀取和寫入數(shù)據(jù)
class SharedMemory(object):
def __init__(self,width,height,channels,port,shm_id) -> None:
self.width=width
self.height=height
self.channels=channels
self.shm_id=shm_id
self.filepath="/sys/fs/cgroup/{}_{}".format(port,shm_id)
self.size=width*height*channels
if not os.path.exists(self.filepath):
os.mknod(self.filepath)
self.fd=os.open(self.filepath,os.O_RDWR|os.O_CREAT)
os.ftruncate(self.fd,self.size)
else:
self.fd=os.open(self.filepath,os.O_RDWR)
self.mm=mmap.mmap(self.fd,self.size,access=mmap.ACCESS_WRITE)
self.mm.seek(0)
# 獲取共享內(nèi)存中的數(shù)據(jù)
def get(self):
return self.mm
# 關(guān)閉共享內(nèi)存
def close(self):
self.mm.close()
os.close(self.fd)
# 定義一個(gè)ThreadSafeDict類,用于在多線程中安全地操作字典
class ThreadSafeDict:
def __init__(self, initial_dict=None):
self._dict = {} if initial_dict is None else initial_dict.copy()
self.lock = threading.Lock()
# 獲取字典中的值
def __getitem__(self, key):
with self.lock:
return self._dict[key]
# 設(shè)置字典中的值
def __setitem__(self, key, value):
with self.lock:
self._dict[key] = value
# 刪除字典中的值
def __delitem__(self, key):
with self.lock:
del self._dict[key]
# 檢查字典中是否存在某個(gè)鍵
def __contains__(self, item):
with self.lock:
return item in self._dict
# 獲取字典中的值,如果不存在則返回默認(rèn)值
def get(self, key, default=None):
with self.lock:
return self._dict.get(key, default)
# 設(shè)置字典中的值,如果鍵已經(jīng)存在則不改變值
def setdefault(self, key, default):
with self.lock:
return self._dict.setdefault(key, default)
# 更新字典
def update(self, other_dict):
with self.lock:
self._dict.update(other_dict)
6.運(yùn)行
python3 infer_session_server.py & python3 infer_session_client.py --port 50001
7.輸出
50001 FPS:2296.293
50001 FPS:2222.019
50001 FPS:2347.274
50001 FPS:2124.001
到此這篇關(guān)于python grpc實(shí)現(xiàn)異步調(diào)用(不用grpc異步接口)的文章就介紹到這了,更多相關(guān)python grpc異步調(diào)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
通過(guò)python下載FTP上的文件夾的實(shí)現(xiàn)代碼
使用python下載FTP上的文件夾的代碼,有需要的朋友不妨看看2013-02-02
python shell根據(jù)ip獲取主機(jī)名代碼示例
這篇文章主要介紹了python shell根據(jù)ip獲取主機(jī)名代碼示例,涉及用socket模塊和shell中hostname命令獲取等相關(guān)內(nèi)容,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
python實(shí)現(xiàn)飛船游戲的縱向移動(dòng)
這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)飛船游戲的縱向移動(dòng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-04-04
30分鐘搭建Python的Flask框架并在上面編寫第一個(gè)應(yīng)用
這篇文章主要介紹了如何搭建Python的Flask框架并在上面編寫一個(gè)簡(jiǎn)單的登錄模版應(yīng)用,代碼數(shù)量少、充分體現(xiàn)了Flask框架的輕量與開發(fā)高效的特點(diǎn),需要的朋友可以參考下2015-03-03
python人工智能使用RepVgg實(shí)現(xiàn)圖像分類示例詳解
這篇文章主要介紹了python人工智能使用RepVgg實(shí)現(xiàn)圖像分類示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10
Python實(shí)現(xiàn)Smtplib發(fā)送帶有各種附件的郵件實(shí)例
本篇文章主要介紹了Python實(shí)現(xiàn)Smtplib發(fā)送帶有各種附件的郵件實(shí)例,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06
Python判斷dict中key是否存在的3種方法實(shí)例
大家在學(xué)會(huì)python中的字典,會(huì)發(fā)現(xiàn),字典中是沒(méi)有特殊順序的,但是都存儲(chǔ)在一個(gè)特定的key下面,下面這篇文章主要給大家介紹了關(guān)于Python判斷dict中key是否存在的3種方法,需要的朋友可以參考下2022-04-04
python詞云庫(kù)wordCloud使用方法詳解(解決中文亂碼)
這篇文章主要介紹了python詞云庫(kù)wordCloud使用方法詳解(解決中文亂碼),需要的朋友可以參考下2020-02-02

