Python使用gRPC實現(xiàn)數(shù)據(jù)分析能力的共享
gRPC是一個高性能、開源、通用的遠程過程調用(RPC)框架,由Google推出。
它基于HTTP/2協(xié)議標準設計開發(fā),默認采用Protocol Buffers數(shù)據(jù)序列化協(xié)議,支持多種開發(fā)語言。
在gRPC中,客戶端可以像調用本地對象一樣直接調用另一臺不同的機器上服務端應用的方法,使得您能夠更容易地創(chuàng)建分布式應用和服務。
gRPC支持多種語言,并提供了豐富的接口和庫,以及簡單易用的API,方便開發(fā)者進行快速開發(fā)和部署。
同時,gRPC的底層框架處理了所有強制嚴格的服務契約、數(shù)據(jù)序列化、網絡通訊、服務認證、訪問控制、服務觀測等等通常有關聯(lián)的復雜性,使得開發(fā)者可以更加專注于業(yè)務邏輯的實現(xiàn)。
1. 為什么用 gRPC
我平時用的最多的語言其實是golang,但是,做數(shù)據(jù)分析相關的項目,不太可能繞開python那些優(yōu)秀的庫。
于是,就想把數(shù)據(jù)分析的核心部分用python來實現(xiàn),并用gRPC接口的方式提供出來。
其他的業(yè)務部分,仍然用原先的語言來實現(xiàn)。
gRPC相比于http REST,性能和安全上更加有保障,而且對主流的開發(fā)語言都支持的很好,不用擔心與其他語言開發(fā)的業(yè)務系統(tǒng)對接的問題。
最后,gRPC雖然接口的定義和實現(xiàn)比http REST更復雜,但是,它提供了方便的命令行工具,可以根據(jù)protocol buf的定義自動生成對應語言的類型定義,以及stub相關的代碼等等。
實際開發(fā)時,一般只要關注接口的定義和業(yè)務功能的實現(xiàn)即可,gRPC框架需要的代碼可以通過命令行工具生成。
2. 安裝
對于Python語言,安裝gRPC框架本身和對應的命令行工具即可:
$ pip install grpcio # gRPC框架 $ pip install grpcio-tools # gRPC命令行工具
3. 開發(fā)步驟
開發(fā)一個gPRC接口一般分為4個步驟:
- 使用
[protocal buf](https://protobuf.dev/overview)定義服務接口 - 通過命令行生成
client和server的模板代碼 - 實現(xiàn)server端代碼(具體業(yè)務功能)
- 實現(xiàn)client端代碼(具體業(yè)務功能)
下面通過一個示例演示gRPC接口的開發(fā)步驟。
這個示例來自最近做量化分析時的一個指標(MACD)的實現(xiàn),為了簡化示例,下面實現(xiàn)MACD指標的業(yè)務功能部分是虛擬的,不是實際的計算方法。
3.1. 定義服務接口
接口主要定義方法,參數(shù),返回值。
syntax = "proto3";
package idc;
// 定義服務,也就是對外提供的功能
service Indicator {
rpc GetMACD(MACDRequest) returns (MACDReply) {}
}
// 請求的參數(shù)
message MACDRequest {
string start_date = 1; // 交易開始時間
string end_date = 2; // 交易結束時間
}
// 返回值中每個對象的詳細內容
message MACDData {
string date = 1; // 交易時間
float open = 2; // 開盤價
float close = 3; // 收盤價
float high = 4; // 最高價
float low = 5; // 最低價
float macd = 6; // macd指標值
}
// 返回的內容,是一個數(shù)組
message MACDReply {
repeated MACDData macd = 1;
}
3.2. 生成模板代碼
在grpc_sample目錄下,執(zhí)行命令:
python -m grpc_tools.protoc -I./protos --python_out=. --pyi_out=. --grpc_python_out=. ./protos/indicator.proto
生成后文件結構如下:

生成了3個文件:
indicator_pb2.py:proto文件定義的消息類indicator_pb2_grpc.py:服務端和客戶端的模板代碼indicator_pb2.pyi:不是必須的,為了能讓mypy等工具校驗代碼類型是否正確
3.3. server端代碼
通過繼承indicator_pb2_grpc.py文件中的服務類,實現(xiàn)服務端功能。
# -*- coding: utf-8 -*-
from concurrent import futures
import grpc
import indicator_pb2
import indicator_pb2_grpc
class Indicator(indicator_pb2_grpc.IndicatorServicer):
def GetMACD(self, request, context):
macd = []
for i in range(1, 5):
data = indicator_pb2.MACDData(
date=request.start_date,
open=i * 1.1,
close=i * 2.1,
high=i * 3.1,
low=i * 0.1,
macd=i * 2.5,
)
macd.append(data)
return indicator_pb2.MACDReply(macd=macd)
def serve():
port = "50051"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
indicator_pb2_grpc.add_IndicatorServicer_to_server(Indicator(), server)
server.add_insecure_port("[::]:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == "__main__":
serve()
服務端需要實現(xiàn)proto文件中定義接口的具體業(yè)務功能。
3.4. client端代碼
使用indicator_pb2_grpc.py文件中的Stub來調用服務端的代碼。
# -*- coding: utf-8 -*-
import grpc
import indicator_pb2
import indicator_pb2_grpc
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = indicator_pb2_grpc.IndicatorStub(channel)
response = stub.GetMACD(
indicator_pb2.MACDRequest(
start_date="2023-01-01",
end_date="2023-12-31",
)
)
print("indicator client received: ")
print(response)
if __name__ == "__main__":
run()
3.5. 運行效果
加入客戶端和服務端代碼后,最后的文件結構如下:

測試時,先啟動服務:
$ python.exe .\idc_server.py Server started, listening on 50051
然后啟動客戶端看效果:
$ python.exe .\idc_client.py
indicator client received:
macd {
date: "2023-01-01"
open: 1.1
close: 2.1
high: 3.1
low: 0.1
macd: 2.5
}
macd {
date: "2023-01-01"
open: 2.2
close: 4.2
high: 6.2
low: 0.2
macd: 5
}
macd {
date: "2023-01-01"
open: 3.3
close: 6.3
high: 9.3
low: 0.3
macd: 7.5
}
macd {
date: "2023-01-01"
open: 4.4
close: 8.4
high: 12.4
low: 0.4
macd: 10
}
4. 傳輸文件/圖片
除了上面的返回列表數(shù)據(jù)的接口比較常用以外,我用的比較多的還有一種接口就是返回圖片。
將使用python的matplotlib等庫生成的分析結果圖片提供給其他系統(tǒng)使用。
開發(fā)的步驟和上面是一樣的。
4.1. 定義服務接口
定義文件相關的服務接口,文件的部分需要加上stream關鍵字,也就是流式數(shù)據(jù)。
syntax = "proto3";
package idc;
// 定義服務,也就是對外提供的功能
service IndicatorGraph {
rpc GetMACDGraph(MACDGraphRequest) returns (stream MACDGraphReply) {}
}
// 請求的參數(shù)
message MACDGraphRequest {
string start_date = 1; // 交易開始時間
string end_date = 2; // 交易結束時間
}
// 返回的內容,是一個圖片
message MACDGraphReply {
bytes macd_chunk = 1;
}
注意,定義服務接口GetMACDGraph時,返回值MACDGraphReply前面加上stream關鍵字。
返回的文件內容是 bytes 二進制類型。
4.2. 生成模板代碼
執(zhí)行命令:
python -m grpc_tools.protoc -I./protos --python_out=. --pyi_out=. --grpc_python_out=. ./protos/indicator_graph.proto
生成3個文件:
- indicator_graph_pb2.py
- indicator_graph_pb2.pyi
- indicator_graph_pb2_grpc.py
4.3. server端代碼
首先,生成一個MACD指標的圖片(macd.png)。

然后,服務端的代碼主要就是按塊讀取這個文件并返回。
import grpc
import indicator_graph_pb2
import indicator_graph_pb2_grpc
class IndicatorGraph(indicator_graph_pb2_grpc.IndicatorGraphServicer):
def GetMACDGraph(self, request, context):
chunk_size = 1024
with open("./macd.png", mode="rb") as f:
while True:
chunk = f.read(chunk_size)
if not chunk:
return
response = indicator_graph_pb2.MACDGraphReply(macd_chunk=chunk)
yield response
4.4. client端代碼
客戶端的代碼也要相應修改,不再是一次性接受請求的結果,而是循環(huán)接受,直至結束。
import grpc
import indicator_graph_pb2
import indicator_graph_pb2_grpc
def run():
with grpc.insecure_channel("localhost:50051") as channel:
stub = indicator_graph_pb2_grpc.IndicatorGraphStub(channel)
print("indicator client received: ")
with open("./received_macd.png", mode="wb") as f:
for response in stub.GetMACDGraph(
indicator_graph_pb2.MACDGraphRequest(
start_date="2023-01-01",
end_date="2023-12-31",
)
):
f.write(response.macd_chunk)
客戶端接收完成后,圖片保存在 received_macd.png 中。
實際執(zhí)行后,圖片可以正常保存并顯示。
5. 回顧
本篇是最近用gPRC封裝python數(shù)據(jù)分析相關業(yè)務過程中一些簡單的總結。
這里沒有對gPRC做系統(tǒng)的介紹,它的官方文檔已經非常完善,而且文檔中針對主流編程語言的示例也都有。
本篇筆記中的兩個示例雖然簡單,卻是我用的最多的兩種情況:
一種是返回對象數(shù)組:是為了將pandas,numpy等庫處理后的數(shù)據(jù)返回出來供其他系統(tǒng)使用;
一種是返回文件/圖片:是為了將matplotlib,seaborn等庫生成的分析圖片返回出來供其他系統(tǒng)使用。
目前gPRC對我最大的好處是,它提供了一種穩(wěn)定可靠的,將python強大的數(shù)據(jù)分析能力結合到其他系統(tǒng)中的能力。
到此這篇關于Python使用gRPC實現(xiàn)數(shù)據(jù)分析能力的共享的文章就介紹到這了,更多相關Python gRPC數(shù)據(jù)分析內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
python在linux環(huán)境下安裝skimage的示例代碼
這篇文章主要介紹了python在linux環(huán)境下安裝skimage,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-10-10
解決python中畫圖時x,y軸名稱出現(xiàn)中文亂碼的問題
今天小編就為大家分享一篇解決python中畫圖時x,y軸名稱出現(xiàn)中文亂碼的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-01-01
在Debian下配置Python+Django+Nginx+uWSGI+MySQL的教程
這篇文章主要介紹了在Debian下配置Python+Django+Nginx+uWSGI+MySQL的教程,Debian系統(tǒng)和Nginx服務器皆是高性能的選擇,需要的朋友可以參考下2015-04-04
pydantic-resolve嵌套數(shù)據(jù)結構生成LoaderDepend管理contextvars
這篇文章主要為大家介紹了pydantic-resolve解決嵌套數(shù)據(jù)結構生成LoaderDepend管理contextvars的使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪<BR>2023-04-04
Python?SQLAlchemy插入日期時間時區(qū)詳解
SQLAlchemy是一個功能強大且流行的?Python?庫,它提供了一種靈活有效的與數(shù)據(jù)庫交互的方式,在本文中,我們將了解SQLAlchemy如何更新日期、時間和時區(qū)并將其插入數(shù)據(jù)庫,感興趣的可以了解下2023-09-09

