gRPC中攔截器的使用詳解
前言
本次主要介紹在gRPC中使用攔截器,包括一元攔截器和流式攔截器,在攔截器中添加JWT認(rèn)證,客戶端登錄之后會(huì)獲得token,請(qǐng)求特定的API時(shí)候需要帶上token才能訪問(wèn)。由于代碼中我們使用了grpc-gateway提供http服務(wù),因此需要安裝gateway的一些依賴
在本文中就簡(jiǎn)單介紹一下攔截器的使用,RPC請(qǐng)求分為一元RPC請(qǐng)求和流式RPC請(qǐng)求,所謂一元RPC指的就是請(qǐng)求和響應(yīng)都是一次完成的,gRPC是基于HTTP2.0的,因此,一元RPC就可以看成客戶端請(qǐng)求一次,服務(wù)端就響應(yīng)一次。而流式RPC則是像流一樣多次進(jìn)行響應(yīng)或者請(qǐng)求傳輸?shù)摹?/p>
相應(yīng)地,攔截器分為服務(wù)端攔截和客戶端攔截器,根據(jù)功能的不同又分為一元攔截器和流式攔截器。
服務(wù)端攔截器
1、一元攔截器:UnaryInterceptor
源碼中寫得比較清楚了, UnaryServerInterceptor 提供了一個(gè)鉤子來(lái)攔截服務(wù)器上一元RPC的執(zhí)行。 info 參數(shù)包含了這個(gè)RPC攔截器能操作的所有信息, handler 是服務(wù)方法實(shí)現(xiàn)的一個(gè)包裝器,用于供攔截器中調(diào)用來(lái)處理RPC請(qǐng)求的邏輯。
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the // server. Only one unary interceptor can be installed. The construction of multiple // interceptors (e.g., chaining) can be implemented at the caller. func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.unaryInt != nil { panic("The unary server interceptor was already set and may not be reset.") } o.unaryInt = i }) }
主要看其中包含的參數(shù) UnaryServerInterceptor :
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info // contains all the information of this RPC the interceptor can operate on. And handler // is the wrapper of the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC. type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中幾個(gè)參數(shù)解釋為:
- ctx context.Context:請(qǐng)求上下文
- req interface{}:RPC 方法的請(qǐng)求參數(shù)
- info *UnaryServerInfo:包含了RPC 方法的所有信息
- handler UnaryHandler:RPC 方法真正執(zhí)行邏輯
2、流式攔截器:StreamInterceptor
StreamServerInterceptor 提供了一個(gè)鉤子來(lái)攔截服務(wù)器上流式RPC的執(zhí)行。 info 包含攔截器可以操作的RPC的所有信息。 handler 是服務(wù)方法實(shí)現(xiàn)。攔截器負(fù)責(zé)調(diào)用 handler 來(lái)完成RPC。
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the // server. Only one stream interceptor can be installed. func StreamInterceptor(i StreamServerInterceptor) ServerOption { return newFuncServerOption(func(o *serverOptions) { if o.streamInt != nil { panic("The stream server interceptor was already set and may not be reset.") } o.streamInt = i }) }
StreamServerInterceptor:
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC //on the server.info contains all the information of this RPC the interceptor can operate //on. And handler is the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC. type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
3、實(shí)現(xiàn)服務(wù)端攔截器
下面我們進(jìn)行簡(jiǎn)單的實(shí)現(xiàn):
// 一元攔截器 func Unary() grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Print("---------> the unaryServerInterceptor: ", info.FullMethod) err := interceptor.authorize(ctx, info.FullMethod)//實(shí)現(xiàn)攔截驗(yàn)證的邏輯,自行實(shí)現(xiàn),我這里是截取的完整代碼中的,也可以參考我的gitee上的代碼 if err != nil { return nil, err } return handler(ctx, req) } } //stream攔截器 func Stream() grpc.StreamServerInterceptor { return func( srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Print("--------->the streamServerInterceptor: ", info.FullMethod) err := interceptor.authorize(stream.Context(), info.FullMethod)//實(shí)現(xiàn)攔截驗(yàn)證的邏輯,自行實(shí)現(xiàn),我這里是截取的完整代碼中的,也可以參考我的gitee上的代碼 if err != nil { return err } return handler(srv, stream) } }
將上面實(shí)現(xiàn)的攔截器加入到Server中即可:
serverOptions := []grpc.ServerOption{ grpc.UnaryInterceptor(Unary()), grpc.StreamInterceptor(Stream()), } grpcServer := grpc.NewServer(serverOptions...)
客戶端攔截器
1、一元攔截器:WithUnaryInterceptor
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for // unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.unaryInt = f }) }
UnaryClientInterceptor
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. // Unary interceptors can be specified as a DialOption, using // WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a // ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC // delegates all unary RPC invocations to the interceptor, and it is the // responsibility of the interceptor to call invoker to complete the processing // of the RPC. // // method is the RPC name. req and reply are the corresponding request and // response messages. cc is the ClientConn on which the RPC was invoked. invoker // is the handler to complete the RPC and it is the responsibility of the // interceptor to call it. opts contain all applicable call options, including // defaults from the ClientConn as well as per-call options. // // The returned error must be compatible with the status package. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
2、流式攔截器:WithStreamInterceptor
// WithStreamInterceptor returns a DialOption that specifies the interceptor for // streaming RPCs. func WithStreamInterceptor(f StreamClientInterceptor) DialOption { return newFuncDialOption(func(o *dialOptions) { o.streamInt = f }) }
StreamClientInterceptor 攔截客戶端流 ClientStream 的創(chuàng)建,流式攔截器可以指定為一個(gè) Dial 選項(xiàng)。當(dāng)創(chuàng)建一個(gè)客戶端連接時(shí),使用 WithStreamInterceptor() 或者 WithChainStreamInterceptor() 。當(dāng)一個(gè)流攔截器被設(shè)置在客戶端連接中的時(shí)候,gRPC將所有流的創(chuàng)建都交給攔截器,攔截器調(diào)用streamer。
// StreamClientInterceptor intercepts the creation of a ClientStream. Stream // interceptors can be specified as a DialOption, using WithStreamInterceptor() // or WithChainStreamInterceptor(), when creating a ClientConn. When a stream // interceptor(s) is set on the ClientConn, gRPC delegates all stream creations // to the interceptor, and it is the responsibility of the interceptor to call // streamer. // // desc contains a description of the stream. cc is the ClientConn on which the // RPC was invoked. streamer is the handler to create a ClientStream and it is // the responsibility of the interceptor to call it. opts contain all applicable // call options, including defaults from the ClientConn as well as per-call // options. // // StreamClientInterceptor may return a custom ClientStream to intercept all I/O // operations. The returned error must be compatible with the status package. type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
參數(shù)解釋:
- ctx context.Context是請(qǐng)求上下文
- desc *StreamDesc包含了流中描述的信息
- cc *ClientConn是調(diào)用RPC的客戶端連接
- method string是請(qǐng)求的方法名
- streamer Streamer是一個(gè)創(chuàng)建客戶端流的處理器,流式攔截器中需要調(diào)用它。
- opts ...CallOption包含了所有適用呼叫選項(xiàng),包括來(lái)自于客戶端連接的默認(rèn)選項(xiàng)和所有的呼叫。
3、實(shí)現(xiàn)客戶端攔截器
下面是具體實(shí)現(xiàn):
func Unary() grpc.UnaryClientInterceptor { return func( ctx context.Context, method string, req, reply interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, //回調(diào)函數(shù) opts ...grpc.CallOption, ) error { log.Printf("-------> unary interceptor: %s", method) if authMethods[method] { //如果是攔截的方法,在調(diào)用實(shí)際的rpc方法之前將token添加到context中,這個(gè)存儲(chǔ)需要攔截的方法 return invoker(attachToken(ctx), method, req, reply, conn, opts...) //attachToken為自己實(shí)現(xiàn)的客戶端攔截請(qǐng)求之后附加token的方法 } return invoker(ctx, method, req, reply, conn, opts...) } } // Stream returns a client interceptor to authenticate stream RPC func Stream() grpc.StreamClientInterceptor { return func( ctx context.Context, desc *grpc.StreamDesc, conn *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption, ) (grpc.ClientStream, error) { log.Printf("-------> stream interceptor: %s", method) if interceptor.authMethods[method] { return streamer(attachToken(ctx), desc, conn, method, opts...) } return streamer(ctx, desc, conn, method, opts...) } }
使用也比較簡(jiǎn)單,在Dial中添加即可:
conn2, err := grpc.Dial( Address, //地址 transportOption, //SSL/TLS證書選擇 grpc.WithUnaryInterceptor(Unary()), grpc.WithStreamInterceptor(Stream())) if err != nil { log.Fatal("cannot dial server: ", err) }
到此這篇關(guān)于gRPC中攔截器的使用詳解的文章就介紹到這了,更多相關(guān)gRPC中的攔截器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解在Go語(yǔ)言單元測(cè)試中如何解決文件依賴問(wèn)題
現(xiàn)如今的?Web?應(yīng)用程序往往采用?RESTful?API?接口形式對(duì)外提供服務(wù),后端接口直接向前端返回?HTML?文件的情況越來(lái)越少,所以在程序中操作文件的場(chǎng)景也變少了,在編寫單元測(cè)試時(shí),文件就成了被測(cè)試代碼的外部依賴,本文就來(lái)講解下測(cè)試過(guò)程中如何解決文件外部依賴問(wèn)題2023-08-08詳解如何在golang項(xiàng)目開(kāi)發(fā)中創(chuàng)建自己的Module
既然我們使用了很多開(kāi)源的 module為我們的日常開(kāi)發(fā)提供了很多的便捷性,那我們?cè)撊绾螌?shí)現(xiàn)自己的 module 來(lái)提供給團(tuán)隊(duì)中使用,接下小編就給大家介紹一下在golang項(xiàng)目開(kāi)發(fā)如何創(chuàng)建自己的Module,需要的朋友可以參考下2023-09-09xorm根據(jù)數(shù)據(jù)庫(kù)生成go model文件的操作
這篇文章主要介紹了xorm根據(jù)數(shù)據(jù)庫(kù)生成go model文件的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12Golang時(shí)間處理庫(kù)go-carbon?v2.2.13發(fā)布細(xì)則
這篇文章主要為大家介紹了Golang?時(shí)間處理庫(kù)go-carbon?v2.2.13發(fā)布細(xì)則,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11解決Go中攔截HTTP流數(shù)據(jù)時(shí)字段丟失的問(wèn)題
在開(kāi)發(fā)高并發(fā)的Web應(yīng)用時(shí),尤其是在處理HTTP代理和流數(shù)據(jù)攔截的場(chǎng)景下,遇到數(shù)據(jù)丟失的問(wèn)題并不罕見(jiàn),最近,在一個(gè)項(xiàng)目中,我遇到了一個(gè)棘手的問(wèn)題:在攔截并轉(zhuǎn)發(fā)HTTP流數(shù)據(jù)的過(guò)程中,某些數(shù)據(jù)字段因?yàn)樘幚磉^(guò)快而被丟失,所以本文給大家介紹如何解決這個(gè)問(wèn)題2024-08-08golang 實(shí)現(xiàn)Location跳轉(zhuǎn)方式
這篇文章主要介紹了golang 實(shí)現(xiàn)Location跳轉(zhuǎn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05