Netty實(shí)現(xiàn)簡易版的RPC框架過程詳解
正文
項(xiàng)目地址:gitee.com/baojh123/rp…

netty-study 這個(gè)項(xiàng)目是沒用到的,可以刪掉,主要是測試Netty自定義協(xié)議的
1:如何運(yùn)行項(xiàng)目
1:本地起一個(gè)zookeeper服務(wù)
2: 只需要運(yùn)行 rpc-server 和 springboot-zk-study二個(gè)項(xiàng)目即可
3: 二個(gè)項(xiàng)目的application.yml 都不需要改,唯一要改的就是zookeepr的連接配置信息

4:啟動(dòng)好之后,在瀏覽器訪問
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回結(jié)果

2:從客戶端調(diào)用開始(springboot-zk-study項(xiàng)目)
@RestController
@RequestMapping("/zk")
public class ZkController {
@Resource
@MyResource
private UserService userService;
@Resource
@MyResource
private PeopleService peopleService;
@GetMapping("/test")
public String test() {
return userService.test("bjh-",1);
}
@GetMapping("/people")
public Object people() {
return peopleService.query(1L);
}
@GetMapping("/list")
public Object list() {
return peopleService.list();
}
}
只需要在我們需要進(jìn)行RPC調(diào)用的接口上添加 @MyResource 注解即可,當(dāng)我們執(zhí)行這個(gè)方法之后,就會(huì)執(zhí)行代理方法,代理方法在 rpc-core 項(xiàng)目中,為了閱讀清晰,我只貼出重點(diǎn)的方法
@Slf4j
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
......省略一些代碼
// 客戶端執(zhí)行方法之后,就會(huì)執(zhí)行到這里的代理方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//從注冊(cè)中心拿到服務(wù)列表
ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
for(ZkProperties zkProperties : zkPropertiesList) {
String interfaceName = zkProperties.getInterfaceName();
Class<?> declaringClass = method.getDeclaringClass();
if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
List<InterfaceInfo> info = zkProperties.getInfo();
InterfaceInfo interfaceInfo = info.get(0);
String ipAddress = interfaceInfo.getIpAddress();
List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
InterfaceImplInfo implInfo = interfaceImplInfo.get(0);
String[] strings = ipAddress.split(":");
//與遠(yuǎn)程N(yùn)etty服務(wù)端發(fā)起連接
RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort());
/**
* 封裝請(qǐng)求參數(shù)
*/
//獲取方法參數(shù)類型
Class<?>[] parameterTypes = method.getParameterTypes();
List<String> types = getTypes(parameterTypes);
//同步調(diào)用
result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
log.info("返回結(jié)果是:{}",result);
}
}
Class<?> returnType = method.getReturnType();
Object value = objectMapper.readValue(result.toString(), returnType);
return value;
}
private RpcClient connNettyServer(String ipAddress,Integer port) {
return new RpcClient(ipAddress,port);
}
private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
RpcMessage rpcMessage = new RpcMessage();
......
//發(fā)送請(qǐng)求
Response result = rpcClient.sendRequest(rpcMessage);
log.info("請(qǐng)求結(jié)果是:{}", JSONUtil.toJsonPrettyStr(result));
return result.getData();
}
......省略一些代碼
我們初始化客戶端連接和發(fā)送請(qǐng)求都在一個(gè)RpcClient的類中,我們看下這個(gè)類的代碼
@Slf4j
public class RpcClient {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap;
private String ip;
private Integer port;
RpcClientHandler rpcClientHandler;
private ChannelFuture channelFuture;
public RpcClient(String ip,Integer port) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實(shí)現(xiàn)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入處理器
rpcClientHandler = new RpcClientHandler();
ch.pipeline().addLast(new RpcDecoder());
ch.pipeline().addLast(new RpcEncoder());
ch.pipeline().addLast(rpcClientHandler);
}
});
try {
// 和遠(yuǎn)程N(yùn)ett服務(wù)端建立連接
channelFuture = bootstrap.connect(ip, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Response sendRequest(RpcMessage rpcMessage) throws Exception{
//發(fā)送請(qǐng)求
channelFuture.channel().writeAndFlush(rpcMessage).sync();
channelFuture.channel().closeFuture().sync();
log.info("獲取返回結(jié)果=====================");
Response response = rpcClientHandler.getResponse();
return response;
}
}
客戶端在這發(fā)送請(qǐng)求到服務(wù)端之后,就接收服務(wù)端返回回來的消息即可,然后將返回結(jié)果返回給我們的接口??蛻舳说恼{(diào)用就到這里了,現(xiàn)在看下服務(wù)端的
3:服務(wù)端處理請(qǐng)求
服務(wù)端處理請(qǐng)求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
Object obj = rpcMessage.getObj();
RpcMessage rpcMessageResponse = new RpcMessage();
Response response = new Response();
try{
Request request = objectMapper.readValue(obj.toString(), Request.class);
String interfaceImplName = request.getInterfaceImplName();
Class<?> aClass = Class.forName(interfaceImplName);
List<String> paramsTypes = request.getParamsTypes();
try {
Object result = null;
//判讀方法是有參數(shù)的還是沒有參數(shù)的
if(paramsTypes.isEmpty()) {
Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
result = declaredMethod.invoke(aClass.newInstance());
}else {
Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray());
Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
Object[] args = (Object[]) paramsObjectMap.get("args");
result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
}
log.info("返回結(jié)果是:{}",result);
response.setData(objectMapper.writeValueAsString(result));
response.setIsOk(1);
response.setErrInfo("error");
rpcMessageResponse.setObj(response);
} catch (Throwable throwable) {
throwable.printStackTrace();
response.setData("error");
response.setIsOk(0);
response.setErrInfo(throwable.getMessage());
rpcMessageResponse.setObj(response);
}
}catch (Exception e) {
response.setData("error");
response.setIsOk(0);
response.setErrInfo(e.getMessage());
rpcMessageResponse.setObj(response);
}
String valueAsString = objectMapper.writeValueAsString(response);
rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length);
rpcMessageResponse.setObj(valueAsString);
channelHandlerContext.writeAndFlush(rpcMessageResponse);
}
}
服務(wù)端就拿到客戶端傳過來的接口名稱,從zookeeper獲取到具體的實(shí)現(xiàn)類,然后通過反射調(diào)用即可
4:接下來要做什么
上面只是簡單的介紹了下整個(gè)調(diào)用的大概過程,還有很多問題沒有解釋清楚,比如
1:在客戶端我們要使用UserService,但是你會(huì)發(fā)現(xiàn)我們使用了二個(gè)注解,一個(gè)是我們自定義的,一個(gè)是spring注入用的,但是在項(xiàng)目中我們并沒有這個(gè)接口的實(shí)現(xiàn)類,spring是怎么將這個(gè)接口注入到自己容器中的呢
2: 為什么調(diào)用使用了 @MyResource的接口方法都會(huì)走代理方法,是怎么做到的
@Resource @MyResource private PeopleService peopleService;
3:我們的服務(wù)是怎么在服務(wù)啟動(dòng)的時(shí)候注冊(cè)到zookeeper的,注冊(cè)的信息又是什么,可以看下我們服務(wù)注冊(cè)到zookeeper的信息如下
{
"zkPropertiesList": [{
"interfaceName": "com.bjh.service.PeopleService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.PeopleServiceImpl",
"value": "com.bjh.service.PeopleServiceImpl"
}]
}]
}, {
"interfaceName": "com.bjh.service.UserService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.UserServiceImpl",
"value": "com.bjh.service.UserServiceImpl"
}]
}]
}]
}
4:在我們的服務(wù)端的實(shí)現(xiàn)類,我們只使用了我們自定義的 @Service注解,這個(gè)注解不是Spring的
@Service
public class PeopleServiceImpl implements PeopleService{
@Override
public People query(long id) {
People people = new People();
people.setId(id);
people.setName("coco");
return people;
}
@Override
public List<People> list() {
List<People> list = new ArrayList<>();
People people = new People();
people.setId(123L);
people.setName("coco");
People people2 = new People();
people2.setId(124L);
people2.setName("baojh");
list.add(people);
list.add(people2);
return list;
}
}
5:還有客戶端請(qǐng)求的結(jié)構(gòu)體是怎么樣的,還有返回響應(yīng)結(jié)果是怎么樣的等等,后續(xù)我會(huì)繼續(xù)更新
更多關(guān)于Netty簡易版RPC框架的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)全局監(jiān)聽鍵盤詳解
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)全局監(jiān)聽鍵盤的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解下2024-01-01
Java使用JSqlParser解析SQL語句應(yīng)用場景
JSqlParser是一個(gè)功能全面的Java庫,用于解析SQL語句,支持多種SQL方言,它可以輕松集成到Java項(xiàng)目中,并提供靈活的操作方式,本文介紹Java使用JSqlParser解析SQL語句總結(jié),感興趣的朋友一起看看吧2024-09-09
java集合模擬實(shí)現(xiàn)斗地主洗牌和發(fā)牌
這篇文章主要為大家詳細(xì)介紹了java集合模擬實(shí)現(xiàn)斗地主洗牌和發(fā)牌,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09

