RocketMQ設計之同步刷盤
同步刷盤方式:在返回寫成功狀態(tài)時,消息已經(jīng)被寫入磁盤。具體流程是,消息寫入內(nèi)存的PAGECACHE后,立刻通知刷盤線程刷盤,然后等待刷盤完成,刷盤線程執(zhí)行完成后喚醒等待的線程,返回消息寫成功的狀態(tài)。

在同步刷盤模式下,當消息寫到內(nèi)存后,會等待數(shù)據(jù)寫到磁盤的CommitLog文件。
CommitLog的handleDiskFlush方法:
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
? ? // Synchronization flush
? ? if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
? ? ? ? final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
? ? ? ? if (messageExt.isWaitStoreMsgOK()) {
? ? ? ? ? ? GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
? ? ? ? ? ? service.putRequest(request);
? ? ? ? ? ? boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
? ? ? ? ? ? ? ? ? ? + " client address: " + messageExt.getBornHostString());
? ? ? ? ? ? ? ? putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? service.wakeup();
? ? ? ? }
? ? }
? ? // Asynchronous flush
? ? else {
? ? ? ? if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
? ? ? ? ? ? flushCommitLogService.wakeup();
? ? ? ? } else {
? ? ? ? ? ? commitLogService.wakeup();
? ? ? ? }
? ? }
}
class GroupCommitService extends FlushCommitLogService {
? ? ? ? private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
? ? ? ? private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
? ? ?? ?//提交刷盤任務到任務列表
? ? ? ? public synchronized void putRequest(final GroupCommitRequest request) {
? ? ? ? ? ? synchronized (this.requestsWrite) {
? ? ? ? ? ? ? ? this.requestsWrite.add(request);
? ? ? ? ? ? }
? ? ? ? ? ? if (hasNotified.compareAndSet(false, true)) {
? ? ? ? ? ? ? ? waitPoint.countDown(); // notify
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? private void swapRequests() {
? ? ? ? ? ? List<GroupCommitRequest> tmp = this.requestsWrite;
? ? ? ? ? ? this.requestsWrite = this.requestsRead;
? ? ? ? ? ? this.requestsRead = tmp;
? ? ? ? }
? ? ? ? private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? public void run() {
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service started");
? ? ? ? ? ? while (!this.isStopped()) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? this.waitForRunning(10);
? ? ? ? ? ? ? ? ? ? this.doCommit();
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // Under normal circumstances shutdown, wait for the arrival of the
? ? ? ? ? ? // request, and then flush
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(10);
? ? ? ? ? ? } catch (InterruptedException e) {
? ? ? ? ? ? ? ? CommitLog.log.warn("GroupCommitService Exception, ", e);
? ? ? ? ? ? }
? ? ? ? ? ? synchronized (this) {
? ? ? ? ? ? ? ? this.swapRequests();
? ? ? ? ? ? }
? ? ? ? ? ? this.doCommit();
? ? ? ? ? ? CommitLog.log.info(this.getServiceName() + " service end");
? ? ? ? }
? ? ? ? @Override
? ? ? ? protected void onWaitEnd() {
? ? ? ? ? ? this.swapRequests();
? ? ? ? }
? ? ? ? @Override
? ? ? ? public String getServiceName() {
? ? ? ? ? ? return GroupCommitService.class.getSimpleName();
? ? ? ? }
? ? ? ? @Override
? ? ? ? public long getJointime() {
? ? ? ? ? ? return 1000 * 60 * 5;
? ? ? ? }
? ? }GroupCommitRequest是刷盤任務,提交刷盤任務后,會在刷盤隊列中等待刷盤,而刷盤線程
GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。之所以不直接寫是磁盤io壓力大,寫入性能低,每隔10毫秒寫一次可以提升磁盤io效率和寫入性能。
- putRequest(request) 提交刷盤任務到任務列表
- request.waitForFlush同步等待
GroupCommitService將任務列表中的任務刷盤完成。
兩個隊列讀寫分離,requestsWrite是寫隊列,用戶保存添加進來的刷盤任務,requestsRead是讀隊列,在刷盤之前會把寫隊列的數(shù)據(jù)放入讀隊列。
CommitLog的doCommit方法:
private void doCommit() {
? ? ? ? ? ? synchronized (this.requestsRead) {
? ? ? ? ? ? ? ? if (!this.requestsRead.isEmpty()) {
? ? ? ? ? ? ? ? ? ? for (GroupCommitRequest req : this.requestsRead) {
? ? ? ? ? ? ? ? ? ? ? ? // There may be a message in the next file, so a maximum of
? ? ? ? ? ? ? ? ? ? ? ? // two times the flush
? ? ? ? ? ? ? ? ? ? ? ? boolean flushOK = false;
? ? ? ? ? ? ? ? ? ? ? ? for (int i = 0; i < 2 && !flushOK; i++) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? //根據(jù)offset確定是否已經(jīng)刷盤
? ? ? ? ? ? ? ? ? ? ? ? ? ? flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
? ? ? ? ? ? ? ? ? ? ? ? ? ? if (!flushOK) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? ? ? req.wakeupCustomer(flushOK);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
? ? ? ? ? ? ? ? ? ? if (storeTimestamp > 0) {
? ? ? ? ? ? ? ? ? ? ? ? CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
? ? ? ? ? ? ? ? ? ? }
?? ??? ??? ??? ??? ?//清空已刷盤的列表
? ? ? ? ? ? ? ? ? ? this.requestsRead.clear();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // Because of individual messages is set to not sync flush, it
? ? ? ? ? ? ? ? ? ? // will come to this process
? ? ? ? ? ? ? ? ? ? CommitLog.this.mappedFileQueue.flush(0);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }- 刷盤的時候依次讀取
requestsRead中的數(shù)據(jù)寫入磁盤, - 寫入完成后清空
requestsRead。
讀寫分離設計的目的是在刷盤時不影響任務提交到列表。
CommitLog.this.mappedFileQueue.flush(0);是刷盤操作:
public boolean flush(final int flushLeastPages) {
? ? boolean result = true;
? ? MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
? ? if (mappedFile != null) {
? ? ? ? long tmpTimeStamp = mappedFile.getStoreTimestamp();
? ? ? ? int offset = mappedFile.flush(flushLeastPages);
? ? ? ? long where = mappedFile.getFileFromOffset() + offset;
? ? ? ? result = where == this.flushedWhere;
? ? ? ? this.flushedWhere = where;
? ? ? ? if (0 == flushLeastPages) {
? ? ? ? ? ? this.storeTimestamp = tmpTimeStamp;
? ? ? ? }
? ? }
? ? return result;
}通過MappedFile映射的CommitLog文件寫入磁盤
這就是RocketMQ高可用設計之同步刷盤的基本情況了,大體思路就是一個讀寫分離的隊列來刷盤,同步刷盤任務提交后會在刷盤隊列中等待刷盤完成后再返回,而GroupCommitService每隔10毫秒寫一批數(shù)據(jù)到磁盤。
到此這篇關(guān)于RocketMQ設計之同步刷盤的文章就介紹到這了,更多相關(guān)RocketMQ同步刷盤內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis?saveAndUpdate空值不更新問題及解決
這篇文章主要介紹了Mybatis?saveAndUpdate空值不更新問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
淺談Java中Collection和Collections的區(qū)別
下面小編就為大家?guī)硪黄獪\談Java中Collection和Collections的區(qū)別。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-08-08
Spring 中jdbcTemplate 實現(xiàn)執(zhí)行多條sql語句示例
本篇文章主要介紹了Spring 中jdbcTemplate 實現(xiàn)執(zhí)行多條sql語句示例,可以對多個表執(zhí)行多個sql語句,有興趣的可以了解一下。2017-01-01
深入了解Java中Cookie和Session的區(qū)別
會話跟蹤是Web程序中常用的技術(shù),用來跟蹤用戶的整個會話,常用的會話跟蹤技術(shù)是Cookie與Session,本文就詳細的介紹一下Java中Cookie和Session的區(qū)別,感興趣的可以了解一下2023-06-06

