java實(shí)現(xiàn)簡易版簡易版dubbo
一、dubbo簡介
實(shí)現(xiàn)一個簡易版的dubbo,首先看下dubbo是什么
dubbo是阿里開源的rpc框架,目前是apache頂級開源項(xiàng)目,可以用來構(gòu)建微服務(wù)。社區(qū)已經(jīng)到了3.0版本,生態(tài)活躍,原來是java寫的,現(xiàn)在有g(shù)o版本,支持云原生。
研究dubbo的目的
- dubbo渾身是寶,用到了zk/javassit/netty/spring/spi等技術(shù),可以說看懂了dubbo源碼,對自己是十分有幫助的。不僅能學(xué)習(xí)優(yōu)秀的代碼設(shè)計(jì),理解solid原則,更能深刻理解rpc的實(shí)現(xiàn)原理,了解各種技術(shù)的運(yùn)用。
- 當(dāng)前目前有不少大公司在使用,dubbo也是經(jīng)常被問的面試題,包括dubbo spi/dubbo通信流程,對于簡歷也是十分加分的。
- 手寫dubbo,比看懂dubbo要難得多,讓自己透徹理解dubbo的設(shè)計(jì)細(xì)節(jié),內(nèi)功修煉更上一層樓,以后再去看其他源碼會非???。
二、架構(gòu)設(shè)計(jì)
博主在看懂dubbo源碼的基礎(chǔ)上,自己動手實(shí)現(xiàn)dubbo最基礎(chǔ)的功能,包括服務(wù)注冊、服務(wù)發(fā)現(xiàn)、服務(wù)負(fù)載均衡、網(wǎng)絡(luò)通信、與spring boot集成等,以便更好掌握dubbo的實(shí)現(xiàn)原理。實(shí)現(xiàn)架構(gòu)如下:

Api層主要是面向框架使用者,通過注解就能使用簡易版dubbo。并實(shí)現(xiàn)了在spring boot中進(jìn)行自動配置。
目前微服務(wù)都是基于spring boot搭建的,大部分公司都從spring遷移到spring boot。整個實(shí)例都是基于spring boot來構(gòu)建的,基于自動化配置搞個starter,配置下yaml就能使用miniDubbo,無需配置xml。
服務(wù)發(fā)現(xiàn)基于zookeeper實(shí)現(xiàn),所有發(fā)現(xiàn)的實(shí)例,暫存在注冊目錄中。
Zookeeper是一個樹狀的結(jié)構(gòu),有三種節(jié)點(diǎn)類型,永久節(jié)點(diǎn)(需要自己刪除)、臨時節(jié)點(diǎn)(有會話的概念,當(dāng)zookeeper監(jiān)聽到會話斷開時,會刪除臨時節(jié)點(diǎn))、有序節(jié)點(diǎn)。在dubbo中主要用的是臨時節(jié)點(diǎn),用于實(shí)時感知服務(wù)實(shí)例上下線。
路徑格式:
/miniDubbo/${group}/${className}/providers/${ip:port}
集群失敗策略:主要有failover/failsafe/failback。
也就是當(dāng)consumer調(diào)用某個provider失敗時,采取的策略,failover表示失敗轉(zhuǎn)移,重試下一臺實(shí)例;failsafe表示不重試,不拋出異常,返回null。
模擬dubbo協(xié)議,這里通過netty實(shí)現(xiàn)網(wǎng)絡(luò)通信,底層使用JSON序列化,通過長度 + body的協(xié)議防止粘包。
Netty是java領(lǐng)域的一套高性能的nio異步網(wǎng)絡(luò)編程框架,底層也是基于java nio,已經(jīng)封裝好了網(wǎng)絡(luò)編程,實(shí)現(xiàn)網(wǎng)絡(luò)通信非常方便。在dubbo中用于provider-consumer建立長連接,避免重復(fù)建立連接。
整體思路是:consumer調(diào)用接口方法時,對接口進(jìn)行JDK代理,代理邏輯是通過服務(wù)發(fā)現(xiàn)找到一臺可用實(shí)例,將請求的類名、方法名、入?yún)⑦M(jìn)行打包、編碼,通過網(wǎng)絡(luò)請求發(fā)送給provider。在服務(wù)端拿到請求的類名、方法名、入?yún)⒑?,通過反射調(diào)用服務(wù)實(shí)例,并將結(jié)果的返回給consumer。
三、開發(fā)工具
需要安裝如下工具。
- IDEA+JDK8+MAVEN
- zookeeper
四、一步步實(shí)現(xiàn)
這里給出部分代碼,具體參考github:miniDubbo
4.1 客戶端消費(fèi)實(shí)現(xiàn)
4.1.1 掃描Reference注解,注入dubbo依賴
定義一個Reference注解,這個注解必須是運(yùn)行時注解,且至少可用在屬性上。
package com.jessin.practice.dubbo.processor;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* 消費(fèi)端注解
* @Author: jessin
* @Date: 19-11-25 下午9:48
*/
@Target({ ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
// 必須寫為runtime,否則獲取不到
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Reference {
String group() default "";
String version() default "1.0.0";
String timeout() default "3000";
String failStrategy() default "failover";
String retryCount() default "3";
}
在spring bean實(shí)例化后初始化前,對應(yīng)的是InstantiationAwareBeanPostProcessor#postProcessProperties,這個是bean實(shí)例化后第一個擴(kuò)展點(diǎn),且在aware方法之前,進(jìn)行依賴反射注入,并設(shè)置動態(tài)代理,代理實(shí)現(xiàn)該屬性對應(yīng)的接口。
package com.jessin.practice.dubbo.processor;
import com.jessin.practice.dubbo.config.InterfaceConfig;
import com.jessin.practice.dubbo.config.MiniDubboProperties;
import java.lang.reflect.Field;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.factory.config.InstantiationAwareBeanPostProcessorAdapter;
/**
* @Author: jessin
* @Date: 19-11-25 下午9:49
*/
@Slf4j
public class ReferenceBeanPostProcessor extends InstantiationAwareBeanPostProcessorAdapter {
private MiniDubboProperties miniDubboProperties;
public ReferenceBeanPostProcessor(MiniDubboProperties miniDubboProperties) {
this.miniDubboProperties = miniDubboProperties;
}
@Override
public PropertyValues postProcessProperties(PropertyValues pvs, Object bean, String beanName)
throws BeansException {
Field[] fields = bean.getClass().getDeclaredFields();
for (Field field : fields) {
boolean isExist = field.isAnnotationPresent(Reference.class);
if (isExist) {
try {
if (!field.getType().isInterface()) {
throw new RuntimeException("dubbo依賴不是接口:" + field.getType().getName());
}
Reference ref = field.getAnnotation(Reference.class);
log.info("嘗試注入接口代理,bean的{}屬性為:{}", beanName, ref);
// 私有屬性,必須設(shè)置為可訪問
field.setAccessible(true);
field.set(bean, JdkDynamicProxy.createProxy(field.getType(), transform(ref), miniDubboProperties));
} catch (IllegalAccessException e) {
log.error("設(shè)置jdk實(shí)例出錯啦:{}", field);
}
}
}
return pvs;
}
private InterfaceConfig transform(Reference ref) {
InterfaceConfig interfaceConfig = new InterfaceConfig();
interfaceConfig.setGroup(ref.group());
interfaceConfig.setVersion(ref.version());
interfaceConfig.setTimeout(ref.timeout());
interfaceConfig.setFailStrategy(ref.failStrategy());
interfaceConfig.setRetryCount(ref.retryCount());
return interfaceConfig;
}
}
動態(tài)代理的邏輯非常簡單,基于注冊目錄找到可用的DubboInvoker,并發(fā)起網(wǎng)絡(luò)請求,傳輸請求id、類名、入?yún)?、超時時間、版本等信息。
package com.jessin.practice.dubbo.processor;
import com.alibaba.fastjson.JSONObject;
import com.jessin.practice.dubbo.config.InterfaceConfig;
import com.jessin.practice.dubbo.config.MiniDubboProperties;
import com.jessin.practice.dubbo.invoker.FailfastClusterInvoker;
import com.jessin.practice.dubbo.invoker.RpcInvocation;
import com.jessin.practice.dubbo.registry.RegistryDirectory;
import com.jessin.practice.dubbo.transport.Response;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import lombok.extern.slf4j.Slf4j;
/**
* @Author: jessin
* @Date: 19-11-25 下午9:54
*/
@Slf4j
public class JdkDynamicProxy<T> implements InvocationHandler {
private String clazzName;
private Object proxy;
private RegistryDirectory registryDirectory;
private FailfastClusterInvoker failfastClusterInvoker;
private InterfaceConfig interfaceConfig;
private MiniDubboProperties miniDubboProperties;
public JdkDynamicProxy(Class<T> clazz, InterfaceConfig interfaceConfig, MiniDubboProperties miniDubboProperties) {
proxy = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{clazz}, this);
this.clazzName = clazz.getName();
registryDirectory = new RegistryDirectory(clazzName, miniDubboProperties.getRegistry(), interfaceConfig);
failfastClusterInvoker = new FailfastClusterInvoker(registryDirectory);
this.interfaceConfig = interfaceConfig;
}
public static <T> Object createProxy(Class<T> clazz, InterfaceConfig interfaceConfig, MiniDubboProperties miniDubboProperties) {
return new JdkDynamicProxy(clazz, interfaceConfig, miniDubboProperties).proxy;
}
/**
* TODO 特殊方法不攔截。。
* @param proxy
* @param method
* @param args
* @return
* @throws Throwable
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if ("toString".equals(method.getName())) {
return this.toString();
}
// todo group,attachment
RpcInvocation rpcInvocation = new RpcInvocation();
rpcInvocation.setInterfaceName(clazzName);
rpcInvocation.setParameterType(method.getParameterTypes());
rpcInvocation.setArgs(args);
rpcInvocation.setMethodName(method.getName());
rpcInvocation.setVersion(interfaceConfig.getVersion());
Class returnType = method.getReturnType();
log.info("jdk調(diào)用:{},代理類為:{},返回類型:{}", rpcInvocation, proxy, returnType);
// todo 通過接口配置決定用哪種策略
Response response = (Response)failfastClusterInvoker.invoke(rpcInvocation);
if (returnType == Void.class) {
return null;
}
JSONObject jsonObject = (JSONObject)response.getResult();
return jsonObject.toJavaObject(returnType);
}
}
4.1.2 使用注冊目錄刷新服務(wù)實(shí)例
主要是創(chuàng)建zk連接,訂閱該類的zk路徑,處理zk子節(jié)點(diǎn)事件,維護(hù)子節(jié)點(diǎn)上線和下線,并構(gòu)造DubboInvoker,建立網(wǎng)絡(luò)連接。該service接口的所有可用實(shí)例,都會存在一個注冊目錄中,并動態(tài)刷新。
package com.jessin.practice.dubbo.registry;
import com.google.common.collect.Lists;
import com.jessin.practice.dubbo.config.InterfaceConfig;
import com.jessin.practice.dubbo.invoker.DubboInvoker;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
/**
* * 注冊項(xiàng)目錄,所有的dubboInvoker都保存到這里,實(shí)現(xiàn)zk listener,監(jiān)聽zk路徑變化,當(dāng)zk變化時,構(gòu)造DubboInvoker。
* * 每個service應(yīng)該有一個,同一個zk集群同一臺機(jī)器應(yīng)該只建立一個共享連接
* @Author: jessin
* @Date: 19-11-25 下午10:34
*/
@Slf4j
public class RegistryDirectory {
private Map<String, DubboInvoker> ipAndPort2InvokerMap = new ConcurrentHashMap<>();
private CuratorZookeeperClient curatorZookeeperClient;
private InterfaceConfig interfaceConfig;
private String providerPath;
/**
* TODO 創(chuàng)建zk連接,監(jiān)聽zk路徑創(chuàng)建DubboInvoker
* @param path
*/
public RegistryDirectory(String path, String registry, InterfaceConfig interfaceConfig) {
this.interfaceConfig = interfaceConfig;
// 監(jiān)聽group/接口名/providers,有變化時通知RegistryDirectory,也就是調(diào)用notify(url, listener, urls);
this.providerPath = "/miniDubbo/" + interfaceConfig.getGroup() + "/" + path + "/providers";
// TODO 創(chuàng)建zk連接,并創(chuàng)建RegistryDirectory,第一次時創(chuàng)建DubboInvoker
// 判斷zk/redis。
curatorZookeeperClient = RegistryManager.getCuratorZookeeperClient(registry);
// todo 抽取subscribe方法
List<String> children = curatorZookeeperClient.addTargetChildListener(providerPath, new ChildListener() {
@Override
public void childChanged(String path, List<String> children) {
log.info("監(jiān)聽到zk路徑變化:{},children:{}", path, children);
processChildren(children);
}
});
processChildren(children);
}
public void processChildren(List<String> children) {
try {
if (children == null || children.size() == 0) {
// 可能是遠(yuǎn)程抖動,或者zookeeper出問題了,造成所有服務(wù)實(shí)例下線,這里還需要通過心跳檢測。
log.info("監(jiān)聽到zk路徑無子節(jié)點(diǎn):{}", providerPath);
children = Lists.newArrayList();
}
List<String> added = children.stream()
.filter(one -> !ipAndPort2InvokerMap.containsKey(one))
.collect(Collectors.toList());
List<String> finalChildren = children;
List<String> deleted = ipAndPort2InvokerMap.keySet().stream()
.filter(one -> !finalChildren.contains(one))
.collect(Collectors.toList());
log.info("監(jiān)聽到zk路徑:{},子節(jié)點(diǎn)變化,新增zk節(jié)點(diǎn):{},刪除zk節(jié)點(diǎn):{}", providerPath, added, deleted);
added.forEach(ipAndPort -> {
if (!ipAndPort2InvokerMap.containsKey(ipAndPort)) {
ipAndPort2InvokerMap.put(ipAndPort, new DubboInvoker(ipAndPort, interfaceConfig));
}
});
deleted.forEach(ipAndPort -> {
ipAndPort2InvokerMap.get(ipAndPort).destroy();
ipAndPort2InvokerMap.remove(ipAndPort);
});
} catch (Exception e) {
log.error("處理zk事件出錯", e);
}
}
public List<DubboInvoker> getInvokerList() {
return new ArrayList<>(ipAndPort2InvokerMap.values());
}
}
4.1.3 Netty client網(wǎng)絡(luò)通信傳輸,編解碼器的實(shí)現(xiàn)
這里基于netty client 4.x api,也就是NioEventLoopGroup/NioSocketChannel等,快速開發(fā)網(wǎng)絡(luò)連接功能,異步發(fā)送網(wǎng)絡(luò)請求,處理各種網(wǎng)絡(luò)請求:
package com.jessin.practice.dubbo.netty;
import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* @Author: jessin
* @Date: 19-11-25 下午10:17
*/
@Slf4j
public class NettyClient {
private String ipAndPort;
/**
* worker可以共用
*/
private EventLoopGroup worker = new NioEventLoopGroup();
private Channel socketChannel;
private NettyClientHandler clientHandler = new NettyClientHandler();
public NettyClient(String ipAndPort) {
this.ipAndPort = ipAndPort;
connect();
}
public void connect() {
log.info("建立netty連接:{}", ipAndPort);
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(worker).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<SocketChannel>() {
// TODO 注意pipeline的順序
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
// clientHandler可以提為全局變量
.addLast(new BaseEncoder())
.addLast(new BaseDecoder())
.addLast(clientHandler);
}
});
// 客戶端是connect
String[] values = ipAndPort.split(":");
// TODO 考慮超時重連,心跳斷開重連,底層轉(zhuǎn)換為pipeline.connect()
ChannelFuture channelFuture = bootstrap.connect(values[0], Integer.parseInt(values[1])).sync();
if (channelFuture.isSuccess()) {
log.info("與服務(wù)端建立連接成功:{}", ipAndPort);
} else {
log.error("與服務(wù)端建立連接失敗:{}", ipAndPort);
}
// 建立連接時保存下來,可能有需要連接多個客戶端
this.socketChannel = channelFuture.channel();
} catch (Exception e) {
log.error("與服務(wù)端建立連接失敗:{}", ipAndPort, e);
throw new RuntimeException("與服務(wù)端建立連接失敗: " + ipAndPort, e);
}
}
/**
* 對外發(fā)送數(shù)據(jù)接口
* @param msg
*/
public void send(Object msg) {
// TODO 必須用writeAndFlush才會真正發(fā)出去,同時必須序列化為字符串,才能被編碼繼續(xù)往下走
String jsonStr = JSON.toJSONString(msg);
socketChannel.writeAndFlush(jsonStr);
}
public void close() {
log.info("關(guān)閉訪問服務(wù)的連接:{}", ipAndPort);
socketChannel.close();
if (socketChannel != null && socketChannel.isActive()) {
try {
socketChannel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
worker.shutdownGracefully();
}
}
編碼器通過長度字段 + body實(shí)現(xiàn),具體如下:
package com.jessin.practice.dubbo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* todo protocol buf
* @Author: jessin
* @Date: 19-11-25 下午10:20
*/
public class BaseEncoder extends MessageToByteEncoder<String> {
private Logger logger = LoggerFactory.getLogger(getClass());
/**
* Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled
* by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
* @param msg the message to encode
* @param out the {@link ByteBuf} into which the encoded message will be written
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
logger.info("對消息:{}進(jìn)行編碼", msg);
byte[] wordBytes = msg.getBytes("utf-8");
out.writeInt(wordBytes.length);
out.writeBytes(wordBytes);
}
}
對應(yīng)解碼器:
package com.jessin.practice.dubbo.netty;
/**
* 發(fā)送的字節(jié)數(shù) + 字節(jié)信息
* 按照這個格式進(jìn)行裝包和拆包,主要是會產(chǎn)生粘包的現(xiàn)象
* 也就是發(fā)送方按照abc, def, 發(fā)送
* 接收方收到的可能是a,bc,de,f,面向的是字節(jié)流,需要拆包解出命令
*
* @author jessin
* @create 19-11-25 下午10:20
**/
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class BaseDecoder extends ByteToMessageDecoder {
private Logger logger = LoggerFactory.getLogger(getClass());
private int totalBytes = -1;
/**
* Decode the from one {@link ByteBuf} to an other. This method will be called till either the input
* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input
* {@link ByteBuf}.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
* @param in the {@link ByteBuf} from which to read data
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int readableBytes = in.readableBytes();
if (totalBytes == -1 && readableBytes >= 4) {
totalBytes = in.readInt();
}
int secondReadableBytes = in.readableBytes();
if (totalBytes > 0 && secondReadableBytes >= totalBytes) {
byte[] realData = new byte[totalBytes];
in.readBytes(realData);
out.add(new String(realData, "utf-8"));
totalBytes = -1;
}
logger.info("讀取字節(jié)個數(shù):{},剩余字節(jié)個數(shù):{}", readableBytes, secondReadableBytes);
}
}
4.1.4 發(fā)起請求和接收響應(yīng)
如上,發(fā)送接口通過JSON進(jìn)行序列化。這里通過CountDownLatch實(shí)現(xiàn)客戶端等待功能,當(dāng)然也可以通過wait/notify實(shí)現(xiàn)。發(fā)送時設(shè)置自增id到請求體中,并保存到consumer本地map中,然后會調(diào)用CountDownLatch.await,并設(shè)置等待時間,進(jìn)行阻塞等待結(jié)果。在收到netty 響應(yīng)時,也就是netty channelRead有數(shù)據(jù)時,喚醒等待的調(diào)用線程。在channelRead中根據(jù)響應(yīng)的請求id,由netty線程設(shè)置上響應(yīng)結(jié)果,調(diào)用countDownLatch.countDown進(jìn)行喚醒。
package com.jessin.practice.dubbo.netty;
import com.alibaba.fastjson.JSON;
import com.jessin.practice.dubbo.transport.DefaultFuture;
import com.jessin.practice.dubbo.transport.Response;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;
/**
* @Author: jessin
* @Date: 19-11-25 下午10:25
*/
@Slf4j
public class NettyClientHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端和服務(wù)端建立連接成功");
//ctx.writeAndFlush("{\"id\":1,\"rpcInvocation\":{\"interfaceName\":\"com.jessin.practice.dubbo.service.UserService\",\"methodName\":\"getUser\",\"parameterType\":[],\"version\":\"1.0.0\"}}");
}
/**
* 對響應(yīng)進(jìn)行處理
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("收到服務(wù)端消息:" + msg);
// result字段為JSONObject
Response response = JSON.parseObject((String)msg, Response.class);
// todo 返回list/map,帶復(fù)雜key/value的是否有問題
DefaultFuture.setResponse(response);
}
/**
* TODO 發(fā)送消息前進(jìn)行攔截,oubound,只有channel.writeAndFlush()才能起作用,active里直接用ctx不起作用
* @param ctx
* @param msg
* @param promise
* @throws Exception
*/
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("收到寫消息:" + msg);
// 必須的!保證繼續(xù)往下走,發(fā)送出去,其實(shí)就是ctx.write(msg, promise)
super.write(ctx, msg, promise);
// TODO promise模式,依賴地獄,以及Future回調(diào)模式(guava、java8)
promise.addListener(future -> {
// 監(jiān)聽發(fā)送回調(diào),看是否發(fā)送成功
if (future.isSuccess()) {
log.info("發(fā)送寫消息:{},成功", msg);
} else {
log.info("發(fā)送消息失敗:{}", msg);
}
});
}
}
4.2 服務(wù)實(shí)例曝光到注冊中心
4.2.1 在BeanDefinitionRegistryPostProcessor中處理@Service注解
服務(wù)端使用@Service注解:
package com.jessin.practice.dubbo.processor;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @Author: jessin
* @Date: 19-11-27 下午9:11
*/
@Target({ ElementType.TYPE, ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD, ElementType.ANNOTATION_TYPE})
// 必須寫為runtime,否則獲取不到
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Service {
String group() default "";
String version() default "1.0.0";
String timeout() default "3000";
}
需要掃描標(biāo)記有@Service的類,然后注冊到spring 容器,涉及到注冊BeanDefinition的,一般用到的擴(kuò)展點(diǎn)是BeanDefinitionRegistryPostProcessor#postProcessBeanDefinitionRegistry。這里為什么要自定義一個@Service注解,而不復(fù)用spring原有的@Service注解?是因?yàn)槲覀冞€要基于標(biāo)記有@Service的這個bean做一些處理操作,包括對該bean曝光到本地,開啟網(wǎng)絡(luò)監(jiān)聽,注冊到zk,這些動作需要封裝到“另外一個bean”來完成,我們需要注入“另外一個bean”這個Definition,這里也就是ServiceBean,下文講解。
package com.jessin.practice.dubbo.processor;
import com.jessin.practice.dubbo.config.InterfaceConfig;
import com.jessin.practice.dubbo.config.MiniDubboProperties;
import java.util.Set;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.annotation.AnnotationBeanNameGenerator;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.util.ClassUtils;
/**
* @Author: jessin
* @Date: 19-11-27 下午9:24
*/
public class ServiceBeanPostProcessor implements BeanDefinitionRegistryPostProcessor, BeanClassLoaderAware {
private AnnotationBeanNameGenerator annotationBeanNameGenerator = new AnnotationBeanNameGenerator();
private MiniDubboProperties miniDubboProperties;
private ClassLoader classLoader;
public ServiceBeanPostProcessor(MiniDubboProperties miniDubboProperties) {
this.miniDubboProperties = miniDubboProperties;
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
ClassPathBeanDefinitionScanner classPathBeanDefinitionScanner = new ClassPathBeanDefinitionScanner(registry, false);
classPathBeanDefinitionScanner.addIncludeFilter(new AnnotationTypeFilter(Service.class));
// 將該包下的@Service注解全部掃描為bean
Set<BeanDefinition> beanDefinitionSet
= classPathBeanDefinitionScanner.findCandidateComponents(miniDubboProperties.getPackagePath());
for (BeanDefinition beanDefinition : beanDefinitionSet) {
String beanName = annotationBeanNameGenerator.generateBeanName(beanDefinition, registry);
registry.registerBeanDefinition(beanName, beanDefinition);
BeanDefinition wrapper = new RootBeanDefinition(ServiceBean.class);
wrapper.getPropertyValues().addPropertyValue("ref", new RuntimeBeanReference(beanName));
wrapper.getPropertyValues().addPropertyValue("miniDubboProperties", miniDubboProperties);
Class beanClass = ClassUtils.resolveClassName(beanDefinition.getBeanClassName(), classLoader);
Service service = AnnotationUtils.findAnnotation(beanClass, Service.class);
wrapper.getPropertyValues().addPropertyValue("interfaceConfig", transform(service));
registry.registerBeanDefinition("dubbo_" + beanName, wrapper);
}
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
private InterfaceConfig transform(Service ref) {
InterfaceConfig interfaceConfig = new InterfaceConfig();
interfaceConfig.setGroup(ref.group());
interfaceConfig.setVersion(ref.version());
interfaceConfig.setTimeout(ref.timeout());
return interfaceConfig;
}
@Override
public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}
}
4.2.2 解析注解參數(shù),注冊服務(wù)到zookeeper
如上,我們在ServiceBean中,依賴標(biāo)記有@Service的bean,同時將其曝光到本地,開啟netty端口監(jiān)聽,注冊到zk,具體如下,見afterPropertiesSet方法:
package com.jessin.practice.dubbo.processor;
import com.jessin.practice.dubbo.config.InterfaceConfig;
import com.jessin.practice.dubbo.config.MiniDubboProperties;
import com.jessin.practice.dubbo.exporter.DubboExporter;
import com.jessin.practice.dubbo.netty.NettyManager;
import com.jessin.practice.dubbo.netty.NettyServer;
import com.jessin.practice.dubbo.registry.CuratorZookeeperClient;
import com.jessin.practice.dubbo.registry.RegistryManager;
import com.jessin.practice.dubbo.utils.NetUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
/**
* @Author: jessin
* @Date: 19-11-27 下午10:31
*/
@Slf4j
public class ServiceBean implements InitializingBean, DisposableBean {
private NettyServer nettyServer;
/**
* zk地址
*/
private CuratorZookeeperClient curatorZookeeperClient;
private Object ref;
private MiniDubboProperties miniDubboProperties;
private InterfaceConfig interfaceConfig;
public MiniDubboProperties getMiniDubboProperties() {
return miniDubboProperties;
}
public void setMiniDubboProperties(MiniDubboProperties miniDubboProperties) {
this.miniDubboProperties = miniDubboProperties;
}
public InterfaceConfig getInterfaceConfig() {
return interfaceConfig;
}
public void setInterfaceConfig(InterfaceConfig interfaceConfig) {
this.interfaceConfig = interfaceConfig;
}
public Object getRef() {
return ref;
}
public void setRef(Object ref) {
this.ref = ref;
}
@Override
public void afterPropertiesSet() throws Exception {
Class[] interfaces = ref.getClass().getInterfaces();
if (interfaces.length <= 0) {
throw new IllegalStateException(ref.getClass().getName() + "未實(shí)現(xiàn)接口");
}
// todo 目前只能實(shí)現(xiàn)一個接口
String clazzName = interfaces[0].getName();
log.info("曝光key:{},ref:{}", clazzName, ref);
// 暴露服務(wù) todo 版本
DubboExporter.exportService(clazzName, ref);
// 先開啟,再注冊
// 判斷協(xié)議
if ("dubbo".equals(miniDubboProperties.getProtocol())) {
// 開啟netty server
nettyServer = NettyManager.getNettyServer(miniDubboProperties.getPort());
} else {
throw new RuntimeException("unknown communicate protocol:" + miniDubboProperties.getProtocol());
}
// 判斷什么類型的注冊中心
curatorZookeeperClient = RegistryManager.getCuratorZookeeperClient(miniDubboProperties.getRegistry());
String providerPath = "/miniDubbo/" + interfaceConfig.getGroup() + "/" + clazzName + "/providers" + "/" + NetUtils.getServerIp() + ":" + miniDubboProperties.getPort();
// 注冊zk,提煉register方法
curatorZookeeperClient.create(providerPath, true);
}
@Override
public void destroy() throws Exception {
curatorZookeeperClient.doClose();
nettyServer.close();
}
}
4.2.3 開啟netty server,接收請求
在接受到consumer請求后,解碼,然后根據(jù)類名、方法名,找到對應(yīng)的曝光服務(wù),進(jìn)行反射調(diào)用,將方法返回結(jié)果和請求id原樣寫出去,返回給客戶端。具體如下:
package com.jessin.practice.dubbo.netty;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jessin.practice.dubbo.exception.DubboException;
import com.jessin.practice.dubbo.exporter.DubboExporter;
import com.jessin.practice.dubbo.invoker.RpcInvocation;
import com.jessin.practice.dubbo.transport.Request;
import com.jessin.practice.dubbo.transport.Response;
import com.jessin.practice.dubbo.utils.ArgDeserializerUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.lang.reflect.Method;
import lombok.extern.slf4j.Slf4j;
/**
* @Author: jessin
* @Date: 19-11-27 下午7:40
*/
@Slf4j
public class NettyServer {
// todo 底層會啟動2*cpu個數(shù)的NioEventLoop,輪詢注冊到對應(yīng)的NioEventLoop運(yùn)行
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup worker = new NioEventLoopGroup();
// 全局復(fù)用,是否需要考慮可共享?
private ServerHandler serverHandler = new ServerHandler();
private int port;
public NettyServer(int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
// boss線程池用于accept到達(dá)的請求,worker線程池對到達(dá)的請求進(jìn)行讀寫
// child表示對到達(dá)的請求起作用,沒有child表示對ServerSocketChannel起作用
// 服務(wù)端用NioServerSocketChannel
ChannelFuture channelFuture;
this.port = port;
try {
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.SO_KEEPALIVE, true)
// todo option最終設(shè)置到j(luò)dk sever channel上
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 對到達(dá)的請求進(jìn)行讀寫操作,責(zé)任鏈模式,ChannelPipeline
ch.pipeline()
.addLast(new BaseDecoder())
.addLast(new BaseEncoder())
.addLast(serverHandler);
}
});
// todo bind時,會新建NioServerSocketChannel,并注冊到NioEventLoop.selector中
// todo 底層轉(zhuǎn)換為pipeline.bind(),最終調(diào)用serverSocketChannel.bind(socketAddress, 128);
channelFuture = serverBootstrap.bind(port);
// 下面會阻塞
channelFuture.sync();
log.info("服務(wù)器綁定端口:{}成功", port);
// TODO 關(guān)閉時調(diào)用,客戶端也得關(guān)閉
// channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException("bind port error:" + port, e);
}
}
/**
* 允許注冊到多個客戶端SocketChannel中
*/
@ChannelHandler.Sharable
class ServerHandler extends ChannelDuplexHandler {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客戶端:{}和服務(wù)端建立連接成功", ctx.channel().remoteAddress());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 這里是String類型,已經(jīng)解碼了
Request request = JSONObject.parseObject((String)msg, Request.class);
log.info("收到請求消息:{}", msg);
RpcInvocation rpcInvocation = request.getRpcInvocation();
Object obj = DubboExporter.getService(rpcInvocation);
if (obj == null) {
throw new IllegalStateException("服務(wù)端未曝光接口:" + request);
}
Response response = new Response();
response.setId(request.getId());
try {
log.info("開始反射調(diào)用:{}", msg);
// todo 這里最好用線程池實(shí)現(xiàn),不然會阻塞NioEventLoop
Method method = obj.getClass().getMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterType());
Object[] originArgs = ArgDeserializerUtils.parseArgs(method, rpcInvocation.getParameterType(), rpcInvocation.getArgs());
log.info("入?yún)ⅲ簕}", originArgs);
Object responseData = method.invoke(obj, originArgs);
response.setResult(responseData);
log.info("調(diào)用實(shí)例:{},方法:{},返回結(jié)果:{}",
obj, method, response);
} catch (Exception e) {
log.error("調(diào)用dubbo異常:{}", rpcInvocation, e);
response.setException(true);
response.setResult(new DubboException("服務(wù)端調(diào)用接口異常", e));
}
// TODO 通過原來客戶端通道發(fā)送出去
// 這里會走編碼嗎?,必須寫成String,或者改下Encoder
ctx.writeAndFlush(JSON.toJSONString(response));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("收到客戶端退出的消息");
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("IO出錯了...", cause);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.info("發(fā)起寫請求:{}", msg);
// TODO 寫的一般都有這個?
super.write(ctx, msg, promise);
}
}
/**
* dubbo shutdown hook
*/
public void close() {
// TODO 這里是否有問題??
log.info("關(guān)閉端口:{}", port);
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
4.3 自動化配置實(shí)現(xiàn)
這里根據(jù)yaml中配置的開關(guān),自動開啟consumer/provider配置,需要注意的是,由于注入的@Service處理器是容器級別的后處理器,需要使用靜態(tài)方法進(jìn)行注入,避免過早初始化自動配置類,而且不能@autowirte 自動化屬性,需要通過方法獲取Environment,因?yàn)檫@個時候自動化屬性類還沒有對應(yīng)的后處理器對其進(jìn)行處理,拿到的屬性是空的,需要自己做bind。
最后在Resource目錄下,META-INF/spring.factories下,配置自動啟動即可:

package com.jessin.practice.dubbo.config;
import com.jessin.practice.dubbo.processor.ReferenceBeanPostProcessor;
import com.jessin.practice.dubbo.processor.Service;
import com.jessin.practice.dubbo.processor.ServiceBeanPostProcessor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* todo 自己調(diào)用自己,以及url支持,資源銷毀
* @Author: jessin
* @Date: 2021/10/26 9:27 PM
*/
@Configuration
@ConditionalOnClass(Service.class)
@EnableConfigurationProperties(MiniDubboProperties.class)
@Slf4j
public class MiniDubboAutoConfiguration {
static {
FastJsonConfig.config();
}
/**
* 由于BeanFactoryPostProcessor是提前獲取的,這個時候CommonAnnotationBeanPostProcessor還沒注冊到beanFactory中,
* serviceBeanPostProcessor注入的屬性為空
*/
// @Autowired
// private MiniDubboProperties miniDubboProperties;
// public MiniDubboAutoConfiguration() {
// log.info("init MiniDubboAutoConfiguration");
// }
/**
* 由于這個Bean是BeanFactoryPostProcessor,提前獲取時,
* ConfigurationProperties的ConfigurationPropertiesBindingPostProcessor還沒注入到beanFactory中,
* 所以MiniDubboProperties屬性沒法注入
* 這里通過environment構(gòu)造
* @param environment
* @return
*/
@Bean
@Conditional(ServerCondition.class)
@ConditionalOnMissingBean
public static ServiceBeanPostProcessor serviceBeanPostProcessor(Environment environment) {
MiniDubboProperties miniDubboProperties = getMiniDubboProperties(environment);
return new ServiceBeanPostProcessor(miniDubboProperties);
}
static class ServerCondition extends AnyNestedCondition {
ServerCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnProperty(prefix = MiniDubboProperties.PREFIX, name = "type", havingValue = "both")
static class HostProperty {
}
@ConditionalOnProperty(prefix = MiniDubboProperties.PREFIX, name = "type", havingValue = "server")
static class JndiNameProperty {
}
}
/**
* 使用靜態(tài)方法,防止造成自動化配置實(shí)例提前初始化,沒有進(jìn)行增強(qiáng)
* @param environment
* @return
*/
@Bean
@Conditional(ClientCondition.class)
@ConditionalOnMissingBean
public static ReferenceBeanPostProcessor referenceBeanPostProcessor(Environment environment) {
MiniDubboProperties miniDubboProperties = getMiniDubboProperties(environment);
return new ReferenceBeanPostProcessor(miniDubboProperties);
}
static class ClientCondition extends AnyNestedCondition {
ClientCondition() {
super(ConfigurationPhase.REGISTER_BEAN);
}
@ConditionalOnProperty(prefix = MiniDubboProperties.PREFIX, name = "type", havingValue = "both")
static class HostProperty {
}
@ConditionalOnProperty(prefix = MiniDubboProperties.PREFIX, name = "type", havingValue = "client")
static class JndiNameProperty {
}
}
private static MiniDubboProperties getMiniDubboProperties(Environment environment) {
MiniDubboProperties miniDubboProperties = Binder.get(environment) //首先要綁定配置器
//再將屬性綁定到對象上
.bind(MiniDubboProperties.PREFIX, Bindable.of(MiniDubboProperties.class) ).get(); //再獲取實(shí)例
return miniDubboProperties;
}
}
五、測試
5.1 編寫api
這里設(shè)置了5個接口,覆蓋了較多的場景,具體見:github
package com.jessin.practice.dubbo.service;
import com.jessin.practice.dubbo.model.User;
import com.jessin.practice.dubbo.model.UserParam;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author: jessin
* @Date: 19-11-24 上午11:23
*/
public interface UserService {
User getUser();
User getUser(UserParam userParam);
User getUser(int age);
User getUser(UserParam userParam,
int type,
String note,
int[] ages,
List<Integer> list);
User getUser(List<UserParam> list,
ArrayList<UserParam> list2,
Map<String, UserParam> userParamMap,
HashMap<String, UserParam> userParamMap2);
User getUser(List list, Map userParamMap);
}
5.2 實(shí)現(xiàn)api,標(biāo)記@Service
? ? 在miniDubboDemo中實(shí)現(xiàn)api,該工程基于spring boot實(shí)現(xiàn)。見miniDubboDemo
package com.example.demo.service;
import com.jessin.practice.dubbo.model.User;
import com.jessin.practice.dubbo.model.UserParam;
import com.jessin.practice.dubbo.processor.Service;
import com.jessin.practice.dubbo.service.UserService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Author: jessin
* @Date: 19-11-27 下午11:33
*/
@Service(group="myGroup")
public class UserServiceImpl implements UserService {
@Override
public User getUser() {
User user = new User();
user.setId(1L);
user.setName("小明");
user.setAge(10);
user.setNote("測試無參miniDubbo");
return user;
}
@Override
public User getUser(UserParam userParam) {
User user = new User();
user.setId(2L);
user.setName("小紅");
user.setAge(11);
user.setNote("測試帶參數(shù)UserParam");
return user;
}
@Override
public User getUser(int age) {
User user = new User();
user.setId(2L);
user.setName("小紅");
user.setAge(11);
user.setNote("測試帶參數(shù)int");
return user;
}
@Override
public User getUser(UserParam userParam, int type, String note, int[] ages, List<Integer> list) {
User user = new User();
user.setId(2L);
user.setName("小紅");
user.setAge(11);
user.setNote("測試帶多個參數(shù)");
return user;
}
@Override
public User getUser(List<UserParam> list, ArrayList<UserParam> arrayList, Map<String, UserParam> map,
HashMap<String, UserParam> hashMap) {
User user = new User();
user.setId(2L);
user.setName("小紅");
user.setAge(11);
user.setNote("測試帶list/map泛型參數(shù)");
return user;
}
@Override
public User getUser(List list, Map userParamMap) {
User user = new User();
user.setId(2L);
user.setName("小紅");
user.setAge(11);
user.setNote("測試帶list/map無參");
return user;
}
}
5.3 編寫controller接口,使用@Reference注入api依賴
也是在demo工程中,這里demo工程既作為provider,也作為consumer。
package com.example.demo.controller;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.jessin.practice.dubbo.model.User;
import com.jessin.practice.dubbo.model.UserParam;
import com.jessin.practice.dubbo.processor.Reference;
import com.jessin.practice.dubbo.service.UserService;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* TODO 泛化調(diào)用
*
* @Author: jessin
* @Date: 19-8-3 下午4:25
*/
@RestController
public class HelloController {
private Logger log = LoggerFactory.getLogger(getClass());
@Reference(group = "myGroup")
private UserService userService;
/**
* http://localhost:9999/practice/helloParam
*
* @return
*/
@RequestMapping("/hello")
public User hello(UserParam userParam, @RequestParam int type) {
log.info("test miniDubbo param");
if (type == 1) {
return userService.getUser();
} else if (type == 2) {
return userService.getUser(userParam);
} else if (type == 3) {
return userService.getUser(3);
} else if (type == 4) {
return userService.getUser(userParam, 3, "hello", new int[]{1, 2, 3}, Lists.newArrayList(3, 5, 7));
} else if (type == 5) {
UserParam user2 = new UserParam();
user2.setId(1132);
user2.setName("hello");
UserParam user3 = new UserParam();
user3.setId(353);
user3.setName("world");
Map<String, UserParam> map = Maps.newHashMap();
map.put("key1", user2);
HashMap<String, UserParam> hashMap = Maps.newHashMap();
hashMap.put("key2", user3);
return userService.getUser(Lists.newArrayList(user2, user3),
Lists.newArrayList(userParam), map, hashMap);
} else {
UserParam user2 = new UserParam();
user2.setId(1132);
user2.setName("hello");
UserParam user3 = new UserParam();
user3.setId(353);
user3.setName("world");
Map<String, UserParam> map = Maps.newHashMap();
map.put("key1", user2);
HashMap<String, UserParam> hashMap = Maps.newHashMap();
hashMap.put("key2", user3);
return userService.getUser(Lists.newArrayList(userParam, user2, user3), map);
}
}
}
5.4 啟動步驟
5.4.1 啟動zk
博主的zk部署到云主機(jī)上,需保證2181端口已經(jīng)啟動:
ubuntu@VM-0-14-ubuntu:~$ lsof -i:2181 COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME java 3904 ubuntu 27u IPv6 545176752 0t0 TCP VM-0-14-ubuntu:35306->1.15.130.58:2181 (ESTABLISHED) java 26361 ubuntu 45u IPv6 452967153 0t0 TCP *:2181 (LISTEN) java 26361 ubuntu 49u IPv6 545176753 0t0 TCP VM-0-14-ubuntu:2181->1.15.130.58:35306 (ESTABLISHED)
5.4.2 部署api和miniDubbo到本地倉庫
部署api到本地倉庫:api,部署miniDubbo到本地倉庫:miniDubbo
mvn install
5.4.3 在demo yaml配置miniDubbo。
配置zk地址和掃描的服務(wù)端路徑(只在角色為服務(wù)端生效)
mini-dubbo: package-path: "@Service注解所在包" registry: "your_zk_ip:2181"
5.4.4 編譯demo為可執(zhí)行jar
mvn package
5.4.5 開啟兩個服務(wù)端
這兩個服務(wù)端都部署在云服務(wù)器上,參數(shù)說明:
server.port=9997,指定http端口
public.ip=1.15.130.58,可以指定當(dāng)前注冊到zk上的公網(wǎng)地址,如果用的是云主機(jī)的話,可以在云端部署兩臺服務(wù)實(shí)例,以便在本地能進(jìn)行服務(wù)發(fā)現(xiàn)和調(diào)用。
mini-dubbo.type=server,表示角色是服務(wù)端
catalina.base=xxx,設(shè)置的是日志目錄,改為你的即可。
啟動第一個provider實(shí)例: 默認(rèn)9999 http端口,20880 miniDubbo端口
java -Dmini-dubbo.type=server -Dcatalina.base=/home/ubuntu/program/java/miniDubboDemo/tomcat1 -Dpublic.ip=1.15.130.58 -jar target/demo-0.0.1-SNAPSHOT.jar

啟動第二個provider實(shí)例: 默認(rèn)9998 http端口,20881 miniDubbo端口
java -Dmini-dubbo.type=server -Dcatalina.base=/home/ubuntu/program/java/miniDubboDemo/tomcat2 -Dserver.port=9998 -Dmini-dubbo.port=20881 -Dpublic.ip=1.15.130.58 -jar target/demo-0.0.1-SNAPSHOT.jar

5.4.6 本地開啟客戶端進(jìn)行調(diào)用
在本地啟動客戶端,并與兩個provider建立連接:
java -Dmini-dubbo.type=client -Dcatalina.base=./tomcat3 -Dserver.port=9997 -jar target/demo-0.0.1-SNAPSHOT.jar```

通過客戶端restapi觸發(fā)miniDubbo調(diào)用,可以掛掉一臺服務(wù),兩臺服務(wù),看看效果,并通過type調(diào)用不同的api方法
http://localhost:9997/practice/hello?type=2
得到結(jié)果:

客戶端發(fā)起請求,可以看到調(diào)用的是20881這個服務(wù):

服務(wù)端返回結(jié)果:

上面將20881銷毀后,再次調(diào)用客戶端發(fā)起請求,自動轉(zhuǎn)移到20880這個服務(wù):

將20880也銷毀后,再次調(diào)用客戶端發(fā)起請求,直接拋出no provider異常:

六、總結(jié)
6.1 已完成功能列表
- 基于zk服務(wù)注冊和服務(wù)發(fā)現(xiàn)
- 基于netty + json序列化網(wǎng)絡(luò)通信
- zk連接復(fù)用、netty client連接復(fù)用
- 與spring boot集成
6.2 TODO LIST
通用需求:
- spring容器銷毀時,關(guān)閉占用的資源,如netty client/netty server
- 支持protobuf序列化
- 支持http協(xié)議通信
- netty超時重連,心跳斷開重連,銷毀dubbo invoker
- zk抖動導(dǎo)致所有服務(wù)實(shí)例下線優(yōu)化
- 其他注冊中心支持,如consul/redis
- Attach/tag實(shí)現(xiàn)
- 支持一個類多個版本實(shí)現(xiàn)
- zk重新連接時,需要重新注冊關(guān)注的事件,恢復(fù)現(xiàn)場,對于服務(wù)端是重新注冊;對于客戶端是重新訂閱
provider功能:
- 服務(wù)端優(yōu)雅啟動和優(yōu)雅下線,防止流量過早進(jìn)來,造成超時。在spring容器啟動成功后,再注冊到zk上。在spring容器銷毀時,先從zk取消注冊,最后再關(guān)閉客戶端連接。
- 服務(wù)端請求用線程池實(shí)現(xiàn),避免阻塞NioEventLoop
- 服務(wù)端支持曝光實(shí)現(xiàn)多個接口的一個類
consumer功能:
- netty client通過計(jì)數(shù)引用銷毀資源
- 服務(wù)負(fù)載均衡算法:隨機(jī)/輪詢/加權(quán)
- 集群失敗策略:failover/failsafe/failback
以上就是java實(shí)現(xiàn)簡易版簡易版dubbo的詳細(xì)內(nèi)容,更多關(guān)于dubbo的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Elasticsearch QueryBuilder簡單查詢實(shí)現(xiàn)解析
這篇文章主要介紹了Elasticsearch QueryBuilder簡單查詢實(shí)現(xiàn)解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-08-08
java網(wǎng)絡(luò)編程學(xué)習(xí)java聊天程序代碼分享
java聊天程序代碼分享,大家參考使用吧2013-12-12
使用google.kaptcha來生成圖片驗(yàn)證碼的實(shí)現(xiàn)方法
這篇文章主要介紹了使用google.kaptcha來生成圖片驗(yàn)證碼的實(shí)現(xiàn)方法,非常不錯具有一定的參考借鑒價值,需要的朋友可以參考下2018-09-09
淺析Android系統(tǒng)中HTTPS通信的實(shí)現(xiàn)
這篇文章主要介紹了淺析Android系統(tǒng)中HTTPS通信的實(shí)現(xiàn),實(shí)現(xiàn)握手的源碼為Java語言編寫,需要的朋友可以參考下2015-07-07

