Sentinel源碼解析入口類和SlotChain構(gòu)建過程詳解
1. 測試用例
我們以sentinel-demo中的sentinel-annotation-spring-aop為例,分析sentinel的源碼。核心代碼如下:
DemoController:
@RestController public class DemoController { @Autowired private TestService service; @GetMapping("/foo") public String apiFoo(@RequestParam(required = false) Long t) throws Exception { if (t == null) { t = System.currentTimeMillis(); } service.test(); return service.hello(t); } @GetMapping("/baz/{name}") public String apiBaz(@PathVariable("name") String name) { return service.helloAnother(name); } }
TestServiceImpl:
@Service public class TestServiceImpl implements TestService { @Override @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class}) public void test() { System.out.println("Test"); } @Override @SentinelResource(value = "hello", fallback = "helloFallback") public String hello(long s) { if (s < 0) { throw new IllegalArgumentException("invalid arg"); } return String.format("Hello at %d", s); } @Override @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback", exceptionsToIgnore = {IllegalStateException.class}) public String helloAnother(String name) { if (name == null || "bad".equals(name)) { throw new IllegalArgumentException("oops"); } if ("foo".equals(name)) { throw new IllegalStateException("oops"); } return "Hello, " + name; } public String helloFallback(long s, Throwable ex) { // Do some log here. ex.printStackTrace(); return "Oops, error occurred at " + s; } public String defaultFallback() { System.out.println("Go to default fallback"); return "default_fallback"; } }
啟動類DemoApplication
:
@SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
在啟動這個工程上增加參數(shù):
-Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj
如圖:
打開http://localhost:8081/#/dashboard 地址,可以看到應(yīng)用已經(jīng)注冊到sentinel管理后臺:
1.1 流控測試
訪問 http://localhost:19966/foo?t=188 這個鏈接,多訪問幾次,在實時監(jiān)控
頁面可以看到:
然后,我們先簡單配置一個流控規(guī)則,如下:
其中,資源名為:
然后我們在快速刷新http://localhost:19966/foo?t=188 接口,會出現(xiàn)限流的情況,返回如下:
Oops, error occurred at 188
實時監(jiān)控為:
2. 注解版源碼分析
使用注解@SentinelResource
核心原理就是 利用AOP切入到方法中,我們直接看SentinelResourceAspect
類,這是一個切面類:
@Aspect // 切面 public class SentinelResourceAspect extends AbstractSentinelAspectSupport { // 指定切入點為@SentinelResource 注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } // 環(huán)繞通知 @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 要織入的,增強的功能 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); // 調(diào)用目標方法 return pjp.proceed(); } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } }
核心方法SphU.entry()
:
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException { // 注意 第4個參數(shù)值為 1 return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException { // count 參數(shù):表示當前請求可以增加多少個計數(shù) // 注意 第5個參數(shù)為false return entryWithType(name, resourceType, entryType, count, false, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { // 將信息封裝為一個資源對象 StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType); // 返回一個資源操作對象entry // prioritized 為true 表示當前訪問必須等待"根據(jù)其優(yōu)先級計算出的時間"后才通過 // prioritized 為 false 則當前請求無需等待 return entryWithPriority(resource, count, prioritized, args); }
我們重點看一下CtSph#entryWithPriority
:
/** * @param resourceWrapper * @param count 默認為1 * @param prioritized 默認為false * @param args * @return * @throws BlockException */ private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 從ThreadLocal中獲取Context // 一個請求會占用一個線程,一個線程會綁定一個context Context context = ContextUtil.getContext(); // 若context是 NullContext類型,則表示當前系統(tǒng)中的context數(shù)量已經(jīng)超過閾值 // 即訪問的請求的數(shù)量已經(jīng)超出了閾值,此時直接返回一個無需做規(guī)則檢測的資源操作對象 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } // 當前線程中沒有綁定context,則創(chuàng)建一個context并將其放入到Threadlocal if (context == null) { // todo Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. // 若全局開關(guān)是關(guān)閉的,直接返回一個無需做規(guī)則檢測的資源操作對象 if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // todo 查找SlotChain ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ // 若沒有知道chain,則意味著chain數(shù)量超出了閾值 if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 創(chuàng)建一個資源操作對象 Entry e = new CtEntry(resourceWrapper, chain, context); try { // todo 對資源進行操作 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
2.1 默認Context創(chuàng)建
當前線程沒有綁定Context,則創(chuàng)建一個context并將其放入到Threadlocal
。核心方法為 InternalContextUtil.internalEnter
:
public static Context enter(String name, String origin) { if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) { throw new ContextNameDefineException( "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!"); } return trueEnter(name, origin); } protected static Context trueEnter(String name, String origin) { // 嘗試從ThreadLocal中獲取context Context context = contextHolder.get(); // 若Threadlocal中沒有,則嘗試從緩存map中獲取 if (context == null) { // 緩存map的key為context名稱,value為EntranceNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; // DCL 雙重檢測鎖,防止并發(fā)創(chuàng)建對象 DefaultNode node = localCacheNameMap.get(name); if (node == null) { // 若緩存map的size 大于 context數(shù)量的最大閾值,則直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 創(chuàng)建一個EntranceNode node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. // 將新建的node添加到Root Constants.ROOT.addChild(node); // 將新建的node寫入到緩存map // 為了防止"迭代穩(wěn)定性問題"-iterate stable 對于共享集合的寫操作 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 將context的name與entranceNode 封裝成context context = new Context(node, name); // 初始化context的來源 context.setOrigin(origin); // 將context寫入到ThreadLocal contextHolder.set(context); } return context; }
注意:因為 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();
是 HashMap結(jié)構(gòu),所以存在并發(fā)安全問題,采用 代碼中方式進行添加操作。
2.2 查找并創(chuàng)建SlotChain
構(gòu)建調(diào)用鏈lookProcessChain(resourceWrapper)
:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { // 緩存map的key為資源 value為其相關(guān)的SlotChain ProcessorSlotChain chain = chainMap.get(resourceWrapper); // DCL // 若緩存中沒有相關(guān)的SlotChain 則創(chuàng)建一個并放入到緩存中 if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. // 緩存map的size 大于 chain數(shù)量的最大閾值,則直接返回null,不在創(chuàng)建新的chain if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // todo 創(chuàng)建新的chain chain = SlotChainProvider.newSlotChain(); // 防止 迭代穩(wěn)定性問題 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
我們直接看核心方法SlotChainProvider.newSlotChain();
:
public static ProcessorSlotChain newSlotChain() { // 若builder不為null,則直接使用builder構(gòu)建一個chain // 否則先創(chuàng)建一個builder if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. // 通過SPI方式創(chuàng)建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); // 若通過SPI未能創(chuàng)建builder,則創(chuàng)建一個默認的DefaultSlotChainBuilder if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } // todo 構(gòu)建一個chain return slotChainBuilder.build(); } private SlotChainProvider() {} }
2.2.1 創(chuàng)建slotChainBuilder
// 通過SPI方式創(chuàng)建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
通過SPI方法創(chuàng)建slotChainBuilder
,去項目中META-INF.service
中獲取:
2.2.2 slotChainBuilder.build()
@Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // 通過SPI方式構(gòu)建Slot List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } }
通過SPI機制,去項目中META-INF.service
中獲取,在sentinel-core
項目中:
還有一個ParamFlowSlot
,在sentinel-extension/sentinel-parameter-flow-control
下:
我們點擊 NodeSelectorSlot
, 類上面是有 優(yōu)先級order,數(shù)字越小,優(yōu)先級越高。
@Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
優(yōu)先級常量為:
public static final int ORDER_NODE_SELECTOR_SLOT = -10000; public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000; public static final int ORDER_LOG_SLOT = -8000; public static final int ORDER_STATISTIC_SLOT = -7000; public static final int ORDER_AUTHORITY_SLOT = -6000; public static final int ORDER_SYSTEM_SLOT = -5000; public static final int ORDER_FLOW_SLOT = -2000; public static final int ORDER_DEGRADE_SLOT = -1000;
我們看代碼中的變量sortedSlotList
,已經(jīng)按照優(yōu)先級排序好了:
我們看一下構(gòu)建的ProcessorSlotChain
,類似一個單鏈表結(jié)構(gòu),如下:
我們看一下相關(guān)的類結(jié)構(gòu):DefaultProcessorSlotChain
:
// 這是一個單向鏈表,默認包含一個接節(jié)點,且有兩個指針first 和end同時指向這個節(jié)點 public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }
AbstractLinkedProcessorSlot
:
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { // 聲明一個同類型的變量,則可以指向下一個Slot節(jié)點 private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { if (next != null) { next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }
構(gòu)建完成后的SlotChain
和工作原理圖一樣:
接下來,對資源進行操作的核心方法為chain.entry(context, resourceWrapper, null, count, prioritized, args);
,這個我們下篇文章分析。
參考文章
以上就是Sentinel源碼解析入口類和SlotChain構(gòu)建過程詳解的詳細內(nèi)容,更多關(guān)于Sentinel入口類SlotChain構(gòu)建的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Maven配置文件修改及導(dǎo)入第三方j(luò)ar包的實現(xiàn)
本文主要介紹了Maven配置文件修改及導(dǎo)入第三方j(luò)ar包的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08spring cloud gateway 全局過濾器的實現(xiàn)
全局過濾器作用于所有的路由,不需要單獨配置,我們可以用它來實現(xiàn)很多統(tǒng)一化處理的業(yè)務(wù)需求,這篇文章主要介紹了spring cloud gateway 全局過濾器的實現(xiàn),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-03-03IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟
Alibaba Cloud Toolkit是阿里云針對IDE平臺為開發(fā)者提供的一款插件,本文主要介紹了IDEA安裝部署Alibaba Cloud Toolkit的實現(xiàn)步驟,具有一定的參考價值,感興趣的可以了解一下2023-08-08idea手動執(zhí)行maven命令的三種實現(xiàn)方式
這篇文章主要介紹了idea手動執(zhí)行maven命令的三種實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08使用SpringBoot項目導(dǎo)入openfeign版本的問題
這篇文章主要介紹了使用SpringBoot項目導(dǎo)入openfeign版本的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03