go實(shí)現(xiàn)grpc四種數(shù)據(jù)流模式
1. 什么是數(shù)據(jù)流
grpc中的stream,srteam顧名思義就是一種流,可以源源不斷的推送數(shù)據(jù),很適合傳輸一些大數(shù)據(jù),或者服務(wù)端和客戶(hù)端長(zhǎng)時(shí)間數(shù)據(jù)交互,比如客戶(hù)端可以向服務(wù)端訂閱一個(gè)數(shù)據(jù),服務(wù)端就可以利用stream,源源不斷地推送數(shù)據(jù)。
底層還原成socket編程
2. grpc的四種數(shù)據(jù)流
1.簡(jiǎn)單模式
2.服務(wù)端數(shù)據(jù)流模式(Server-side streaming RPC)
3.客戶(hù)端數(shù)據(jù)流模式(Client-side streaming RPC)
4.雙向數(shù)據(jù)流模式(Bidirectional streaming RPC)
2.1 簡(jiǎn)單模式
這種模式最為傳統(tǒng),即客戶(hù)端發(fā)起一次請(qǐng)求,服務(wù)端響應(yīng)一個(gè)數(shù)據(jù),這和大家平時(shí)熟悉的RPC沒(méi)有什么大的區(qū)別,上兩篇中介紹此模式。
2.2 服務(wù)端數(shù)據(jù)流模式
這種模式是客戶(hù)端發(fā)起一次請(qǐng)求,服務(wù)端返回一段連續(xù)的數(shù)據(jù)流。典型的例子是客戶(hù)端向服務(wù)端發(fā)送一個(gè)股票代碼,服務(wù)端就把該股票的實(shí)時(shí)數(shù)據(jù)源源不斷的返回給客戶(hù)端
2.3 客戶(hù)端數(shù)據(jù)流模式
與服務(wù)端數(shù)據(jù)流模式相反,這次是客戶(hù)端源源不斷的向服務(wù)端發(fā)送數(shù)據(jù)流,而在發(fā)送結(jié)束后,由服務(wù)端返回一個(gè)響應(yīng)。典型的例子是物聯(lián)網(wǎng)終端向服務(wù)器報(bào)送數(shù)據(jù)。
2.4 雙向數(shù)據(jù)流
顧名思義,這是客戶(hù)端和服務(wù)端都可以向?qū)Ψ桨l(fā)送數(shù)據(jù)流,這個(gè)時(shí)候雙方的數(shù)據(jù)可以同時(shí)互相發(fā)送,也就是可以實(shí)現(xiàn)實(shí)時(shí)交互。典型的例子是聊天機(jī)器人。
3. 上代碼
3.1 代碼目錄
3.2 編寫(xiě)stream.proto文件
stream是常量,寫(xiě)在哪一邊,哪一邊就是數(shù)據(jù)流
syntax = "proto3"; option go_package = "./;proto"; service Greeter { // 定義方法,stream是常量,流模式 rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服務(wù)端流模式,拉消息 rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客戶(hù)端流模式,推消息 rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //雙向流模式,能推能拉 } message StreamRequestData { string data = 1; //編號(hào) } message StreamResponseData { string data = 1; //編號(hào) }
生成go的protobuf文件命令:
cd到proto目錄下
命令:protoc -I . hello.proto --go_out=plugins=grpc:.
3.3 編寫(xiě)server文件
package main import ( "file_test/grpc_go_stream/proto" "fmt" "net" "sync" "time" "google.golang.org/grpc" ) const port = 8082 type server struct{} func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error { i := 0 for { i++ //業(yè)務(wù)代碼 _ = res.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發(fā)給%s的數(shù)據(jù)流", req.Data), }) time.Sleep(time.Second * 1) if i > 10 { break } } return nil } func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error { for { //業(yè)務(wù)代碼 res, err := cliStr.Recv() if err != nil { fmt.Println("本次客戶(hù)端流數(shù)據(jù)發(fā)送完了:",err) break } fmt.Println("客戶(hù)端發(fā)來(lái)消息:",res.Data) } return nil } func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error { wg:=sync.WaitGroup{} wg.Add(2) //接受客戶(hù)端消息的協(xié)程 go func() { defer wg.Done() for { //業(yè)務(wù)代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次客戶(hù)端流數(shù)據(jù)發(fā)送完了:",err) break } fmt.Println("收到客戶(hù)端發(fā)來(lái)消息:",res.Data) } }() //發(fā)送消息給客戶(hù)端的協(xié)程 go func() { defer wg.Done() i := 0 for { i++ //業(yè)務(wù)代碼 _ = allStr.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("這是發(fā)給客戶(hù)端的數(shù)據(jù)流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() return nil } // 啟動(dòng) func start() { // 1.實(shí)例化server g := grpc.NewServer() // 2.注冊(cè)邏輯到server中 proto.RegisterGreeterServer(g, &server{}) // 3.啟動(dòng)server lis, err := net.Listen("tcp", "127.0.0.1:8082") if err != nil { panic("監(jiān)聽(tīng)錯(cuò)誤:" + err.Error()) } err = g.Serve(lis) if err != nil { panic("啟動(dòng)錯(cuò)誤:" + err.Error()) } } func main() { start() }
3.4 編寫(xiě)client文件
package main import ( "context" "file_test/grpc_go_stream/proto" "fmt" "sync" "time" "google.golang.org/grpc" ) var rpc proto.GreeterClient func serverStreamDemo() { //服務(wù)端流模式 res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"}) if err != nil { panic("rpc請(qǐng)求錯(cuò)誤:"+err.Error()) } for { data,err:=res.Recv() // if err != nil { fmt.Println("客戶(hù)端發(fā)送完了:",err) return } fmt.Println("客戶(hù)端返回?cái)?shù)據(jù)流值:",data.Data) } } func clientStreamDemo() { //客戶(hù)端流模式 cliStr, err := rpc.ClientStream(context.Background()) if err != nil { panic("rpc請(qǐng)求錯(cuò)誤:" + err.Error()) } i := 0 for { i++ _ = cliStr.Send(&proto.StreamRequestData{ Data: "jeff", }) time.Sleep(time.Second * 1) if i > 10 { break } } } func clientAndServerStreamDemo() { //雙向流模式 allStr, _ := rpc.AllStream(context.Background()) wg := sync.WaitGroup{} wg.Add(1) //接受服務(wù)端消息的協(xié)程 go func() { defer wg.Done() for { //業(yè)務(wù)代碼 res, err := allStr.Recv() if err != nil { fmt.Println("本次服務(wù)端流數(shù)據(jù)發(fā)送完了:", err) break } fmt.Println("收到服務(wù)端發(fā)來(lái)消息:", res.Data) } }() //發(fā)送消息給服務(wù)端的協(xié)程 go func() { defer wg.Done() i := 0 for { i++ //業(yè)務(wù)代碼 _ = allStr.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("這是發(fā)給服務(wù)端的數(shù)據(jù)流"), }) time.Sleep(time.Second * 1) if i > 10 { break } } }() wg.Wait() } // 啟動(dòng) func start() { conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure()) if err != nil { panic("rpc連接錯(cuò)誤:" + err.Error()) } defer conn.Close() rpc = proto.NewGreeterClient(conn) //初始化 serverStreamDemo() //服務(wù)端流模式 clientStreamDemo() //客戶(hù)端流模式 clientAndServerStreamDemo() // 雙向流模式 } func main() { start() }
以上就是go實(shí)現(xiàn)grpc四種數(shù)據(jù)流模式的詳細(xì)內(nèi)容,更多關(guān)于go實(shí)現(xiàn)grpc流模式的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語(yǔ)言線(xiàn)程安全之互斥鎖與讀寫(xiě)鎖
這篇文章主要介紹了Go語(yǔ)言線(xiàn)程安全之互斥鎖與讀寫(xiě)鎖,互斥鎖是為了并發(fā)的安全,在多個(gè)goroutine共同工作的時(shí)候,對(duì)于共享的數(shù)據(jù)十分不安全,而讀寫(xiě)鎖效率革命,使用鎖的時(shí)候,安全與效率往往需要互相轉(zhuǎn)換,下文詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-02-02Go?WEB框架使用攔截器驗(yàn)證用戶(hù)登錄狀態(tài)實(shí)現(xiàn)
這篇文章主要為大家介紹了Go?WEB框架使用攔截器驗(yàn)證用戶(hù)登錄狀態(tài)實(shí)現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07基于Go+OpenCV實(shí)現(xiàn)人臉識(shí)別功能的詳細(xì)示例
OpenCV是一個(gè)強(qiáng)大的計(jì)算機(jī)視覺(jué)庫(kù),提供了豐富的圖像處理和計(jì)算機(jī)視覺(jué)算法,本文將向你介紹在Mac上安裝OpenCV的步驟,并演示如何使用Go的OpenCV綁定庫(kù)進(jìn)行人臉識(shí)別,需要的朋友可以參考下2023-07-07Go語(yǔ)言中常用語(yǔ)法編寫(xiě)與優(yōu)化技巧小結(jié)
為了充分利用?Go?的潛力,我們需要了解如何優(yōu)化?Go?程序,本文將介紹一些常見(jiàn)的?Go?語(yǔ)言?xún)?yōu)化技巧,并通過(guò)實(shí)際例子進(jìn)行說(shuō)明,希望對(duì)大家有所幫助2024-02-02golang sql語(yǔ)句超時(shí)控制方案及原理
一般應(yīng)用程序在執(zhí)行一條sql語(yǔ)句時(shí),都會(huì)給這條sql設(shè)置一個(gè)超時(shí)時(shí)間,本文主要介紹了golang sql語(yǔ)句超時(shí)控制方案及原理,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12Go語(yǔ)言使用protojson庫(kù)實(shí)現(xiàn)Protocol Buffers與JSON轉(zhuǎn)換
本文主要介紹Google開(kāi)源的工具庫(kù)Protojson庫(kù)如何Protocol Buffers與JSON進(jìn)行轉(zhuǎn)換,以及和標(biāo)準(zhǔn)庫(kù)encoding/json的性能對(duì)比,需要的朋友可以參考下2023-09-09Go 代碼規(guī)范錯(cuò)誤處理示例經(jīng)驗(yàn)總結(jié)
這篇文章主要為大家介紹了Go 代碼規(guī)范錯(cuò)誤處理示例實(shí)戰(zhàn)經(jīng)驗(yàn)總結(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08