gRPC中攔截器的使用詳解
前言
本次主要介紹在gRPC中使用攔截器,包括一元攔截器和流式攔截器,在攔截器中添加JWT認(rèn)證,客戶端登錄之后會獲得token,請求特定的API時候需要帶上token才能訪問。由于代碼中我們使用了grpc-gateway提供http服務(wù),因此需要安裝gateway的一些依賴
在本文中就簡單介紹一下攔截器的使用,RPC請求分為一元RPC請求和流式RPC請求,所謂一元RPC指的就是請求和響應(yīng)都是一次完成的,gRPC是基于HTTP2.0的,因此,一元RPC就可以看成客戶端請求一次,服務(wù)端就響應(yīng)一次。而流式RPC則是像流一樣多次進(jìn)行響應(yīng)或者請求傳輸?shù)摹?/p>
相應(yīng)地,攔截器分為服務(wù)端攔截和客戶端攔截器,根據(jù)功能的不同又分為一元攔截器和流式攔截器。
服務(wù)端攔截器
1、一元攔截器:UnaryInterceptor
源碼中寫得比較清楚了, UnaryServerInterceptor 提供了一個鉤子來攔截服務(wù)器上一元RPC的執(zhí)行。 info 參數(shù)包含了這個RPC攔截器能操作的所有信息, handler 是服務(wù)方法實現(xiàn)的一個包裝器,用于供攔截器中調(diào)用來處理RPC請求的邏輯。
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
// server. Only one unary interceptor can be installed. The construction of multiple
// interceptors (e.g., chaining) can be implemented at the caller.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}主要看其中包含的參數(shù) UnaryServerInterceptor :
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler // is the wrapper of the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)其中幾個參數(shù)解釋為:
- ctx context.Context:請求上下文
- req interface{}:RPC 方法的請求參數(shù)
- info *UnaryServerInfo:包含了RPC 方法的所有信息
- handler UnaryHandler:RPC 方法真正執(zhí)行邏輯
2、流式攔截器:StreamInterceptor
StreamServerInterceptor 提供了一個鉤子來攔截服務(wù)器上流式RPC的執(zhí)行。 info 包含攔截器可以操作的RPC的所有信息。 handler 是服務(wù)方法實現(xiàn)。攔截器負(fù)責(zé)調(diào)用 handler 來完成RPC。
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
// server. Only one stream interceptor can be installed.
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.streamInt != nil {
panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
})
}StreamServerInterceptor:
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC //on the server.info contains all the information of this RPC the interceptor can operate //on. And handler is the service method implementation. It is the responsibility of the // interceptor to invoke handler to complete the RPC.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error3、實現(xiàn)服務(wù)端攔截器
下面我們進(jìn)行簡單的實現(xiàn):
// 一元攔截器
func Unary() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
log.Print("---------> the unaryServerInterceptor: ", info.FullMethod)
err := interceptor.authorize(ctx, info.FullMethod)//實現(xiàn)攔截驗證的邏輯,自行實現(xiàn),我這里是截取的完整代碼中的,也可以參考我的gitee上的代碼
if err != nil {
return nil, err
}
return handler(ctx, req)
}
}
//stream攔截器
func Stream() grpc.StreamServerInterceptor {
return func(
srv interface{},
stream grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler) error {
log.Print("--------->the streamServerInterceptor: ", info.FullMethod)
err := interceptor.authorize(stream.Context(), info.FullMethod)//實現(xiàn)攔截驗證的邏輯,自行實現(xiàn),我這里是截取的完整代碼中的,也可以參考我的gitee上的代碼
if err != nil {
return err
}
return handler(srv, stream)
}
}將上面實現(xiàn)的攔截器加入到Server中即可:
serverOptions := []grpc.ServerOption{
grpc.UnaryInterceptor(Unary()),
grpc.StreamInterceptor(Stream()),
}
grpcServer := grpc.NewServer(serverOptions...)客戶端攔截器
1、一元攔截器:WithUnaryInterceptor
// WithUnaryInterceptor returns a DialOption that specifies the interceptor for
// unary RPCs.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.unaryInt = f
})
}UnaryClientInterceptor
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client.
// Unary interceptors can be specified as a DialOption, using
// WithUnaryInterceptor() or WithChainUnaryInterceptor(), when creating a
// ClientConn. When a unary interceptor(s) is set on a ClientConn, gRPC
// delegates all unary RPC invocations to the interceptor, and it is the
// responsibility of the interceptor to call invoker to complete the processing
// of the RPC.
//
// method is the RPC name. req and reply are the corresponding request and
// response messages. cc is the ClientConn on which the RPC was invoked. invoker
// is the handler to complete the RPC and it is the responsibility of the
// interceptor to call it. opts contain all applicable call options, including
// defaults from the ClientConn as well as per-call options.
//
// The returned error must be compatible with the status package.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error2、流式攔截器:WithStreamInterceptor
// WithStreamInterceptor returns a DialOption that specifies the interceptor for
// streaming RPCs.
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.streamInt = f
})
}StreamClientInterceptor 攔截客戶端流 ClientStream 的創(chuàng)建,流式攔截器可以指定為一個 Dial 選項。當(dāng)創(chuàng)建一個客戶端連接時,使用 WithStreamInterceptor() 或者 WithChainStreamInterceptor() 。當(dāng)一個流攔截器被設(shè)置在客戶端連接中的時候,gRPC將所有流的創(chuàng)建都交給攔截器,攔截器調(diào)用streamer。
// StreamClientInterceptor intercepts the creation of a ClientStream. Stream // interceptors can be specified as a DialOption, using WithStreamInterceptor() // or WithChainStreamInterceptor(), when creating a ClientConn. When a stream // interceptor(s) is set on the ClientConn, gRPC delegates all stream creations // to the interceptor, and it is the responsibility of the interceptor to call // streamer. // // desc contains a description of the stream. cc is the ClientConn on which the // RPC was invoked. streamer is the handler to create a ClientStream and it is // the responsibility of the interceptor to call it. opts contain all applicable // call options, including defaults from the ClientConn as well as per-call // options. // // StreamClientInterceptor may return a custom ClientStream to intercept all I/O // operations. The returned error must be compatible with the status package. type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
參數(shù)解釋:
- ctx context.Context是請求上下文
- desc *StreamDesc包含了流中描述的信息
- cc *ClientConn是調(diào)用RPC的客戶端連接
- method string是請求的方法名
- streamer Streamer是一個創(chuàng)建客戶端流的處理器,流式攔截器中需要調(diào)用它。
- opts ...CallOption包含了所有適用呼叫選項,包括來自于客戶端連接的默認(rèn)選項和所有的呼叫。
3、實現(xiàn)客戶端攔截器
下面是具體實現(xiàn):
func Unary() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
conn *grpc.ClientConn,
invoker grpc.UnaryInvoker, //回調(diào)函數(shù)
opts ...grpc.CallOption,
) error {
log.Printf("-------> unary interceptor: %s", method)
if authMethods[method] { //如果是攔截的方法,在調(diào)用實際的rpc方法之前將token添加到context中,這個存儲需要攔截的方法
return invoker(attachToken(ctx), method, req, reply, conn, opts...) //attachToken為自己實現(xiàn)的客戶端攔截請求之后附加token的方法
}
return invoker(ctx, method, req, reply, conn, opts...)
}
}
// Stream returns a client interceptor to authenticate stream RPC
func Stream() grpc.StreamClientInterceptor {
return func(
ctx context.Context,
desc *grpc.StreamDesc,
conn *grpc.ClientConn,
method string,
streamer grpc.Streamer,
opts ...grpc.CallOption,
) (grpc.ClientStream, error) {
log.Printf("-------> stream interceptor: %s", method)
if interceptor.authMethods[method] {
return streamer(attachToken(ctx), desc, conn, method, opts...)
}
return streamer(ctx, desc, conn, method, opts...)
}
}使用也比較簡單,在Dial中添加即可:
conn2, err := grpc.Dial(
Address, //地址
transportOption, //SSL/TLS證書選擇
grpc.WithUnaryInterceptor(Unary()),
grpc.WithStreamInterceptor(Stream()))
if err != nil {
log.Fatal("cannot dial server: ", err)
}到此這篇關(guān)于gRPC中攔截器的使用詳解的文章就介紹到這了,更多相關(guān)gRPC中的攔截器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解如何在golang項目開發(fā)中創(chuàng)建自己的Module
既然我們使用了很多開源的 module為我們的日常開發(fā)提供了很多的便捷性,那我們該如何實現(xiàn)自己的 module 來提供給團(tuán)隊中使用,接下小編就給大家介紹一下在golang項目開發(fā)如何創(chuàng)建自己的Module,需要的朋友可以參考下2023-09-09
xorm根據(jù)數(shù)據(jù)庫生成go model文件的操作
這篇文章主要介紹了xorm根據(jù)數(shù)據(jù)庫生成go model文件的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
Golang時間處理庫go-carbon?v2.2.13發(fā)布細(xì)則
這篇文章主要為大家介紹了Golang?時間處理庫go-carbon?v2.2.13發(fā)布細(xì)則,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11
解決Go中攔截HTTP流數(shù)據(jù)時字段丟失的問題
在開發(fā)高并發(fā)的Web應(yīng)用時,尤其是在處理HTTP代理和流數(shù)據(jù)攔截的場景下,遇到數(shù)據(jù)丟失的問題并不罕見,最近,在一個項目中,我遇到了一個棘手的問題:在攔截并轉(zhuǎn)發(fā)HTTP流數(shù)據(jù)的過程中,某些數(shù)據(jù)字段因為處理過快而被丟失,所以本文給大家介紹如何解決這個問題2024-08-08
golang 實現(xiàn)Location跳轉(zhuǎn)方式
這篇文章主要介紹了golang 實現(xiàn)Location跳轉(zhuǎn)方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05

