java客戶端Etcd官方倉庫jetcd中KeepAlive接口實現(xiàn)
前言
Etcd的Java客戶端有很多開源實現(xiàn),Jetcd是Etcd官方倉庫的Java客戶端,整體api接口設(shè)計實現(xiàn)和官方go客戶端類似,簡潔易用。其中,租期續(xù)約的接口提供了兩個分別是keepAliveOnce和keepAlive。功能如其名,keepAliveOnce是單次續(xù)約的接口,如果要保持租約,需要手動觸發(fā)這個接口,所以這個接口基本不用。而keepAlive是自動續(xù)約?;畹慕涌?。大多數(shù)場景下,使用keepAlive即可,但是針對不同的場景,我們還需要考慮幾個問題,如租約ttl的設(shè)置,以及keepAlive異常時的處理。
Jetcd項目地址:https://github.com/etcd-io/jetcd
背景問題
我們有一個基于mysql的binlog訂閱數(shù)據(jù)變更的應(yīng)用,線上有非常重要的應(yīng)用基于這個服務(wù),因為存在單點故障,后面使用了jetcd的lock + keepAlive的機(jī)制實現(xiàn)了主備服務(wù)秒級切換的功能,具體參見《高可用架構(gòu)etcd選主故障主備秒級切換實現(xiàn)》,系統(tǒng)上線運行后發(fā)現(xiàn),binlog的服務(wù)經(jīng)常切換發(fā)生主備切換,而實際情況是,binlog的服務(wù)非常穩(wěn)定,在沒有上線主備切換服務(wù)前,從來沒有發(fā)生過線上binlog服務(wù)宕掉的情況。最后查明問題出在了租約TTL的設(shè)置上面。這里先拋出問題和定位,下面先看下Jetcd的keepAlive具體實現(xiàn),然后在分析為什么導(dǎo)致這個問題。
KeepAlive實現(xiàn)
先看下keepAlive的用法
private long acquireActiveLease() throws InterruptedException, ExecutionException { long leaseId = leaseClient.grant(leaseTTL).get().getID(); logger.debug("LeaderSelector get leaseId:[{}] and ttl:[{}]", leaseId, leaseTTL); this.leaseCloser = leaseClient.keepAlive(leaseId, new StreamObserver() { @Override public void onNext(LeaseKeepAliveResponse value) { logger.debug("LeaderSelector lease keeps alive for [{}]s:", value.getTTL()); } @Override public void onError(Throwable t) { logger.debug("LeaderSelector lease renewal Exception!", t.fillInStackTrace()); cancelTask(); } @Override public void onCompleted() { logger.info("LeaderSelector lease renewal completed! start canceling task."); cancelTask(); } }); return leaseId; }
租約實現(xiàn)都在LeaseImpl類里,通過EtcdClient拿到LeaseImpl實例后,首先通過grant方法設(shè)置ttl拿到租約的id,然后將租約作為入?yún)⒄{(diào)用keepAlive方法,第二個入?yún)⑹且粋€觀察者對象,內(nèi)置了三個接口,分別是onNext:確定下一次租約續(xù)約時間后觸發(fā),onError:續(xù)約異常時觸發(fā),onCompleted:租約過期后觸發(fā)。
keepAlive方法代碼:
public synchronized CloseableClient keepAlive(long leaseId, StreamObserverobserver) { if (this.closed) { throw newClosedLeaseClientException(); } KeepAlive keepAlive = this.keepAlives.computeIfAbsent(leaseId, (key) -> new KeepAlive(leaseId)); keepAlive.addObserver(observer); if (!this.hasKeepAliveServiceStarted) { this.hasKeepAliveServiceStarted = true; this.start(); } return new CloseableClient() { @Override public void close() { keepAlive.removeObserver(observer); } }; }
LeaseImpl內(nèi)部維護(hù)了一個以LeaseId為key,KeepAlive對象為value的map,KeepAlive的類中維護(hù)了一個StreamObserver集合,到期時間deadLine,下次續(xù)約時間nextKeepAlive和續(xù)約leaseId。第一次調(diào)用keepAlive方法時會觸發(fā)start,啟動續(xù)約的線程(sendKeepAliveExecutor())和檢查是否過期的線程(deadLineExecutor())。
private void sendKeepAliveExecutor() { this.keepAliveResponseObserver = Observers.observer( response -> processKeepAliveResponse(response), error -> processOnError() ); this.keepAliveRequestObserver = this.leaseStub.leaseKeepAlive(this.keepAliveResponseObserver); this.keepAliveFuture = scheduledExecutorService.scheduleAtFixedRate( () -> { // send keep alive req to the leases whose next keep alive is before now. this.keepAlives.entrySet().stream() .filter(entry -> entry.getValue().getNextKeepAlive() < System.currentTimeMillis()) .map(Entry::getKey) .map(leaseId -> LeaseKeepAliveRequest.newBuilder().setID(leaseId).build()) .forEach(keepAliveRequestObserver::onNext); }, 0, 500, TimeUnit.MILLISECONDS ); }
sendKeepAliveExecutor方法是整個keepAlive功能實現(xiàn)的核心,這個方法在LeaseImpl實例里只會被觸發(fā)一次,開啟了一個時間間隔為500毫秒的的定時任務(wù)調(diào)度。每次從keepAlives中篩選出nextkeepAlive時間小于當(dāng)前時間的KeepAlive對象,觸發(fā)續(xù)約。
nextkeepAlive初始化值就是創(chuàng)建KeepAlive實例時的當(dāng)前時間,然后在續(xù)約的響應(yīng)流觀察者實例中,執(zhí)行了processKeepAliveResponse方法,在這個里面維護(hù)了KeepAlive對象的nextkeepAlive。
private synchronized void processKeepAliveResponse(io.etcd.jetcd.api.LeaseKeepAliveResponse leaseKeepAliveResponse) { if (this.closed) { return; } final long leaseID = leaseKeepAliveResponse.getID(); final long ttl = leaseKeepAliveResponse.getTTL(); final KeepAlive ka = this.keepAlives.get(leaseID); if (ka == null) { // return if the corresponding keep alive has closed. return; } if (ttl > 0) { long nextKeepAlive = System.currentTimeMillis() + ttl * 1000 / 3; ka.setNextKeepAlive(nextKeepAlive); ka.setDeadLine(System.currentTimeMillis() + ttl * 1000); ka.onNext(leaseKeepAliveResponse); } else { // lease expired; close all keep alive this.removeKeepAlive(leaseID); ka.onError( newEtcdException( ErrorCode.NOT_FOUND, "etcdserver: requested lease not found" ) ); } }
可以看到,在首次續(xù)約后的響應(yīng)處理中,nextKeepAlive被設(shè)置為當(dāng)前時間加上ttl的1/3時間后,也就是說如果我們設(shè)置一個key的過期時間為6s,那么在使用keepAlive時續(xù)期的時間間隔為,每2s執(zhí)行續(xù)約一次。如果ttl小于零,說明key已經(jīng)過期被刪除了,就直接觸發(fā)onError,傳遞了一個requested lease not found的異常對象。
文末小結(jié)
1、回到最上面binlog的主備頻繁切換的問題,由于我們將ttl的時間設(shè)置的過小5s。只要client和etcd 服務(wù)失聯(lián)5s以上,期間可能由于各種原因?qū)е耴eepAlive沒有正常續(xù)約上,就會觸發(fā)主備切換。這個時候binlog服務(wù)本身是沒有任何問題的,卻要因為失去領(lǐng)導(dǎo)權(quán),而選擇自殺。后面將ttl調(diào)整到了20s后,主備切換就沒有那么敏感了。
2、還有一個場景,在將etcd作為服務(wù)注冊中心時,也會使用到keepAlive,即使設(shè)置了ttl為20s,還是有可能沒有續(xù)約上,導(dǎo)致注冊的服務(wù)過期被刪了,這個時候,我們的服務(wù)進(jìn)程還是健康的。這個場景下,需要在onError、onCompleted事件中重新獲取租約以及添加新的KeepAlive。
以上就是java客戶端Etcd官方倉庫jetcd中KeepAlive接口實現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于jetcd中的KeepAlive實現(xiàn)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java的StringBuilder在高性能場景下的正確用法
StringBuilder?對字符串的操作是直接改變字符串對象本身,而不是生成新的對象,所以新能開銷小.與StringBuffer相比StringBuilder的性能略高,StringBuilder則沒有保證線程的安全,從而性能略高于StringBuffer,需要的朋友可以參考下2023-05-05Spring解決循環(huán)依賴問題的三種方法小結(jié)
在 Spring 中,循環(huán)依賴問題指的是兩個或多個 bean 之間相互依賴形成的閉環(huán),具體而言,當(dāng) bean A 依賴于 bean B,同時 bean B 也依賴于 bean A,就形成了循環(huán)依賴,本文就給大家介紹了Spring解決循環(huán)依賴問題的三種方法,需要的朋友可以參考下2023-09-09Spring中@Configuration注解和@Component注解的區(qū)別詳解
這篇文章主要介紹了Spring中@Configuration注解和@Component注解的區(qū)別詳解,@Configuration 和 @Component 到底有何區(qū)別呢?我先通過如下一個案例,在不分析源碼的情況下,小伙伴們先來直觀感受一下這兩個之間的區(qū)別,需要的朋友可以參考下2023-09-09Java基礎(chǔ)之finally語句與return語句詳解
這篇文章主要介紹了Java基礎(chǔ)之finally語句與return語句詳解,文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java基礎(chǔ)的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04java實現(xiàn)解析二進(jìn)制文件的方法(字符串、圖片)
本篇文章主要介紹了java實現(xiàn)解析二進(jìn)制文件的方法(字符串、圖片),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-02-02java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比
這篇文章主要介紹了java 實現(xiàn)字節(jié)流和字節(jié)緩沖流讀寫文件時間對比,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01