Netty實現(xiàn)簡易版的RPC框架過程詳解
正文

netty-study 這個項目是沒用到的,可以刪掉,主要是測試Netty自定義協(xié)議的
1:如何運行項目
1:本地起一個zookeeper服務
2: 只需要運行 rpc-server 和 springboot-zk-study二個項目即可
3: 二個項目的application.yml 都不需要改,唯一要改的就是zookeepr的連接配置信息

4:啟動好之后,在瀏覽器訪問
http://localhost:8081/zk/test
http://localhost:8081/zk/people
http://localhost:8081/zk/list
可以查看到返回結(jié)果

2:從客戶端調(diào)用開始(springboot-zk-study項目)
@RestController
@RequestMapping("/zk")
public class ZkController {
@Resource
@MyResource
private UserService userService;
@Resource
@MyResource
private PeopleService peopleService;
@GetMapping("/test")
public String test() {
return userService.test("bjh-",1);
}
@GetMapping("/people")
public Object people() {
return peopleService.query(1L);
}
@GetMapping("/list")
public Object list() {
return peopleService.list();
}
}
只需要在我們需要進行RPC調(diào)用的接口上添加 @MyResource 注解即可,當我們執(zhí)行這個方法之后,就會執(zhí)行代理方法,代理方法在 rpc-core 項目中,為了閱讀清晰,我只貼出重點的方法
@Slf4j
public class ServiceProxy<T> implements InvocationHandler, ApplicationContextAware, ApplicationRunner {
......省略一些代碼
// 客戶端執(zhí)行方法之后,就會執(zhí)行到這里的代理方法
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//從注冊中心拿到服務列表
ZkNodeData zkNodeData = objectMapper.readValue(nodeData, ZkNodeData.class);
List<ZkProperties> zkPropertiesList = zkNodeData.getZkPropertiesList();
for(ZkProperties zkProperties : zkPropertiesList) {
String interfaceName = zkProperties.getInterfaceName();
Class<?> declaringClass = method.getDeclaringClass();
if(StringUtils.equals(declaringClass.getName(),interfaceName)) {
List<InterfaceInfo> info = zkProperties.getInfo();
InterfaceInfo interfaceInfo = info.get(0);
String ipAddress = interfaceInfo.getIpAddress();
List<InterfaceImplInfo> interfaceImplInfo = interfaceInfo.getInterfaceImplInfo();
InterfaceImplInfo implInfo = interfaceImplInfo.get(0);
String[] strings = ipAddress.split(":");
//與遠程Netty服務端發(fā)起連接
RpcClient rpcClient = connNettyServer(strings[0], zkPropertiesSource.getNettyConnectPort());
/**
* 封裝請求參數(shù)
*/
//獲取方法參數(shù)類型
Class<?>[] parameterTypes = method.getParameterTypes();
List<String> types = getTypes(parameterTypes);
//同步調(diào)用
result = remoteCall(method.getName(), types, args, rpcClient, implInfo, interfaceName);
log.info("返回結(jié)果是:{}",result);
}
}
Class<?> returnType = method.getReturnType();
Object value = objectMapper.readValue(result.toString(), returnType);
return value;
}
private RpcClient connNettyServer(String ipAddress,Integer port) {
return new RpcClient(ipAddress,port);
}
private Object remoteCall(String methodName, List<String> argTypes, Object[] args,RpcClient rpcClient,InterfaceImplInfo implInfo,String interfaceName) throws Exception{
RpcMessage rpcMessage = new RpcMessage();
......
//發(fā)送請求
Response result = rpcClient.sendRequest(rpcMessage);
log.info("請求結(jié)果是:{}", JSONUtil.toJsonPrettyStr(result));
return result.getData();
}
......省略一些代碼
我們初始化客戶端連接和發(fā)送請求都在一個RpcClient的類中,我們看下這個類的代碼
@Slf4j
public class RpcClient {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap;
private String ip;
private Integer port;
RpcClientHandler rpcClientHandler;
private ChannelFuture channelFuture;
public RpcClient(String ip,Integer port) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class) // 使用NioSocketChannel作為客戶端的通道實現(xiàn)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//加入處理器
rpcClientHandler = new RpcClientHandler();
ch.pipeline().addLast(new RpcDecoder());
ch.pipeline().addLast(new RpcEncoder());
ch.pipeline().addLast(rpcClientHandler);
}
});
try {
// 和遠程Nett服務端建立連接
channelFuture = bootstrap.connect(ip, port).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public Response sendRequest(RpcMessage rpcMessage) throws Exception{
//發(fā)送請求
channelFuture.channel().writeAndFlush(rpcMessage).sync();
channelFuture.channel().closeFuture().sync();
log.info("獲取返回結(jié)果=====================");
Response response = rpcClientHandler.getResponse();
return response;
}
}
客戶端在這發(fā)送請求到服務端之后,就接收服務端返回回來的消息即可,然后將返回結(jié)果返回給我們的接口??蛻舳说恼{(diào)用就到這里了,現(xiàn)在看下服務端的
3:服務端處理請求
服務端處理請求的核心都在 rpc-core的 RpcServerHandler中
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcMessage> {
ObjectMapper objectMapper = new ObjectMapper();
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage rpcMessage) throws Exception {
Object obj = rpcMessage.getObj();
RpcMessage rpcMessageResponse = new RpcMessage();
Response response = new Response();
try{
Request request = objectMapper.readValue(obj.toString(), Request.class);
String interfaceImplName = request.getInterfaceImplName();
Class<?> aClass = Class.forName(interfaceImplName);
List<String> paramsTypes = request.getParamsTypes();
try {
Object result = null;
//判讀方法是有參數(shù)的還是沒有參數(shù)的
if(paramsTypes.isEmpty()) {
Method declaredMethod = aClass.getDeclaredMethod(request.getMethodName());
result = declaredMethod.invoke(aClass.newInstance());
}else {
Map<String, Object> paramsObjectMap = TypeParseUtil.parseTypeString2Class(paramsTypes, request.getParams().toArray());
Class<?>[] classTypes = (Class<?>[]) paramsObjectMap.get("classTypes");
Object[] args = (Object[]) paramsObjectMap.get("args");
result = aClass.getMethod(request.getMethodName(), classTypes).invoke(aClass.newInstance(), args);
}
log.info("返回結(jié)果是:{}",result);
response.setData(objectMapper.writeValueAsString(result));
response.setIsOk(1);
response.setErrInfo("error");
rpcMessageResponse.setObj(response);
} catch (Throwable throwable) {
throwable.printStackTrace();
response.setData("error");
response.setIsOk(0);
response.setErrInfo(throwable.getMessage());
rpcMessageResponse.setObj(response);
}
}catch (Exception e) {
response.setData("error");
response.setIsOk(0);
response.setErrInfo(e.getMessage());
rpcMessageResponse.setObj(response);
}
String valueAsString = objectMapper.writeValueAsString(response);
rpcMessageResponse.setDataLength(valueAsString.getBytes(Charset.forName("utf-8")).length);
rpcMessageResponse.setObj(valueAsString);
channelHandlerContext.writeAndFlush(rpcMessageResponse);
}
}
服務端就拿到客戶端傳過來的接口名稱,從zookeeper獲取到具體的實現(xiàn)類,然后通過反射調(diào)用即可
4:接下來要做什么
上面只是簡單的介紹了下整個調(diào)用的大概過程,還有很多問題沒有解釋清楚,比如
1:在客戶端我們要使用UserService,但是你會發(fā)現(xiàn)我們使用了二個注解,一個是我們自定義的,一個是spring注入用的,但是在項目中我們并沒有這個接口的實現(xiàn)類,spring是怎么將這個接口注入到自己容器中的呢
2: 為什么調(diào)用使用了 @MyResource的接口方法都會走代理方法,是怎么做到的
@Resource @MyResource private PeopleService peopleService;
3:我們的服務是怎么在服務啟動的時候注冊到zookeeper的,注冊的信息又是什么,可以看下我們服務注冊到zookeeper的信息如下
{
"zkPropertiesList": [{
"interfaceName": "com.bjh.service.PeopleService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.PeopleServiceImpl",
"value": "com.bjh.service.PeopleServiceImpl"
}]
}]
}, {
"interfaceName": "com.bjh.service.UserService",
"info": [{
"ipAddress": "192.168.83.1:9091",
"interfaceImplInfo": [{
"name": "com.bjh.service.UserServiceImpl",
"value": "com.bjh.service.UserServiceImpl"
}]
}]
}]
}
4:在我們的服務端的實現(xiàn)類,我們只使用了我們自定義的 @Service注解,這個注解不是Spring的
@Service
public class PeopleServiceImpl implements PeopleService{
@Override
public People query(long id) {
People people = new People();
people.setId(id);
people.setName("coco");
return people;
}
@Override
public List<People> list() {
List<People> list = new ArrayList<>();
People people = new People();
people.setId(123L);
people.setName("coco");
People people2 = new People();
people2.setId(124L);
people2.setName("baojh");
list.add(people);
list.add(people2);
return list;
}
}
5:還有客戶端請求的結(jié)構(gòu)體是怎么樣的,還有返回響應結(jié)果是怎么樣的等等,后續(xù)我會繼續(xù)更新
更多關(guān)于Netty簡易版RPC框架的資料請關(guān)注腳本之家其它相關(guān)文章!

