Netty實(shí)現(xiàn)簡(jiǎn)易版的RPC框架過(guò)程詳解
正文
項(xiàng)目地址:gitee.com/baojh123/rp…
netty-study 這個(gè)項(xiàng)目是沒(méi)用到的,可以刪掉,主要是測(cè)試Netty自定義協(xié)議的
1:如何運(yùn)行項(xiàng)目
1:本地起一個(gè)zookeeper服務(wù)
2: 只需要運(yùn)行 rpc-server 和 springboot-zk-study二個(gè)項(xiàng)目即可
3: 二個(gè)項(xiàng)目的application.yml 都不需要改,唯一要改的就是zookeepr的連接配置信息
4:?jiǎn)?dòng)好之后,在瀏覽器訪問(wèn)
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回結(jié)果
2:從客戶端調(diào)用開(kāi)始(springboot-zk-study項(xiàng)目)
@RestController @RequestMapping("/zk") public class ZkController { @Resource @MyResource private UserService userService; @Resource @MyResource private PeopleService peopleService; @GetMapping("/test") public String test() { return userService.test("bjh-",1); } @GetMapping("/people") public Object people() { return peopleService.query(1L); } @GetMapping("/list") public Object list() { return peopleService.list(); } }
只需要在我們需要進(jìn)行RPC調(diào)用的接口上添加 @MyResource 注解即可,當(dāng)我們執(zhí)行這個(gè)方法之后,就會(huì)執(zhí)行代理方法,代理方法在 rpc-core 項(xiàng)目中,為了閱讀清晰,我只貼出重點(diǎn)的方法
@Slf4j public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner { ......省略一些代碼 // 客戶端執(zhí)行方法之后,就會(huì)執(zhí)行到這里的代理方法 @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //從注冊(cè)中心拿到服務(wù)列表 ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class); List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList(); for(ZkProperties zkProperties : zkPropertiesList) { String interfaceName = zkProperties.getInterfaceName(); Class<?> declaringClass = method.getDeclaringClass(); if(StringUtils.equals(declaringClass.getName(),interfaceName)) { List<InterfaceInfo> info = zkProperties.getInfo(); InterfaceInfo interfaceInfo = info.get(0); String ipAddress = interfaceInfo.getIpAddress(); List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo(); InterfaceImplInfo implInfo = interfaceImplInfo.get(0); String[] strings = ipAddress.split(":"); //與遠(yuǎn)程N(yùn)etty服務(wù)端發(fā)起連接 RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort()); /** * 封裝請(qǐng)求參數(shù) */ //獲取方法參數(shù)類型 Class<?>[] parameterTypes = method.getParameterTypes(); List<String> types = getTypes(parameterTypes); //同步調(diào)用 result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName); log.info("返回結(jié)果是:{}",result); } } Class<?> returnType = method.getReturnType(); Object value = objectMapper.readValue(result.toString(), returnType); return value; } private RpcClient connNettyServer(String ipAddress,Integer port) { return new RpcClient(ipAddress,port); } private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{ RpcMessage rpcMessage = new RpcMessage(); ...... //發(fā)送請(qǐng)求 Response result = rpcClient.sendRequest(rpcMessage); log.info("請(qǐng)求結(jié)果是:{}", JSONUtil.toJsonPrettyStr(result)); return result.getData(); } ......省略一些代碼
我們初始化客戶端連接和發(fā)送請(qǐng)求都在一個(gè)RpcClient的類中,我們看下這個(gè)類的代碼
@Slf4j public class RpcClient { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap; private String ip; private Integer port; RpcClientHandler rpcClientHandler; private ChannelFuture channelFuture; public RpcClient(String ip,Integer port) { bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實(shí)現(xiàn) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //加入處理器 rpcClientHandler = new RpcClientHandler(); ch.pipeline().addLast(new RpcDecoder()); ch.pipeline().addLast(new RpcEncoder()); ch.pipeline().addLast(rpcClientHandler); } }); try { // 和遠(yuǎn)程N(yùn)ett服務(wù)端建立連接 channelFuture = bootstrap.connect(ip, port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } public Response sendRequest(RpcMessage rpcMessage) throws Exception{ //發(fā)送請(qǐng)求 channelFuture.channel().writeAndFlush(rpcMessage).sync(); channelFuture.channel().closeFuture().sync(); log.info("獲取返回結(jié)果====================="); Response response = rpcClientHandler.getResponse(); return response; } }
客戶端在這發(fā)送請(qǐng)求到服務(wù)端之后,就接收服務(wù)端返回回來(lái)的消息即可,然后將返回結(jié)果返回給我們的接口??蛻舳说恼{(diào)用就到這里了,現(xiàn)在看下服務(wù)端的
3:服務(wù)端處理請(qǐng)求
服務(wù)端處理請(qǐng)求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> { ObjectMapper objectMapper = new ObjectMapper(); @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception { Object obj = rpcMessage.getObj(); RpcMessage rpcMessageResponse = new RpcMessage(); Response response = new Response(); try{ Request request = objectMapper.readValue(obj.toString(), Request.class); String interfaceImplName = request.getInterfaceImplName(); Class<?> aClass = Class.forName(interfaceImplName); List<String> paramsTypes = request.getParamsTypes(); try { Object result = null; //判讀方法是有參數(shù)的還是沒(méi)有參數(shù)的 if(paramsTypes.isEmpty()) { Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName()); result = declaredMethod.invoke(aClass.newInstance()); }else { Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray()); Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes"); Object[] args = (Object[]) paramsObjectMap.get("args"); result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args); } log.info("返回結(jié)果是:{}",result); response.setData(objectMapper.writeValueAsString(result)); response.setIsOk(1); response.setErrInfo("error"); rpcMessageResponse.setObj(response); } catch (Throwable throwable) { throwable.printStackTrace(); response.setData("error"); response.setIsOk(0); response.setErrInfo(throwable.getMessage()); rpcMessageResponse.setObj(response); } }catch (Exception e) { response.setData("error"); response.setIsOk(0); response.setErrInfo(e.getMessage()); rpcMessageResponse.setObj(response); } String valueAsString = objectMapper.writeValueAsString(response); rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length); rpcMessageResponse.setObj(valueAsString); channelHandlerContext.writeAndFlush(rpcMessageResponse); } }
服務(wù)端就拿到客戶端傳過(guò)來(lái)的接口名稱,從zookeeper獲取到具體的實(shí)現(xiàn)類,然后通過(guò)反射調(diào)用即可
4:接下來(lái)要做什么
上面只是簡(jiǎn)單的介紹了下整個(gè)調(diào)用的大概過(guò)程,還有很多問(wèn)題沒(méi)有解釋清楚,比如
1:在客戶端我們要使用UserService,但是你會(huì)發(fā)現(xiàn)我們使用了二個(gè)注解,一個(gè)是我們自定義的,一個(gè)是spring注入用的,但是在項(xiàng)目中我們并沒(méi)有這個(gè)接口的實(shí)現(xiàn)類,spring是怎么將這個(gè)接口注入到自己容器中的呢
2: 為什么調(diào)用使用了 @MyResource的接口方法都會(huì)走代理方法,是怎么做到的
@Resource @MyResource private PeopleService peopleService;
3:我們的服務(wù)是怎么在服務(wù)啟動(dòng)的時(shí)候注冊(cè)到zookeeper的,注冊(cè)的信息又是什么,可以看下我們服務(wù)注冊(cè)到zookeeper的信息如下
{ "zkPropertiesList": [{ "interfaceName": "com.bjh.service.PeopleService", "info": [{ "ipAddress": "192.168.83.1:9091", "interfaceImplInfo": [{ "name": "com.bjh.service.PeopleServiceImpl", "value": "com.bjh.service.PeopleServiceImpl" }] }] }, { "interfaceName": "com.bjh.service.UserService", "info": [{ "ipAddress": "192.168.83.1:9091", "interfaceImplInfo": [{ "name": "com.bjh.service.UserServiceImpl", "value": "com.bjh.service.UserServiceImpl" }] }] }] }
4:在我們的服務(wù)端的實(shí)現(xiàn)類,我們只使用了我們自定義的 @Service注解,這個(gè)注解不是Spring的
@Service public class PeopleServiceImpl implements PeopleService{ @Override public People query(long id) { People people = new People(); people.setId(id); people.setName("coco"); return people; } @Override public List<People> list() { List<People> list = new ArrayList<>(); People people = new People(); people.setId(123L); people.setName("coco"); People people2 = new People(); people2.setId(124L); people2.setName("baojh"); list.add(people); list.add(people2); return list; } }
5:還有客戶端請(qǐng)求的結(jié)構(gòu)體是怎么樣的,還有返回響應(yīng)結(jié)果是怎么樣的等等,后續(xù)我會(huì)繼續(xù)更新
更多關(guān)于Netty簡(jiǎn)易版RPC框架的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
java實(shí)現(xiàn)全局監(jiān)聽(tīng)鍵盤詳解
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)全局監(jiān)聽(tīng)鍵盤的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解下2024-01-01Java中的Semaphore信號(hào)量簡(jiǎn)析
這篇文章主要介紹了Java中的Semaphore信號(hào)量簡(jiǎn)析,Semaphore:信號(hào)量,用來(lái)限制能同時(shí)訪問(wèn)共享資源的線程上限,使用Semaphore實(shí)現(xiàn)簡(jiǎn)單連接池,對(duì)比享元模式下的實(shí)現(xiàn)(用wait和notify),性能和可讀性要更好,需要的朋友可以參考下2023-12-12Java使用JSqlParser解析SQL語(yǔ)句應(yīng)用場(chǎng)景
JSqlParser是一個(gè)功能全面的Java庫(kù),用于解析SQL語(yǔ)句,支持多種SQL方言,它可以輕松集成到Java項(xiàng)目中,并提供靈活的操作方式,本文介紹Java使用JSqlParser解析SQL語(yǔ)句總結(jié),感興趣的朋友一起看看吧2024-09-09java集合模擬實(shí)現(xiàn)斗地主洗牌和發(fā)牌
這篇文章主要為大家詳細(xì)介紹了java集合模擬實(shí)現(xiàn)斗地主洗牌和發(fā)牌,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-09-09