rocketmq-streams的ILeaseService使用示例詳解
序
本文主要研究一下rocketmq-streams的ILeaseService
ILeaseService
/** * 通過(guò)db實(shí)現(xiàn)租約和鎖,可以更輕量級(jí),減少其他中間件的依賴(lài) 使用主備場(chǎng)景,只有一個(gè)實(shí)例運(yùn)行,當(dāng)當(dāng)前實(shí)例掛掉,在一定時(shí)間內(nèi),會(huì)被其他實(shí)例接手 也可以用于全局鎖 */ public interface ILeaseService { /** * 默認(rèn)鎖定時(shí)間 */ static final int DEFALUT_LOCK_TIME = 60 * 5; /** * 檢查某用戶當(dāng)前時(shí)間是否具有租約。這個(gè)方法是純內(nèi)存操作,無(wú)性能開(kāi)銷(xiāo) * * @return true,租約有效;false,租約無(wú)效 */ boolean hasLease(String name); /** * 申請(qǐng)租約,會(huì)啟動(dòng)一個(gè)線程,不停申請(qǐng)租約,直到申請(qǐng)成功。 申請(qǐng)成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對(duì)方租約失效,后才允許新的租戶獲取租約 * * @param name 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 */ void startLeaseTask(String name); /** * 申請(qǐng)租約,會(huì)啟動(dòng)一個(gè)線程,不停申請(qǐng)租約,直到申請(qǐng)成功。 申請(qǐng)成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對(duì)方租約失效,后才允許新的租戶獲取租約 * * @param name 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 * @param callback 當(dāng)?shù)谝猾@取租約時(shí),回調(diào)此函數(shù) */ void startLeaseTask(final String name, ILeaseGetCallback callback); /** * 申請(qǐng)租約,會(huì)啟動(dòng)一個(gè)線程,不停申請(qǐng)租約,直到申請(qǐng)成功。 申請(qǐng)成功后,每 租期/2 續(xù)約。 如果目前被其他租戶獲取租約,只有在對(duì)方租約失效,后才允許新的租戶獲取租約 * * @param name 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 * @param leaseTermSecond 租期,在租期內(nèi)可以做業(yè)務(wù)處理,單位是秒 * @param callback 當(dāng)?shù)谝猾@取租約時(shí),回調(diào)此函數(shù) */ void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback); /** * 申請(qǐng)鎖,無(wú)論成功與否,立刻返回。如果不釋放,最大鎖定時(shí)間是5分鐘 * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @return 是否枷鎖成功 */ boolean lock(String name, String lockerName); /** * 申請(qǐng)鎖,無(wú)論成功與否,立刻返回。默認(rèn)鎖定時(shí)間是5分鐘 * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @param lockTimeSecond 如果不釋放,鎖定的最大時(shí)間,單位是秒 * @return 是否枷鎖成功 * @return */ boolean lock(String name, String lockerName, int lockTimeSecond); /** * 申請(qǐng)鎖,如果沒(méi)有則等待,等待時(shí)間可以指定,如果是-1 則無(wú)限等待。如果不釋放,最大鎖定時(shí)間是5分鐘 * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @param waitTime 沒(méi)獲取鎖時(shí),最大等待多長(zhǎng)時(shí)間,如果是-1 則無(wú)限等待 * @return 是否枷鎖成功 */ boolean tryLocker(String name, String lockerName, long waitTime); /** * 申請(qǐng)鎖,如果沒(méi)有則等待,等待時(shí)間可以指定,如果是-1 則無(wú)限等待。如果不釋放,最大鎖定時(shí)間是lockTimeSecond * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @param waitTime 沒(méi)獲取鎖時(shí),最大等待多長(zhǎng)時(shí)間,如果是-1 則無(wú)限等待 * @param lockTimeSecond 如果不釋放,鎖定的最大時(shí)間,單位是秒 * @return 是否枷鎖成功 */ boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond); /** * 釋放鎖 * * @param name * @param lockerName * @return */ boolean unlock(String name, String lockerName); /** * 對(duì)于已經(jīng)獲取鎖的,可以通過(guò)這個(gè)方法,一直持有鎖。 和租約的區(qū)別是,當(dāng)釋放鎖后,無(wú)其他實(shí)例搶占。無(wú)法實(shí)現(xiàn)主備模式 * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @param lockTimeSecond 租期,這個(gè)方法會(huì)自動(dòng)續(xù)約,如果不主動(dòng)釋放,會(huì)一直持有鎖 * @return 是否成功獲取鎖 */ boolean holdLock(String name, String lockerName, int lockTimeSecond); /** * 是否持有鎖,不會(huì)申請(qǐng)鎖。如果以前申請(qǐng)過(guò),且未過(guò)期,返回true,否則返回false * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @return */ boolean hasHoldLock(String name, String lockerName); List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix); }
- ILeaseService接口定義了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix方法
BasedLesaseImpl
public abstract class BasedLesaseImpl implements ILeaseService { private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class); private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_"; private static final AtomicBoolean syncStart = new AtomicBoolean(false); private static final int synTime = 120; // 5分鐘的一致性hash同步時(shí)間太久了,改為2分鐘 protected ScheduledExecutorService taskExecutor = null; protected int leaseTerm = 300 * 2; // 租約時(shí)間 // protected transient JDBCDriver jdbcDataSource = null; protected ILeaseStorage leaseStorage; protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每個(gè)lease name對(duì)應(yīng)的租約到期時(shí)間 public BasedLesaseImpl() { taskExecutor = new ScheduledThreadPoolExecutor(10); } /** * lease_name: consistent_hash_ip, lease_user_ip: ip,定時(shí)刷新lease_info表,檢查一致性hash環(huán)的節(jié)點(diǎn)情況 * * @param name * @return */ @Override public boolean hasLease(String name) { // 內(nèi)存中沒(méi)有租約信息則表示 沒(méi)有租約 Date leaseEndTime = leaseName2Date.get(name); if (leaseEndTime == null) { // LOG.info("內(nèi)存中根據(jù) " + name + "沒(méi)有查詢(xún)到租約信息,表示沒(méi)有租約"); return false; } // LOG.info("查詢(xún)是否有租約 name:" + name + " ,當(dāng)前時(shí)間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) // + " 租約到期時(shí)間 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime)); // 有租約時(shí)間,并且租約時(shí)間大于當(dāng)前時(shí)間,表示有租約信息 if (new Date().before(leaseEndTime)) { return true; } return false; } private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>(); @Override public void startLeaseTask(final String name) { startLeaseTask(name, this.leaseTerm, null); } @Override public void startLeaseTask(final String name, ILeaseGetCallback callback) { startLeaseTask(name, this.leaseTerm, callback); } @Override public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) { ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback); startLeaseTask(name, applyTask, leaseTerm / 2, true); } /** * 啟動(dòng)定時(shí)器,定時(shí)執(zhí)行任務(wù),確保任務(wù)可重入 * * @param name * @param runnable 具體任務(wù) * @param scheduleTime 調(diào)度時(shí)間 * @param startNow 是否立刻啟動(dòng)一次 */ protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) { AtomicBoolean isStartLease = startLeaseMap.get(name);//多次調(diào)用,只啟動(dòng)一次定時(shí)任務(wù) if (isStartLease == null) { synchronized (this) { isStartLease = startLeaseMap.get(name); if (isStartLease == null) { isStartLease = new AtomicBoolean(false); startLeaseMap.put(name, isStartLease); } } } if (isStartLease.compareAndSet(false, true)) { if (startNow) { runnable.run(); } taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS); } } //...... }
- BasedLesaseImpl聲明實(shí)現(xiàn)了ILeaseService,它依賴(lài)ILeaseStorage,startLeaseTask方法會(huì)創(chuàng)建ApplyTask,然后以固定間隔調(diào)度執(zhí)行
ApplyTask
/** * 續(xù)約任務(wù) */ protected class ApplyTask implements Runnable { protected String name; protected int leaseTerm; protected ILeaseGetCallback callback; public ApplyTask(int leaseTerm, String name) { this(leaseTerm, name, null); } public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) { this.name = name; this.leaseTerm = leaseTerm; this.callback = callback; } @Override public void run() { try { // LOG.info("LeaseServiceImpl name: " + name + "開(kāi)始獲取租約..."); AtomicBoolean newApplyLease = new AtomicBoolean(false); Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease); if (leaseDate != null) { leaseName2Date.put(name, leaseDate); LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 獲取租約成功, 租約到期時(shí)間為 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate)); } else { // fix.2020.08.13 這時(shí)name對(duì)應(yīng)的租約可能還在有效期內(nèi),或者本機(jī)還持有租約,需要remove // leaseName2Date.remove(name); LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約失敗 "); } if (newApplyLease.get() && callback != null) { callback.callback(leaseDate); } } catch (Exception e) { LOG.error(" LeaseServiceImpl name: " + name + " " + getSelfUser() + " 獲取租約出現(xiàn)異常 ", e); } } } /** * 申請(qǐng)租約,如果當(dāng)期租約有效,直接更新一個(gè)租約周期,如果當(dāng)前租約無(wú)效,先查詢(xún)是否有有效的租約,如果有申請(qǐng)失敗,否則直接申請(qǐng)租約 */ protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) { // 計(jì)算下一次租約時(shí)間 = 當(dāng)前時(shí)間 + 租約時(shí)長(zhǎng) Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm); // 1 如果已經(jīng)有租約,則更新租約時(shí)間(內(nèi)存和數(shù)據(jù)庫(kù))即可 if (hasLease(name)) { // LOG.info("用戶已有租約,更新數(shù)據(jù)庫(kù)和內(nèi)存中的租約信息"); // 更新數(shù)據(jù)庫(kù) LeaseInfo leaseInfo = queryValidateLease(name); if (leaseInfo == null) { LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null"); return null; } // fix.2020.08.13,與本機(jī)ip相等且滿足一致性hash分配策略,才續(xù)約,其他情況為null String leaseUserIp = leaseInfo.getLeaseUserIp(); if (!leaseUserIp.equals(getSelfUser())) { return null; } leaseInfo.setLeaseEndDate(nextLeaseDate); updateLeaseInfo(leaseInfo); return nextLeaseDate; } // 2 沒(méi)有租約情況 判斷是否可以獲取租約,只要租約沒(méi)有被其他人獲取,則說(shuō)明有有效租約 boolean success = canGetLease(name); if (!success) { // 表示被其他機(jī)器獲取到了有效的租約 // LOG.info("其他機(jī)器獲取到了有效的租約"); return null; } // 3 沒(méi)有租約而且可以獲取租約的情況,則嘗試使用數(shù)據(jù)庫(kù)原子更新的方式獲取租約,保證只有一臺(tái)機(jī)器成功獲取租約,而且可以運(yùn)行 boolean flag = tryGetLease(name, nextLeaseDate); if (flag) { // 獲取租約成功 newApplyLease.set(true); return nextLeaseDate; } return null; }
- ApplyTask內(nèi)部調(diào)用applyLeaseTask,如果已有租約則更新租約時(shí)間,沒(méi)有租約則判斷是否可以獲取租約,可以則執(zhí)行tryGetLease
tryGetLease
/** * 更新數(shù)據(jù)庫(kù),占用租期并更新租期時(shí)間 * * @param time */ protected boolean tryGetLease(String name, Date time) { // LOG.info("嘗試獲取租約 lease name is : " + name + " 下次到期時(shí)間: " // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time)); LeaseInfo validateLeaseInfo = queryValidateLease(name); if (validateLeaseInfo == null) {// 這里有兩種情況 1 數(shù)據(jù)庫(kù)里面沒(méi)有租約信息 2 數(shù)據(jù)庫(kù)里面有租約信息但是已經(jīng)過(guò)期 Integer count = countLeaseInfo(name); if (count == null || count == 0) {// 表示現(xiàn)在數(shù)據(jù)庫(kù)里面沒(méi)有任何租約信息,插入租約成功則表示獲取成功,失敗表示在這一時(shí)刻其他機(jī)器獲取了租約 // LOG.info("數(shù)據(jù)庫(kù)中暫時(shí)沒(méi)有租約信息,嘗試原子插入租約:" + name); // fix.2020.08.13,經(jīng)過(guò)一致性hash計(jì)算,該名字的任務(wù)不應(yīng)該在本機(jī)執(zhí)行,直接返回,無(wú)需插入。只有分配到hash執(zhí)行權(quán)限的機(jī)器才可以插入并獲取租約 if (!getSelfUser().equals(getConsistentHashHost(name))) { return false; } validateLeaseInfo = new LeaseInfo(); validateLeaseInfo.setLeaseName(name); validateLeaseInfo.setLeaseUserIp(getSelfUser()); validateLeaseInfo.setLeaseEndDate(time); validateLeaseInfo.setStatus(1); validateLeaseInfo.setVersion(1); if (insert(validateLeaseInfo)) { LOG.info("數(shù)據(jù)庫(kù)中暫時(shí)沒(méi)有租約信息,原子插入成功,獲取租約成功:" + name); return true; } else { LOG.info("數(shù)據(jù)庫(kù)中暫時(shí)沒(méi)有租約信息,原子插入失敗,已經(jīng)被其他機(jī)器獲取租約:" + name); return false; } } else { // 表示數(shù)據(jù)庫(kù)里面有一條但是無(wú)效,這里需要兩臺(tái)機(jī)器按照version進(jìn)行原子更新,更新成功的獲取租約 // LOG.info("數(shù)據(jù)庫(kù)中有一條無(wú)效的租約信息,嘗試根據(jù)版本號(hào)去原子更新租約信息:" + name); LeaseInfo inValidateLeaseInfo = queryInValidateLease(name); if (inValidateLeaseInfo == null) {// 說(shuō)明這個(gè)時(shí)候另外一臺(tái)機(jī)器獲取成功了 LOG.info("另外一臺(tái)機(jī)器獲取成功了租約:" + name); return false; } // fix.2020.08.13,機(jī)器重啟之后,該名字的任務(wù)已經(jīng)不分配在此機(jī)器上執(zhí)行,直接返回,無(wú)需更新數(shù)據(jù)庫(kù) if (!getSelfUser().equals(getConsistentHashHost(name))) { return false; } inValidateLeaseInfo.setLeaseName(name); inValidateLeaseInfo.setLeaseUserIp(getSelfUser()); inValidateLeaseInfo.setLeaseEndDate(time); inValidateLeaseInfo.setStatus(1); boolean success = updateDBLeaseInfo(inValidateLeaseInfo); if (success) { LOG.info("LeaseServiceImpl 原子更新租約成功,當(dāng)前機(jī)器獲取到了租約信息:" + name); } else { LOG.info("LeaseServiceImpl 原子更新租約失敗,租約被其他機(jī)器獲取:" + name); } return success; } } else { // 判斷是否是自己獲取了租約,如果是自己獲取了租約則更新時(shí)間(內(nèi)存和數(shù)據(jù)庫(kù)), // 這里是為了解決機(jī)器重啟的情況,機(jī)器重啟,內(nèi)存中沒(méi)有租約信息,但是實(shí)際上該用戶是有租約權(quán)限的 // fix.2020.08.13,租約的ip與本機(jī)ip相等,且滿足一致性hash策略,才會(huì)被本機(jī)執(zhí)行 String leaseUserIp = validateLeaseInfo.getLeaseUserIp(); if (leaseUserIp.equals(getSelfUser())) { // 如果當(dāng)期用戶有租約信息,則更新數(shù)據(jù)庫(kù) validateLeaseInfo.setLeaseEndDate(time); boolean hasUpdate = updateLeaseInfo(validateLeaseInfo); if (hasUpdate) { LOG.info( "LeaseServiceImpl機(jī)器重啟情況,當(dāng)前用戶有租約信息,并且更新數(shù)據(jù)庫(kù)成功,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期時(shí)間 : " + new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate())); return true; } else { LOG.info("LeaseServiceImpl 機(jī)器重啟情況,當(dāng)前用戶有租約信息,并且更新數(shù)據(jù)庫(kù)失敗,表示失去租約:" + name); return false; } } // LOG.info("LeaseServiceImpl 租約被其他機(jī)器獲取,租約信息為 name :" + validateLeaseInfo.getLeaseName() + " ip : " // + validateLeaseInfo.getLeaseUserIp() + " 到期時(shí)間 : " // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate())); return false; } } protected LeaseInfo queryValidateLease(String name) { //String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()"; //// LOG.info("LeaseServiceImpl query validate lease sql:" + sql); //return queryLease(name, sql); return leaseStorage.queryValidateLease(name); }
- tryGetLease先通過(guò)queryValidateLease查詢(xún)租約信息,若沒(méi)有租約則插入,若過(guò)期則根據(jù)版本號(hào)更新,若已有租約則判斷是否是自己獲取了租約,是則更新租約信息
LeaseServiceImpl
public class LeaseServiceImpl extends BasedLesaseImpl { private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class); private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap(); protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>(); //如果是搶占鎖狀態(tài)中,則不允許申請(qǐng)鎖 public LeaseServiceImpl() { super(); } /** * 嘗試獲取鎖,可以等待waitTime,如果到點(diǎn)未返回,則直接返回。如果是-1,則一直等待 * * @param name 業(yè)務(wù)名稱(chēng) * @param lockerName 鎖名稱(chēng) * @param waitTime 等待時(shí)間,是微秒單位 * @return */ @Override public boolean tryLocker(String name, String lockerName, long waitTime) { return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME); } @Override public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) { long now = System.currentTimeMillis(); boolean success = lock(name, lockerName, lockTimeSecond); while (!success) { if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) { break; } success = lock(name, lockerName, lockTimeSecond); if (success) { return success; } try { Thread.sleep(100); } catch (InterruptedException e) { LOG.error("LeaseServiceImpl try locker error", e); } } return success; } @Override public boolean lock(String name, String lockerName) { return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME); } @Override public boolean lock(String name, String lockerName, int leaseSecond) { lockerName = createLockName(name, lockerName); Future future = seizeLockingFuntures.get(lockerName); if (future != null && ((HoldLockFunture)future).isDone == false) { return false; } Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseSecond);// 默認(rèn)鎖定5分鐘,用完需要立刻釋放.如果時(shí)間不同步,可能導(dǎo)致鎖失敗 return tryGetLease(lockerName, nextLeaseDate); } @Override public boolean unlock(String name, String lockerName) { // LOG.info("LeaseServiceImpl unlock,name:" + name); lockerName = createLockName(name, lockerName); LeaseInfo validateLeaseInfo = queryValidateLease(lockerName); if (validateLeaseInfo == null) { LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName); } if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) { validateLeaseInfo.setStatus(0); updateDBLeaseInfo(validateLeaseInfo); } HoldLockTask holdLockTask = holdLockTasks.remove(lockerName); if (holdLockTask != null) { holdLockTask.close(); } leaseName2Date.remove(lockerName); return false; } /** * 如果有鎖,則一直持有,如果不能獲取,則結(jié)束。和租約不同,租約是沒(méi)有也會(huì)嘗試重試,一備對(duì)方掛機(jī),自己可以接手工作 * * @param name * @param secondeName * @param lockTimeSecond 獲取鎖的時(shí)間 * @return */ @Override public boolean holdLock(String name, String secondeName, int lockTimeSecond) { if (hasHoldLock(name, secondeName)) { return true; } synchronized (this) { if (hasHoldLock(name, secondeName)) { return true; } String lockerName = createLockName(name, secondeName); Date nextLeaseDate = DateUtil.addSecond(new Date(), lockTimeSecond); boolean success = tryGetLease(lockerName, nextLeaseDate);// 申請(qǐng)鎖,鎖的時(shí)間是leaseTerm if (!success) { return false; } leaseName2Date.put(lockerName, nextLeaseDate); if (!holdLockTasks.containsKey(lockerName)) { HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this); holdLockTask.start(); holdLockTasks.putIfAbsent(lockerName, holdLockTask); } } return true; } /** * 是否持有鎖,不訪問(wèn)數(shù)據(jù)庫(kù),直接看本地 * * @param name * @param secondeName * @return */ @Override public boolean hasHoldLock(String name, String secondeName) { String lockerName = createLockName(name, secondeName); return hasLease(lockerName); } @Override public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) { String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix); return queryValidateLeaseByNamePrefix(leaseNamePrefix); } //...... }
- LeaseServiceImpl繼承了BasedLesaseImpl,tryLocker方法會(huì)根據(jù)等待時(shí)間循環(huán)執(zhí)行l(wèi)ock,lock方法則執(zhí)行tryGetLease,unlock方法則更新租約信息,同時(shí)移除內(nèi)存記錄;holdLock則通過(guò)hasHoldLock判斷是否持有鎖,若有則返回,沒(méi)有則執(zhí)行tryGetLease
ILeaseStorage
public interface ILeaseStorage { /** * 更新lease info,需要是原子操作,存儲(chǔ)保障多線程操作的原子性 * * @param leaseInfo 租約表數(shù)據(jù) * @return */ boolean updateLeaseInfo(LeaseInfo leaseInfo); /** * 統(tǒng)計(jì)這個(gè)租約名稱(chēng)下,LeaseInfo對(duì)象個(gè)數(shù) * * @param leaseName 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 * @return */ Integer countLeaseInfo(String leaseName); /** * 查詢(xún)無(wú)效的的租約 * * @param leaseName 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 * @return */ LeaseInfo queryInValidateLease(String leaseName); /** * 查詢(xún)有效的的租約 * * @param leaseName 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 * @return */ LeaseInfo queryValidateLease(String leaseName); /** * 按前綴查詢(xún)有效的租約信息 * * @param namePrefix * @return */ List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix); /** * 增加租約 * * @param leaseInfo 租約名稱(chēng),無(wú)特殊要求,相同名稱(chēng)會(huì)競(jìng)爭(zhēng)租約 */ void addLeaseInfo(LeaseInfo leaseInfo); }
- ILeaseStorage接口定義了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo方法
DBLeaseStorage
public class DBLeaseStorage implements ILeaseStorage { private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class); protected JDBCDriver jdbcDataSource; private String url; protected String userName; protected String password; protected String jdbc; public DBLeaseStorage(String jdbc, String url, String userName, String password) { this.jdbc = jdbc; this.url = url; this.userName = userName; this.password = password; jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password); } @Override public boolean updateLeaseInfo(LeaseInfo leaseInfo) { String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()"; String whereSQL = " WHERE id=#{id} and version=#{version}"; if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) { sql += ",lease_name=#{leaseName}"; } if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) { sql += ",lease_user_ip=#{leaseUserIp}"; } if (leaseInfo.getLeaseEndDate() != null) { sql += ",lease_end_time=#{leaseEndDate}"; } sql += whereSQL; sql = SQLUtil.parseIbatisSQL(leaseInfo, sql); try { int count = getOrCreateJDBCDataSource().update(sql); boolean success = count > 0; if (success) { synchronized (this) { leaseInfo.setVersion(leaseInfo.getVersion() + 1); } } else { System.out.println(count); } return success; } catch (Exception e) { LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e); throw new RuntimeException("execute sql error " + sql, e); } } @Override public Integer countLeaseInfo(String leaseName) { String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name = '" + leaseName + "' and status = 1"; try { List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql); if (rows == null || rows.size() == 0) { return null; } Long value = (Long) rows.get(0).get("c"); return value.intValue(); } catch (Exception e) { throw new RuntimeException("execute sql error " + sql, e); } } @Override public LeaseInfo queryInValidateLease(String leaseName) { String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'"; LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql); return queryLease(leaseName, sql); } @Override public LeaseInfo queryValidateLease(String leaseName) { String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()"; return queryLease(leaseName, sql); } @Override public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) { String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()"; try { List<LeaseInfo> leaseInfos = new ArrayList<>(); List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql); if (rows == null || rows.size() == 0) { return null; } for (Map<String, Object> row : rows) { LeaseInfo leaseInfo = convert(row); leaseInfos.add(leaseInfo); } return leaseInfos; } catch (Exception e) { throw new RuntimeException("execute sql error " + sql, e); } } @Override public void addLeaseInfo(LeaseInfo leaseInfo) { String sql = " REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)" + " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())"; sql = SQLUtil.parseIbatisSQL(leaseInfo, sql); try { getOrCreateJDBCDataSource().execute(sql); } catch (Exception e) { LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e); throw new RuntimeException("execute sql error " + sql, e); } } protected JDBCDriver getOrCreateJDBCDataSource() { if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) { synchronized (this) { if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) { this.jdbcDataSource = DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password); } } } return jdbcDataSource; } protected LeaseInfo queryLease(String name, String sql) { try { List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql); if (rows == null || rows.size() == 0) { return null; } return convert(rows.get(0)); } catch (Exception e) { throw new RuntimeException("execute sql error " + sql, e); } } protected LeaseInfo convert(Map<String, Object> map) { LeaseInfo leaseInfo = new LeaseInfo(); leaseInfo.setId(getMapLongValue("id", map)); leaseInfo.setCreateTime(getMapDateValue("gmt_create", map)); leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map)); leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class)); leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class)); Integer status = getMapValue("status", map, Integer.class); if (status != null) { leaseInfo.setStatus(status); } leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map)); Long version = getMapLongValue("version", map); if (version != null) { leaseInfo.setVersion(version); } return leaseInfo; } @SuppressWarnings("unchecked") private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) { Object value = map.get(fieldName); if (value == null) { return null; } return (T) value; } private Long getMapLongValue(String fieldName, Map<String, Object> map) { Object value = map.get(fieldName); if (value == null) { return null; } if (value instanceof Long) { return (Long) value; } if (value instanceof BigInteger) { return ((BigInteger) value).longValue(); } return null; } private Date getMapDateValue(String fieldName, Map<String, Object> map) { Object value = map.get(fieldName); if (value == null) { return null; } if (value instanceof Date) { return (Date) value; } if (value instanceof String) { return DateUtil.parseTime(((String) value)); } return null; } }
- DBLeaseStorage實(shí)現(xiàn)了ILeaseStorage接口,使用jdbc實(shí)現(xiàn)了其方法
小結(jié)
rocketmq-streams的LeaseService基于db實(shí)現(xiàn)了租約和鎖,可用于主備場(chǎng)景切換。
以上就是rocketmq-streams的ILeaseService使用示例詳解的詳細(xì)內(nèi)容,更多關(guān)于rocketmq streams ILeaseService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 關(guān)于springboot使用rocketmq?RocketMQMessageListener參數(shù)問(wèn)題
- RocketMQ?offset確認(rèn)機(jī)制示例詳解
- rocketmq的AclClientRPCHook權(quán)限控制使用技巧示例詳解
- springboot集成RocketMQ過(guò)程及使用示例詳解
- RocketMQ?源碼分析Broker消息刷盤(pán)服務(wù)
- RocketMQ?Broker消息如何刷盤(pán)源碼解析
- RocketMQMessageListener注解對(duì)rocketmq消息的消費(fèi)實(shí)現(xiàn)機(jī)制
相關(guān)文章
解決docker報(bào)錯(cuò):docker:invalid?reference?format.
在導(dǎo)入鏡像的時(shí)候出現(xiàn)問(wèn)題:invalid reference format,這里總結(jié)下,這篇文章主要給大家介紹了關(guān)于解決docker報(bào)錯(cuò):docker:invalid?reference?format的相關(guān)資料,需要的朋友可以參考下2024-01-01docker如何安裝帶postgis插件的postgresql數(shù)據(jù)庫(kù)
這篇文章主要介紹了docker如何安裝帶postgis插件的postgresql數(shù)據(jù)庫(kù)問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12使用TLS加密通訊遠(yuǎn)程連接Docker的示例詳解
這篇文章主要介紹了使用TLS加密通訊遠(yuǎn)程連接Docker的示例詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12詳解Docker私有倉(cāng)庫(kù)最簡(jiǎn)便的搭建方法
本篇文章主要介紹了Docker私有倉(cāng)庫(kù)最簡(jiǎn)便的搭建方法,具有一定的參考價(jià)值,有興趣的可以了解一下。2017-02-02詳解在Docker容器內(nèi)外互相拷貝數(shù)據(jù)的方法
本篇文章主要介紹了詳解在Docker容器內(nèi)外互相拷貝數(shù)據(jù)的方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下。2017-03-03在Docker中構(gòu)建并運(yùn)行Nginx容器的完整教程
Docker 作為一種強(qiáng)大的容器化平臺(tái),使得開(kāi)發(fā)、測(cè)試和部署變得更加高效和靈活,Nginx 是一款廣泛使用的高性能 Web 服務(wù)器和反向代理服務(wù)器,適用于各種場(chǎng)景,在本教程中,我們將詳細(xì)介紹如何在 Docker 中構(gòu)建并運(yùn)行一個(gè) Nginx 容器,需要的朋友可以參考下2024-09-09Docker安裝配置ES(elasticsearch)步驟詳解
這篇文章主要給大家介紹了關(guān)于Docker安裝配置ES(elasticsearch)的相關(guān)資料,本文主要介紹了如何在指定目錄下安裝ES和Kibana,以及如何用IK分詞器進(jìn)行分詞,需要的朋友可以參考下2024-10-10云原生自動(dòng)化應(yīng)用于docker倉(cāng)庫(kù)私有憑據(jù)secret創(chuàng)建
這篇文章主要為大家介紹了云原生自動(dòng)化應(yīng)用于docker倉(cāng)庫(kù)私有憑據(jù)secret創(chuàng)建,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03