Apache Omid TSO 組件源碼實現(xiàn)原理解析
Apache Omid TSO 組件實現(xiàn)原理
作用
獨立進程,處理全局事務(wù)之間的并發(fā)沖突。
流程
TSOChannelHandler#channelRead -> AbstractRequestProcessor -> PersistenceProcessorHandler
總體流程
thread1 TSOChannelHandler#channelRead AbstractRequestProcessor#timestampRequest 接收 client 請求,創(chuàng)建 RequestEvent 并 publish thread2 AbstractRequestProcessor#onEvent 處理 RequestEvent 請求 AbstractRequestProcessor#handleRequest PersistenceProcessorImpl#addTimestampToBatch 創(chuàng)建 PersistEvent,當(dāng) batch 滿了發(fā)送事件 thread3 PersistenceProcessorHandler#onEvent 持久化事件處理
類
TSOChannelHandler
繼承自 Netty 的 ChannelInboundHandlerAdapter,用于處理 TSO 的入站請求
。
channelRead
委托 requestProcessor 創(chuàng)建 timestampRequest 和 commitRequest 請求事件。
AbstractRequestProcessor
處理 timestamp 和 commit 事件。
onEvent
處理 RequestEvent 事件,按照事件類型派發(fā)給 handleTimestamp 和 handleCommit 方法進行處理。
handleTimestamp
1.通過 timestampOracle 獲取下一個時間戳;
2.PersistenceProcessorImpl#addBatch 事件添加到 batch,但是后續(xù)對 timestamp 請求不會額外處理。
handleCommit
主要通過 hasConflictsWithCommittedTransactions 判斷 writeSet 和 CommitHashMap 里是否有事務(wù)寫沖突,如果沒有則可以提交事務(wù),分配 commitTimestamp。
private void handleCommit(RequestEvent event) throws Exception { long startTimestamp = event.getStartTimestamp(); // startTimestamp Iterable<Long> writeSet = event.writeSet(); // 寫入集,存儲的是 cellIds Collection<Long> tableIdSet = event.getTableIdSet(); boolean isCommitRetry = event.isCommitRetry(); boolean nonEmptyWriteSet = writeSet.iterator().hasNext(); // 檢查寫集合是否為空,即事務(wù)是否有寫操作 if (startTimestamp > lowWatermark && !hasConflictsWithFences(startTimestamp, tableIdSet) && !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) { // 檢查事務(wù)是否滿足提交條件,通過 hasConflictsWithCommittedTransactions 判斷是否有事務(wù)寫沖突 // 可以進行事務(wù)提交 long commitTimestamp = timestampOracle.next(); // 獲取提交時間戳 Optional<Long> forwardNewWaterMark = Optional.absent(); if (nonEmptyWriteSet) { // 寫集合非空 long newLowWatermark = lowWatermark; for (long r : writeSet) { // 遍歷寫集合中的每個元素,更新其最新的寫入時間戳,并計算新的低水位線 long removed = hashmap.putLatestWriteForCell(r, commitTimestamp); // 更新 cellId 對應(yīng)的 commitTimestamp, 返回之前的 oldest commitTimestamp newLowWatermark = Math.max(removed, newLowWatermark); // 更新低水位線 } if (newLowWatermark != lowWatermark) { // 更新低水位線 lowWatermark = newLowWatermark; forwardNewWaterMark = Optional.of(lowWatermark); } } forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark); // 持久化 commit 請求 } else { // 事務(wù)不滿足提交條件 if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying forwardCommitRetry(startTimestamp, c, event.getMonCtx()); // 若是提交重試,再次檢查是否已提交以避免因響應(yīng)延遲導(dǎo)致的重復(fù)提交 } else { forwardAbort(startTimestamp, c, event.getMonCtx()); // 否則,中止事務(wù) } } }
CommitHashMap
通過 LongCache 緩存 cellId -> lastCommittedTimestamp 的映射。
getLatestWriteForCell 方法:
根據(jù) cellId 獲取 lastCommittedTimestamp。
putLatestWriteForCell 方法:
更新 cellId 對應(yīng)的 lastCommittedTimestamp。
LongCache
緩存 cellId -> lastCommittedTimestamp 的映射。
get 和 set 操作都是先將原始 cellId 進行 hash 操作找到位置,所以可能存在沖突。
set
更新 cellId 對應(yīng)的 lastCommittedTimestamp。
public long set(long key, long value) { final int index = index(key); // cellId 取模返回下標(biāo),可能會沖突 int oldestIndex = 0; long oldestValue = Long.MAX_VALUE; for (int i = 0; i < associativity; ++i) { int currIndex = 2 * (index + i); // 計算 key 下標(biāo) if (cache[currIndex] == key) { // 相同事務(wù) cellId, 替換場景 oldestValue = 0; oldestIndex = currIndex; break; } if (cache[currIndex + 1] <= oldestValue) { // 沒找到相同的key.通過和 oldestValue 比較會將最小的 timestamp 剔除 oldestValue = cache[currIndex + 1]; oldestIndex = currIndex; } } // 替換最舊的鍵值對,將其更新為新的鍵值對 cache[oldestIndex] = key; cache[oldestIndex + 1] = value; return oldestValue; }
get
獲取 cellId 對應(yīng)的 lastCommittedTimestamp,找不到則返回 0.
public long get(long key) { final int index = index(key); for (int i = 0; i < associativity; ++i) { // associativity 里存儲的元素key應(yīng)該是相同的 int currIndex = 2 * (index + i); // 計算 key 的下標(biāo) if (cache[currIndex] == key) { // 找到 cache key return cache[currIndex + 1]; // 返回對應(yīng)的 value } } return 0; }
PersistenceProcessorImpl
將 startTimestamp 和 commitTimestamp 放入 batch.
addCommitToBatch
創(chuàng)建 event,添加到 current batch 如果 current batch is full triggerCurrentBatchFlush
triggerCurrentBatchFlush
創(chuàng)建 PersistBatchEvent 并發(fā)送事件
PersistenceProcessorHandler
處理上面 PersistenceProcessorImpl 發(fā)送過來的事件,進行持久化處理。
onEvent
實際上只處理 commit 事件,會創(chuàng)建 put 對象將事務(wù)信息持久化到 hbase 的 commitTable (OMID_COMMIT_TABLE).
HBaseCommitTable
構(gòu)造方法: 根據(jù) HBaseCommitTableConfig 配置初始化
到此這篇關(guān)于Apache Omid TSO 組件源碼實現(xiàn)原理的文章就介紹到這了,更多相關(guān)Apache Omid TSO 組件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
ubuntu下的虛擬環(huán)境中安裝Django的操作方法
這篇文章主要介紹了ubuntu下的虛擬環(huán)境中安裝Django的操作方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2019-09-09Linux+php+apache+oracle環(huán)境搭建之CentOS下安裝Oracle數(shù)據(jù)庫
研究了兩天Linux下安裝Oracle,重裝了兩次虛擬機,終于安裝成功。很有收獲的。記錄下安裝過程。大神們?nèi)缬懈玫姆绞?,請?lián)系我!2014-08-08DDNS 的工作原理及其在 Linux 上的實現(xiàn)
DDNS (Dynamic DNS) 擴展了 DNS 將客戶端 IP 與其域名進行靜態(tài)映射的功能,它可以將同一域名實時地解析為不同的動態(tài) IP,而不需要額外的人工干預(yù)2016-09-09