Go?gRPC服務(wù)端流式RPC教程示例
前言
上一篇介紹了簡(jiǎn)單模式RPC,當(dāng)數(shù)據(jù)量大或者需要不斷傳輸數(shù)據(jù)時(shí)候,我們應(yīng)該使用流式RPC,它允許我們邊處理邊傳輸數(shù)據(jù)。本篇先介紹服務(wù)端流式RPC。
服務(wù)端流式RPC:客戶端發(fā)送請(qǐng)求到服務(wù)器,拿到一個(gè)流去讀取返回的消息序列。 客戶端讀取返回的流,直到里面沒有任何消息。
情景模擬:實(shí)時(shí)獲取股票走勢(shì)
1.客戶端要獲取某原油股的實(shí)時(shí)走勢(shì),客戶端發(fā)送一個(gè)請(qǐng)求
2.服務(wù)端實(shí)時(shí)返回該股票的走勢(shì)
新建proto文件
新建server_stream.proto文件
1.定義發(fā)送信息
// 定義發(fā)送請(qǐng)求信息
message SimpleRequest{
// 定義發(fā)送的參數(shù),采用駝峰命名方式,小寫加下劃線,如:student_name
// 請(qǐng)求參數(shù)
string data = 1;
}2.定義接收信息
// 定義流式響應(yīng)信息
message StreamResponse{
// 流式響應(yīng)數(shù)據(jù)
string stream_value = 1;
}3.定義服務(wù)方法ListValue
服務(wù)端流式rpc,只要在響應(yīng)數(shù)據(jù)前添加stream即可
// 定義我們的服務(wù)(可定義多個(gè)服務(wù),每個(gè)服務(wù)可定義多個(gè)接口)
service StreamServer{
// 服務(wù)端流式rpc,在響應(yīng)數(shù)據(jù)前添加stream
rpc ListValue(SimpleRequest)returns(stream StreamResponse){};
}4.編譯proto文件
進(jìn)入server_stream.proto所在目錄,運(yùn)行指令:
protoc --go_out=plugins=grpc:./ ./server_stream.proto
創(chuàng)建Server端
1.定義我們的服務(wù),并實(shí)現(xiàn)ListValue方法
// SimpleService 定義我們的服務(wù)
type StreamService struct{}
// ListValue 實(shí)現(xiàn)ListValue方法
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
for n := 0; n < 5; n++ {
// 向流中發(fā)送消息, 默認(rèn)每次send送消息最大長度為`math.MaxInt32`bytes
err := srv.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
}
return nil
}初學(xué)者可能覺得比較迷惑,ListValue的參數(shù)和返回值是怎樣確定的。其實(shí)這些都是編譯proto時(shí)生成的.pb.go文件中有定義,我們只需要實(shí)現(xiàn)就可以了。
2.啟動(dòng)gRPC服務(wù)器
const (
// Address 監(jiān)聽地址
Address string = ":8000"
// Network 網(wǎng)絡(luò)通信協(xié)議
Network string = "tcp"
)
func main() {
// 監(jiān)聽本地端口
listener, err := net.Listen(Network, Address)
if err != nil {
log.Fatalf("net.Listen err: %v", err)
}
log.Println(Address + " net.Listing...")
// 新建gRPC服務(wù)器實(shí)例
// 默認(rèn)單次接收最大消息長度為`1024*1024*4`bytes(4M),單次發(fā)送消息最大長度為`math.MaxInt32`bytes
// grpcServer := grpc.NewServer(grpc.MaxRecvMsgSize(1024*1024*4), grpc.MaxSendMsgSize(math.MaxInt32))
grpcServer := grpc.NewServer()
// 在gRPC服務(wù)器注冊(cè)我們的服務(wù)
pb.RegisterStreamServerServer(grpcServer, &StreamService{})
//用服務(wù)器 Serve() 方法以及我們的端口信息區(qū)實(shí)現(xiàn)阻塞等待,直到進(jìn)程被殺死或者 Stop() 被調(diào)用
err = grpcServer.Serve(listener)
if err != nil {
log.Fatalf("grpcServer.Serve err: %v", err)
}
}運(yùn)行服務(wù)端
go run server.go :8000 net.Listing...
創(chuàng)建Client端
1.創(chuàng)建調(diào)用服務(wù)端ListValue方法
// listValue 調(diào)用服務(wù)端的ListValue方法
func listValue() {
// 創(chuàng)建發(fā)送結(jié)構(gòu)體
req := pb.SimpleRequest{
Data: "stream server grpc ",
}
// 調(diào)用我們的服務(wù)(ListValue方法)
stream, err := grpcClient.ListValue(context.Background(), &req)
if err != nil {
log.Fatalf("Call ListStr err: %v", err)
}
for {
//Recv() 方法接收服務(wù)端消息,默認(rèn)每次Recv()最大消息長度為`1024*1024*4`bytes(4M)
res, err := stream.Recv()
// 判斷消息流是否已經(jīng)結(jié)束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
}
}2.啟動(dòng)gRPC客戶端
// Address 連接地址
const Address string = ":8000"
var grpcClient pb.StreamServerClient
func main() {
// 連接服務(wù)器
conn, err := grpc.Dial(Address, grpc.WithInsecure())
if err != nil {
log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
// 建立gRPC連接
grpcClient = pb.NewStreamServerClient(conn)
route()
listValue()
}運(yùn)行客戶端
go run client.go
stream server grpc 0
stream server grpc 1
stream server grpc 2
stream server grpc 3
stream server grpc 4
客戶端不斷從服務(wù)端獲取數(shù)據(jù)
思考
假如服務(wù)端不停發(fā)送數(shù)據(jù),類似獲取股票走勢(shì)實(shí)時(shí)數(shù)據(jù),客戶端能自己停止獲取數(shù)據(jù)嗎?
答案:可以的
1.我們把服務(wù)端的ListValue方法稍微修改
// ListValue 實(shí)現(xiàn)ListValue方法
func (s *StreamService) ListValue(req *pb.SimpleRequest, srv pb.StreamServer_ListValueServer) error {
for n := 0; n < 15; n++ {
// 向流中發(fā)送消息, 默認(rèn)每次send送消息最大長度為`math.MaxInt32`bytes
err := srv.Send(&pb.StreamResponse{
StreamValue: req.Data + strconv.Itoa(n),
})
if err != nil {
return err
}
log.Println(n)
time.Sleep(1 * time.Second)
}
return nil
}2.再把客戶端調(diào)用ListValue方法的實(shí)現(xiàn)稍作修改,就可以得到結(jié)果了
// listValue 調(diào)用服務(wù)端的ListValue方法
func listValue() {
// 創(chuàng)建發(fā)送結(jié)構(gòu)體
req := pb.SimpleRequest{
Data: "stream server grpc ",
}
// 調(diào)用我們的服務(wù)(Route方法)
// 同時(shí)傳入了一個(gè) context.Context ,在有需要時(shí)可以讓我們改變RPC的行為,比如超時(shí)/取消一個(gè)正在運(yùn)行的RPC
stream, err := grpcClient.ListValue(context.Background(), &req)
if err != nil {
log.Fatalf("Call ListStr err: %v", err)
}
for {
//Recv() 方法接收服務(wù)端消息,默認(rèn)每次Recv()最大消息長度為`1024*1024*4`bytes(4M)
res, err := stream.Recv()
// 判斷消息流是否已經(jīng)結(jié)束
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.StreamValue)
break
}
//可以使用CloseSend()關(guān)閉stream,這樣服務(wù)端就不會(huì)繼續(xù)產(chǎn)生流消息
//調(diào)用CloseSend()后,若繼續(xù)調(diào)用Recv(),會(huì)重新激活stream,接著之前結(jié)果獲取消息
stream.CloseSend()
}只需要調(diào)用CloseSend()方法,就可以關(guān)閉服務(wù)端的stream,讓它停止發(fā)送數(shù)據(jù)。值得注意的是,調(diào)用CloseSend()后,若繼續(xù)調(diào)用Recv(),會(huì)重新激活stream,接著當(dāng)前的結(jié)果繼續(xù)獲取消息。
這能完美解決客戶端暫停->繼續(xù)獲取數(shù)據(jù)的操作。
總結(jié)
本篇介紹了服務(wù)端流式RPC的簡(jiǎn)單使用,客戶端發(fā)起一個(gè)請(qǐng)求,服務(wù)端不停返回?cái)?shù)據(jù),直到服務(wù)端停止發(fā)送數(shù)據(jù)或客戶端主動(dòng)停止接收數(shù)據(jù)為止。下篇將介紹客戶端流式RPC。
教程源碼地址:https://github.com/Bingjian-Zhu/go-grpc-example
參考:gRPC官方文檔中文版
更多關(guān)于Go gRPC服務(wù)端流式RPC的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- golang實(shí)現(xiàn)簡(jiǎn)單的udp協(xié)議服務(wù)端與客戶端示例
- Golang實(shí)現(xiàn)的聊天程序服務(wù)端和客戶端代碼分享
- go語言實(shí)現(xiàn)http服務(wù)端與客戶端的例子
- go標(biāo)準(zhǔn)庫net/http服務(wù)端的實(shí)現(xiàn)示例
- Go語言實(shí)現(xiàn)服務(wù)端消息接收和發(fā)送
- Golang原生rpc(rpc服務(wù)端源碼解讀)
- go微服務(wù)PolarisMesh源碼解析服務(wù)端啟動(dòng)流程
- Go語言編寫一個(gè)簡(jiǎn)易聊天室服務(wù)端的實(shí)現(xiàn)
相關(guān)文章
Go?內(nèi)聯(lián)優(yōu)化讓程序員愛不釋手
這篇文章主要介紹了Go?內(nèi)聯(lián)優(yōu)化讓程序員愛不釋手,內(nèi)聯(lián)是在編譯過程中自動(dòng)進(jìn)行的一類基本優(yōu)化之一,文章圍繞主題展開更多詳細(xì)介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-06-06
一文帶你了解Go中跟蹤函數(shù)調(diào)用鏈的實(shí)現(xiàn)
這篇文章主要為大家詳細(xì)介紹了go如何實(shí)現(xiàn)一個(gè)自動(dòng)注入跟蹤代碼,并輸出有層次感的函數(shù)調(diào)用鏈跟蹤命令行工具,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-11-11
go-zero數(shù)據(jù)的流處理利器fx使用詳解
這篇文章主要為大家介紹了go-zero數(shù)據(jù)的流處理利器fx使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
Go Resiliency庫中timeout實(shí)現(xiàn)原理及源碼解析
Go-Resiliency庫中的timeout是一種基于協(xié)程的超時(shí)機(jī)制,通過創(chuàng)建協(xié)程來執(zhí)行任務(wù)并設(shè)置超時(shí)時(shí)間,若任務(wù)執(zhí)行時(shí)間超時(shí)則中止協(xié)程并返回錯(cuò)誤,需要詳細(xì)了解可以參考下文2023-05-05


