Java gRPC攔截器簡(jiǎn)單實(shí)現(xiàn)分布式日志鏈路追蹤器過(guò)程詳解
之前開(kāi)源過(guò)一個(gè)分布式日志鏈路追蹤的工具,其作用是規(guī)范日志格式,實(shí)現(xiàn)分布式日志層面的鏈路追蹤,并且工具支持SpringMVC,Dubbo,OpenFeign,HttpClient,OkHttp等網(wǎng)絡(luò)工具或RPC框架,基于此,為了擴(kuò)展日志鏈路追蹤使用場(chǎng)景,同時(shí)最近又在學(xué)習(xí)JAVA+gRPC,所以將該日志工具的鏈路追蹤能力擴(kuò)展了到gRPC場(chǎng)景。
跨進(jìn)程鏈路追蹤原理
想要實(shí)現(xiàn)跨進(jìn)程間的分布式鏈路追蹤,就要在發(fā)起遠(yuǎn)程調(diào)用的時(shí)候通過(guò)請(qǐng)求頭或者公共的自定義域?qū)㈡溌穮?shù)放進(jìn)去,然后服務(wù)端收到請(qǐng)求后將鏈路參數(shù)從請(qǐng)求頭或者自定義域中或取出來(lái),就這樣一層一層的將鏈路參數(shù)傳遞下去直至調(diào)用結(jié)束。
JAVA的gRPC庫(kù)io.grpc提供了在RPC調(diào)用中客戶端和服務(wù)端的攔截器(Interceptor),通過(guò)客戶端攔截器我們可以將鏈路追蹤的參數(shù)放到gRPC調(diào)用的Metadata中,通過(guò)服務(wù)端攔截器能夠從Metadata中獲取到鏈路追蹤所傳遞的參數(shù);io.grpc提供的客戶端攔截器和服務(wù)端攔截器分別是io.grpc.ClientInterceptor和io.grpc.ServerInterceptor。
代碼實(shí)現(xiàn)
maven依賴(lài)
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-server-spring-boot-starter</artifactId>
<version>${grpc.starter.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.devh</groupId>
<artifactId>grpc-client-spring-boot-starter</artifactId>
<version>${grpc.starter.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.github.redick01</groupId>
<artifactId>log-helper-spring-boot-starter-common</artifactId>
<version>1.0.3-RELEASE</version>
</dependency>
</dependencies>
攔截器實(shí)現(xiàn)
@Slf4j
@GrpcGlobalClientInterceptor
@GrpcGlobalServerInterceptor
public class GrpcInterceptor extends AbstractInterceptor implements ServerInterceptor, ClientInterceptor {
// 鏈路追蹤參數(shù)traceId
private static final Metadata.Key<String> TRACE = Metadata.Key.of("traceId", Metadata.ASCII_STRING_MARSHALLER)
// 鏈路追蹤參數(shù)spanId
private static final Metadata.Key<String> SPAN = Metadata.Key.of("spanId", Metadata.ASCII_STRING_MARSHALLER);
// 鏈路追蹤參數(shù)parentId
private static final Metadata.Key<String> PARENT = Metadata.Key.of("parentId", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions,
Channel channel) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(channel.newCall(methodDescriptor, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
// 客戶端傳遞鏈路追中數(shù)據(jù),將數(shù)據(jù)放到headers中
String traceId = traceId();
if (StringUtils.isNotBlank(traceId)) {
headers.put(TRACE, traceId);
headers.put(SPAN, spanId());
headers.put(PARENT, parentId());
}
// 繼續(xù)下一步
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
// 服務(wù)端傳遞回來(lái)的header
super.onHeaders(headers);
}
}, headers);
}
};
} finally {
stopWatch.stop();
log.info(LogUtil.marker(stopWatch.getTime()), "GRPC調(diào)用耗時(shí)");
}
}
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata headers, ServerCallHandler<ReqT, RespT> serverCallHandler) {
// 服務(wù)端從headers中獲取到鏈路追蹤參數(shù)
String traceId = headers.get(TRACE);
String spanId = headers.get(SPAN);
String parentId = headers.get(PARENT);
// 構(gòu)建當(dāng)前進(jìn)程的鏈路追蹤數(shù)據(jù)并體現(xiàn)在日志中
Tracer.trace(traceId, spanId, parentId);
log.info(LogUtil.marker(), "開(kāi)始處理");
return serverCallHandler.startCall(new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(serverCall) {
@Override
public void sendHeaders(Metadata responseHeaders) {
super.sendHeaders(responseHeaders);
}
@Override
public void close(Status status, Metadata trailers) {
super.close(status, trailers);
}
}, headers);
}
}
客戶端使用
客戶端使用代碼如下,該使用示例是在我開(kāi)源的日志工具中的例子,我這里通過(guò)springboot自動(dòng)裝配將GrpcInterceptor交由spring容器管理。所以可以直接通過(guò)自動(dòng)注入的方式使用。
@RestController
public class TestController {
@GrpcClient("userClient")
private UserServiceGrpc.UserServiceBlockingStub userService;
@Autowired
private GrpcInterceptor grpcInterceptor;
//@LogMarker(businessDescription = "獲取用戶名")
@GetMapping("/getUser")
public String getUser() {
User user = User.newBuilder()
.setUserId(100)
.putHobbys("pingpong", "play pingpong")
.setCode(200)
.build();
Channel channel = ClientInterceptors.intercept(userService.getChannel(), grpcInterceptor);
userService = UserServiceGrpc.newBlockingStub(channel);
User u = userService.getUser(user);
return u.getName();
}
}
總結(jié)
Java使用gRPC完成的服務(wù)間的調(diào)用可以通過(guò)io.grpc.ClientInterceptor和io.grpc.ServerInterceptor定義客戶端和服務(wù)端的攔截器實(shí)現(xiàn)分布式鏈路追蹤。
本文涉及的代碼可以在我之前開(kāi)源的分布式鏈路追蹤的日志工具中找到,項(xiàng)目地址:https://github.com/Redick01/log-helper
到此這篇關(guān)于Java gRPC攔截器簡(jiǎn)單實(shí)現(xiàn)分布式日志鏈路追蹤器過(guò)程詳解的文章就介紹到這了,更多相關(guān)Java gRPC攔截器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列
RabbitMQ的死信隊(duì)列用于接收其他隊(duì)列中的“死信”消息,所謂“死信”,是指滿足一定條件而無(wú)法被消費(fèi)者正確處理的消息,死信隊(duì)列通常與RabbitMQ的延遲隊(duì)列一起使用,本文給大家介紹了SpringBoot整合RabbitMQ實(shí)現(xiàn)延遲隊(duì)列和死信隊(duì)列,需要的朋友可以參考下2024-06-06
詳解基于java的Socket聊天程序——客戶端(附demo)
這篇文章主要介紹了詳解基于java的Socket聊天程序——客戶端(附demo),客戶端設(shè)計(jì)主要分成兩個(gè)部分,分別是socket通訊模塊設(shè)計(jì)和UI相關(guān)設(shè)計(jì)。有興趣的可以了解一下。2016-12-12
Java 中 System.load 和 System.loadLibrary&
在Java開(kāi)發(fā)中,有時(shí)候我們需要調(diào)用本地代碼(如 C、C++ 編寫(xiě)的代碼)來(lái)實(shí)現(xiàn)一些特定的功能,比如提高性能、訪問(wèn)底層硬件等,本文給大家介紹Java中System.load和System.loadLibrary方法使用舉例,感興趣的朋友一起看看吧2025-08-08

