Java RPC框架過(guò)濾器機(jī)制原理解析
過(guò)濾器
字面義上理解的過(guò)濾器類似下圖,從一堆物品中篩選出符合條件的留下,不符合的丟棄。
GOF 職責(zé)鏈
GOF中有一種設(shè)計(jì)模式叫職責(zé)鏈,或者叫責(zé)任鏈,常規(guī)的UML圖如下:
正統(tǒng)的職責(zé)鏈?zhǔn)菍⒁粋€(gè)請(qǐng)求發(fā)給第一個(gè)接收者,接收者判斷是否屬于自己能處理的,如果能處理則執(zhí)行操作并中止請(qǐng)求下發(fā),流程到此為止。如果不能處理則將請(qǐng)求下發(fā)給下一個(gè)接收者一直到最后一個(gè)接收者。
變體職責(zé)鏈
上面提到正統(tǒng)的職責(zé)鏈有一個(gè)特點(diǎn):當(dāng)找到符合條件的執(zhí)行者后流程中止并不會(huì)將請(qǐng)求繼續(xù)下發(fā)給后續(xù)的執(zhí)行者。這類場(chǎng)景比較適合比如審核操作,一個(gè)報(bào)銷單由主管,經(jīng)理,CTO一級(jí)一級(jí)的審批不能越權(quán)。
解耦合/細(xì)分
在實(shí)際項(xiàng)目中,往往會(huì)遇到非常復(fù)雜的業(yè)務(wù)場(chǎng)景,有可能是需要執(zhí)行的方法特別多,也有可能是因?yàn)樾枰獔?zhí)行的方法有可能事先不知道,需要在運(yùn)行時(shí)才能判斷。如果將這些邏輯全部寫在一個(gè)類或者一個(gè)方法中就會(huì)出現(xiàn)這樣的問(wèn)題:
耦合度高,一個(gè)方法或者一個(gè)類需要關(guān)聯(lián)所有業(yè)務(wù)方法以及相關(guān)類不易擴(kuò)展,需要執(zhí)行的業(yè)務(wù)經(jīng)常發(fā)生變化,如果每次變化都去修改統(tǒng)一的方法或者類,不符合開閉原則,維護(hù)成本非常高
所以就有了下圖,一堆需要執(zhí)行的方法發(fā)給第一個(gè)接收者,接收者判斷哪些方法是自己可以執(zhí)行的,有執(zhí)行的就執(zhí)行,然后無(wú)論是否有可執(zhí)行的方法在處理完成后都將請(qǐng)求繼續(xù)下發(fā)給后面的接收者。每個(gè)接收者完成自己負(fù)責(zé)的內(nèi)容,多個(gè)接收者完成了復(fù)雜任務(wù)的分解。
RPC 過(guò)濾器
RPC過(guò)濾器與Spring MVC中的Filter作用基本相同,其中很大一個(gè)作用就是動(dòng)態(tài)的給客戶端或者是服務(wù)端增加切面功能,比如:
權(quán)限控制加密解密訪問(wèn)日志限流控制并發(fā)控制......
下圖是我在RPC項(xiàng)目中實(shí)現(xiàn)過(guò)濾器機(jī)制的UML示意圖
RpcFilter
定義一個(gè)過(guò)濾器接口,只包含一個(gè)invoke方法。
public interface RpcFilter<T> { <T> T invoke(RpcInvoker invoker, RpcInvocation invocation); }
RpcInvoker
這是RPC客戶端以及服務(wù)端執(zhí)行服務(wù)端方法或者是將客戶端請(qǐng)求發(fā)送給服務(wù)端時(shí)需要調(diào)用的方法接口。
這個(gè)角色在Netty中也可以叫Handle,這個(gè)接口與上面的RpcFilter有點(diǎn)類似,只是在RPC框架中體現(xiàn)的角色不同而已,具體看UML圖可知道兩者關(guān)系。
public interface RpcInvoker { Object invoke(RpcInvocation invocation); }
RpcServerInvoker
服務(wù)端的一個(gè)執(zhí)行者實(shí)現(xiàn),包含兩個(gè)核心功能:
構(gòu)建職責(zé)鏈this.filterMap,是通過(guò)注解獲取到的一組過(guò)濾器,此處不詳細(xì)講因?yàn)榕c本文關(guān)系不大RpcInvoker的invoker方法實(shí)際調(diào)用的是RpcFilter的方法,并將自身實(shí)例傳遞給RpcFilter,目的是構(gòu)建職責(zé)鏈的關(guān)聯(lián)關(guān)系
public RpcInvoker buildInvokerChain(final RpcInvoker invoker) { RpcInvoker last = invoker; List<RpcFilter> filters = Lists.newArrayList(this.filterMap.values()); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final RpcFilter filter = filters.get(i); final RpcInvoker next = last; last = new RpcInvoker() { @Override public Object invoke(RpcInvocation invocation) { return filter.invoke(next, invocation); } }; } } return last; }
此處并沒有考慮職責(zé)鏈的排序,可以通過(guò)過(guò)濾器的注解上增加排序數(shù)字來(lái)解決。目前我寫的過(guò)濾器注解中并沒有實(shí)現(xiàn)排序功能,可以增加一個(gè)order的屬性,然后在需要指定順序的過(guò)濾器上增加對(duì)應(yīng)屬性值來(lái)支持。
@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface ActiveFilter { String[] group() default {}; String[] value() default {}; }
發(fā)請(qǐng)求給職責(zé)鏈
服務(wù)端在讀取到客戶端的請(qǐng)求后,首先通過(guò)構(gòu)建職責(zé)鏈得到RpcInvoker,然后調(diào)用RpcInvoker的invoke方法將請(qǐng)求下發(fā)。
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcMessage message) { this.executor.execute(new Runnable() { @Override public void run() { RpcInvoker rpcInvoker=.... RpcResponse response=(RpcResponse) rpcInvoker.invoke...... }
過(guò)濾器案例
一般HTTP請(qǐng)求在不同網(wǎng)絡(luò)角色中處理請(qǐng)求的能力會(huì)呈現(xiàn)為一個(gè)漏斗型,越上層職責(zé)越輕,越往下層職責(zé)越重,所對(duì)應(yīng)的就是越上層處理請(qǐng)求量越大,越下層處理請(qǐng)求量越小。比如負(fù)載均衡器只負(fù)責(zé)請(qǐng)求轉(zhuǎn)發(fā)而不負(fù)責(zé)具體的任務(wù)執(zhí)行,而后端的Service服務(wù)器會(huì)執(zhí)行大量的IO操作或者是消耗cpu的計(jì)算任務(wù)等,所以這兩者在處理請(qǐng)求的量上往往是數(shù)量級(jí)的。
當(dāng)出現(xiàn)大量請(qǐng)求時(shí),為了有效的保護(hù)后端服務(wù)的穩(wěn)定性(盡量不出現(xiàn)宕機(jī)),除了橫向擴(kuò)展服務(wù)器外還可以通過(guò)一些軟件手段緩解后端服務(wù)的壓力,這就是通常說(shuō)的限流,本文因?yàn)樾枰?jiǎn)單實(shí)現(xiàn)一個(gè)限制的過(guò)濾器,所以直接引用現(xiàn)成的限流算法:令牌桶。
下面是客戶端請(qǐng)求限流的一個(gè)簡(jiǎn)單實(shí)現(xiàn),客戶端在給服務(wù)端發(fā)起請(qǐng)求之前需要獲取令牌,如果獲取到則發(fā)送請(qǐng)求,如果獲取不到一直等待。當(dāng)然為了防止死鎖,可以調(diào)用帶超時(shí)時(shí)間的獲取令牌方法。
@ActiveFilter(group = {ConstantConfig.CONSUMER}) public class AccessLimitFilter implements RpcFilter { private final static Logger logger = LoggerFactory.getLogger(AccessLimitFilter.class); @Override public Object invoke(RpcInvoker invoker, RpcInvocation invocation) { logger.info("before acquire"); AccessLimitManager.acquire(); Object rpcResponse=invoker.invoke(invocation); logger.info("after acquire"); return rpcResponse; } static class AccessLimitManager{ private final static RateLimiter rateLimiter=RateLimiter.create(2); public static void acquire(){ rateLimiter.acquire(); } } }
實(shí)現(xiàn)的比較粗糙,桶的大小是寫死的,應(yīng)該實(shí)現(xiàn)為可配置型,后續(xù)抽空完善下。
本文源碼
https://github.com/jiangmin168168/jim-framework
文中代碼是依賴上述項(xiàng)目的,如果有不明白的可下載源碼
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Mybatis使用JSONObject接收數(shù)據(jù)庫(kù)查詢的方法
這篇文章主要介紹了Mybatis使用JSONObject接收數(shù)據(jù)庫(kù)查詢,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-12-12java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片
線程調(diào)度器和時(shí)間分片是多線程編程和操作系統(tǒng)設(shè)計(jì)中的核心概念,本文主要介紹了java實(shí)現(xiàn)線程調(diào)度器和時(shí)間分片,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-10-10記一次springboot服務(wù)凌晨無(wú)故宕機(jī)問(wèn)題的解決
這篇文章主要介紹了記一次springboot服務(wù)凌晨無(wú)故宕機(jī)問(wèn)題的解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-09-09Java基于IDEA實(shí)現(xiàn)qq郵件發(fā)送小程序
這篇文章主要介紹了Java基于IDEA實(shí)現(xiàn)qq郵件發(fā)送小程序功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09Spring MVC登錄注冊(cè)以及轉(zhuǎn)換json數(shù)據(jù)
本文主要介紹了Spring MVC登錄注冊(cè)以及轉(zhuǎn)換json數(shù)據(jù)的相關(guān)知識(shí)。具有很好的參考價(jià)值。下面跟著小編一起來(lái)看下吧2017-04-04Java編程實(shí)現(xiàn)提取文章中關(guān)鍵字的方法
這篇文章主要介紹了Java編程實(shí)現(xiàn)提取文章中關(guān)鍵字的方法,較為詳細(xì)的分析了Java提取文章關(guān)鍵字的原理與具體實(shí)現(xiàn)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-11-11