欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ?Broker如何保存消息源碼解析

 更新時間:2023年05月09日 11:45:55   作者:林師傅  
這篇文章主要為大家介紹了RocketMQ源碼分析Broker如何保存消息詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

前言

前面我們介紹了RocketMQ是如何接收消息的,下面我們來介紹Broker是如何保存消息的。

消息存儲格式總覽

Broker消息存儲主要包括CommitLog,ConsumerQueue和Index三個部分。

  • CommitLog

CommitLog主要用于消息存儲,所有topic的消息按順序都存儲在CommitLog中。

  • ConsumerQueue

ConsumerQueue對應消費隊列,消息存儲到CommitLog后,會異步轉發(fā)到ConsumerQueue文件中

  • Index

消息索引,只要存儲消息key與offset的關系

CommitLog介紹

CommitLog是消息和消息數(shù)據存儲的主體,CommitLog存儲的文件目錄在${user.home}/store/commitlog中,它其實是一個目錄,消息并不是直接存儲在CommitLog中,而是存儲在由20位數(shù)字構成的文件中。

MappedFile詳解

commitlog文件夾中文件單元是MappedFile,我們可以把MappedFile理解成一個文件管理的工具,如果需要將數(shù)據存儲到磁盤,或者快速查找數(shù)據,都可以通過MappedFile。

每個MappedFile文件大小默認是1GB,文件名是由20位數(shù)字構成,文件名其實是MappedFile的起始偏移量,如果偏移量不足20位,則將偏移量的左邊補0。上圖中MappedFile的文件名是00000000000000000000,它代表的是CommitLog中的第一個文件,由于每個MappedFile文件大小是1GB,因此第二個文件的偏移量為1024*1024*1024(1GB),計算后的結果為1073741824,因此第二個文件的文件名為00000000001073741824,可依此類推其他文件的文件名。

消息存儲格式介紹

消息在commitLog中存儲的格式如下所示

  • totalSize

消息總長度,4字節(jié)

  • magicCode

魔數(shù),4字節(jié),固定值十六進制是0xdaa320a7,10進制是-875286124

  • bodyCRC

消息體crc校驗碼,4字節(jié)

  • queueId

消息隊列id,4字節(jié)

  • flag

消息標記,RocketMQ不做處理,默認4字節(jié)

  • queueOffset

消息在ConsumeQueue文件中的物理偏移量,默認8字節(jié)

  • physicalOffset

消息在CommitLog文件中的物理偏移量,默認8字節(jié)

  • sysFlag

消息系統(tǒng)標記,例如是否壓縮、是否是事務消息等,4字節(jié)

  • bornTimestamp

消息生產者調用消息API的時間戳,8字節(jié)

  • bornHost

BORNHOST 消息生產者IP和端口號,8字節(jié)

  • storeTimestamp

消息存儲時間戳,8字節(jié)

  • storeHostAddress

STOREHOSTADDRESS 消息存儲Broker的IP和端口號,8字節(jié)

  • reconsumeTimes

消息重試次數(shù) 4字節(jié)

  • Prepared Transaction Offset

事務消息偏移量,8字節(jié)

  • bodyLength

消息體長度,4字節(jié)

  • body

消息體內容,它是變長的,長度為bodyLength中存儲的值

  • TopicLength

topicLength表示topic占用的長度,topicLength占用1字節(jié),也就是255,也就是說topic長度最長不能超過255字節(jié)

  • Topic

topic是消息主題名稱,topic是變長的,實際占用topicLength字節(jié)

  • PropertiesLength

propertiesLength表示properties占用的長度,propertiesLength占用2字節(jié),也就是說properties長度最長不超過65536字節(jié)

  • Properties

properties是消息屬性,properties是變長的,實際占用propertiesLength字節(jié)

DefaultMessageStore介紹

Broker保存消息是通過消息存儲默認實現(xiàn)類org.apache.rocketmq.store.DefaultMessageStore執(zhí)行的,它是Broker存儲模塊中最最最重要的一個類,提供了很多存儲文件的API。DefaultMessageStore中和消息存儲相關的屬性如下所示,

// 消息存儲配置
private final MessageStoreConfig messageStoreConfig;
// CommitLog文件的存儲實現(xiàn)類
private final CommitLog commitLog;
// 消息隊列存儲緩存表,key是topic
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// MappedFile分配服務
private final AllocateMappedFileService allocateMappedFileService;
// 直接內存暫存池
private final TransientStorePool transientStorePool;
// broker狀態(tài)管理器
private final BrokerStatsManager brokerStatsManager;
// 鎖文件
// 目錄: ${user.home}/store/lock
private RandomAccessFile lockFile;

消息存儲源碼分析

發(fā)送消息存儲流程

發(fā)送消息存儲的入口函數(shù)是DefaultMessageStore#asyncPutMessage,它主要分為下面三步

  • 存儲狀態(tài)校驗
  • 校驗消息存儲服務是否關閉,當前Broker是否是從節(jié)點,queue是否可寫
  • 消息校驗
  • 校驗topic名稱長度是否超過了127字節(jié)和property長度是否超過了32767
  • 將消息保存到commitLog
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // 1. 存儲狀態(tài)校驗
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // 2. 校驗topic名稱和property長度
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    // ...
    long beginTime = this.getSystemClock().now();
    // 3. 保存到commitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    //...
    return putResultFuture;
}

CommitLog#asyncPutMessage保存消息

CommitLog#asyncPutMessage保存消息可以分為三個階段

  • 消息預處理階段
  • 消息保存階段
  • 消息保存結果處理階段

消息預處理階段

消息預處理階段可以分為下面三個步驟

  • 設置消息存儲時間戳和消息體CSC32信息
  • 如果是延遲消息,則設置延遲信息

如果是非事務消息或者是提交的事務消息,并且設置了消息的延遲級別,說明當前消息是延遲消息,Broker在處理延遲消息時會將消息投遞到名為SCHEDULE_TOPIC_XXXX的Topic。在消息預處理的階段,會先將當前消息的topic設置為SCHEDULE_TOPIC_XXXX,queueId設置為延遲級別-1,并且將原來的Topic和queueId設置到消息的REAL_TOPICREAL_QID屬性中。

  • 設置ip及構建存儲消息上下文
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 1. 設置消息存儲時間戳和消息體CSC32信息
    msg.setStoreTimestamp(System.currentTimeMillis());     // 設置消息存儲時間
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));					 // 設置消息體CRC32校驗值
    // 2. 如果是非事務消息,或者是事務提交消息,判斷是否是是否是延遲消息,如果是延遲消息則設置延遲相關信息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // 如果延遲級別>0,說明是延遲消息
        if (msg.getDelayTimeLevel() > 0) {
            // 如果大于最大的延遲級別,則取最大的延遲級別
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 消息topic改成延遲消息topic(SCHEDULE_TOPIC_XXXX)
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 延遲topic的queueId:延遲級別-1
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // 消息屬性中設置真實的QueueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 把SCHEDULE_TOPIC_XXXX設置為當前消息的topic,消息先投遞到這個隊列中
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
  	// 3. 設置ip并構建存儲消息上下文信息
    msg.setBornHostV6Flag(); // 如果producer的ip是IpV6,則設置生產者IpV6 flag
    msg.setStoreHostAddressV6Flag(); // 如果如果broker的ip是IpV6,則設置BrokerIpV6 flag
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // 構建存消息上下文
    PutMessageContext putMessageContext = new PutMessageContext(/*key值:topic-queueId*/generateKey(putMessageThreadLocal.getKeyBuilder()/*StringBuilder*/, msg));
  	// ... 省略部分代碼
}

消息保存階段

消息保存階段可以分為如下步驟

  • 獲取消息保存鎖
  • 獲取最新的mappedFile

獲取MappedFile調用的是MappedFileQueue中的方法,獲取最新的MappedFile

  • 如果最新的mappedFile為空或者已經滿了,則創(chuàng)建新的MappedFile
  • 將消息保存的mappedFile中
  • 處理消息保存結果
  • 釋放消息保存鎖
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // ... 省略部分代碼
  	// 1. 消息保存鎖,默認是ReentrantLock互斥鎖
    putMessageLock.lock(); 
    try {
        // 2. 獲取最新的mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 3. 如果獲取到的mappedFile是null說明之前沒有存儲消息
        // 如果mappedFile滿了,說明需要創(chuàng)建一個新的MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
        }
				// 如果創(chuàng)建mappedFile失敗,則返回異常信息
        if (null == mappedFile) {
            // 創(chuàng)建mappedFile失敗
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // 4. 將消息保存的mappedFile中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // 5. 處理消息保存結果
      	switch (result.getStatus()) {
            case PUT_OK:
                break;
            // mappedFile滿了,重新創(chuàng)建mappedFile后再寫入消息
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // 創(chuàng)建一個新的文件,然后重新寫入
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
								//...
     						// 寫消息
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            // ...
        }
    } finally {
      	// 6. 釋放鎖
        putMessageLock.unlock();
    }
		// ... 省略部分代碼
}

上面第4步MappedFile#appendMessage邏輯主要有三步

  • 獲取當前寫文件位置

如果寫指針小于文件大小,則對消息進行追加處理

  • 獲取寫緩沖

  • 調用AppendMessageCallback的doAppend將消息寫到內存緩沖中

回調函數(shù)doAppend方法分為單條處理邏輯和批量消息處理邏輯,下面僅展示了單條消息處理邏輯

  • 消息保存完成后會更新當前寫文件的位置和消息保存時間戳
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 獲取當前寫文件位置
    int currentPos = this.wrotePosition.get();
    // 如果寫文件位置小于文件size
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是單條消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件長度-當前寫位置,可以寫的長度*/,(MessageExtBrokerInner) messageExt, putMessageContext);
        } 
        //...
        // 更新當前寫文件位置和消息保存時間戳
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
}

上面保存消息回調函數(shù)中的doAppend實際調用的是CommitLog中內部類DefaultAppendMessageCallback的doAppend方法,這里大致可以分為下面幾個步驟

  • 獲取消息物理偏移量,并且創(chuàng)建消息id生成器,從topicQueueTable中獲取Queue的最大相對便宜量。

消息id的格式如下所示,它由ip,端口和消息偏移量公共構成,長度是16字節(jié),為了保證消息的可讀性,返回給應用程序的Id轉成了字符串。

消息id這么設計的原因是可以根據消息id快速找到broker的IP,端口,以及消息在的物理偏移量,通過它可以快速找到消息

  • 如果消息長度加上消息結束符(8字節(jié))大于maxBlank,則表示該mappedFile已經沒有足夠的空間保存該消息了,那么就會將消息結束符寫入緩沖中,并返回END_OF_FILE,mappedFile消息結束符如下所示

  • 如果空間足夠,將queue的相對偏移量,物理偏移量,sysflag,消息創(chuàng)建時間,消息創(chuàng)建ip,消息保存時間及消息體等按照上面消息格式保存到緩沖中。
  • 創(chuàng)建AppendMessageResult對象并返回,它包括消息追加狀態(tài)、消息寫入物理偏移量、消息寫入長度、消息ID生成器、消息開始追加的時間戳、消息隊列偏移量、消息開始寫入的時間戳等屬性。
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset/*消息文件起始偏移量*/, final ByteBuffer byteBuffer, final int maxBlank/*文件可寫長度*/,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // 1. 物理offset,文件起始offset+寫offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // 創(chuàng)建消息id supplier
    Supplier<String> msgIdSupplier = () -> {
        int sysflag = msgInner.getSysFlag();
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // topic-ququeId
    String key = putMessageContext.getTopicQueueTableKey();
    // 獲取消息queue offset
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    // 如果queueOffset是null,則將其置0
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // 獲取寫緩沖
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    final int msgLen = preEncodeBuffer.getInt(0);
    // 2. 判斷空間是否足夠,如果剩余空間不足,則保存TOTAL+MAGICCODE之后,返回BLANK_MAGIC_CODE
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE 寫消息總長度
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE 寫魔數(shù)
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult(/*...*/);
    }
    int pos = 4/*totalSize*/ + 4/*magicCode*/ + 4/*bodyCRC*/ + 4/*queueId*/ + 4/*flag*/;
    // set隊列的offset,
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 設置物理offset: 文件起始offset+當前文件寫消息的offset
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // set 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // 設置存儲消息ip地址
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    // 寫消息到隊列緩沖
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
  	// 4. 返回消息保存結果
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    return result;
}

消息保存結果處理階段

消息保存結果處理階段主要包括下面三個

  • 提交刷盤請求

如果是同步刷盤,則會創(chuàng)建刷盤請求并返回CompleteFuture,如果是異步刷盤,則會喚醒刷盤服務,然后返回消息保存成功的CompleteFuture

  • 提交消息復制請求

如果是同步復制,則創(chuàng)建消息同步請求然后返回CompleteFuture,如果是異步復制則直接放回消息保存成功的CompleteFuture

  • 合并提交刷盤請求和提交消息復制請求

CompleteFuture#thenCombine是將兩個CompleteFuture(提交刷盤請求,提交消息復制請求)組合起來,等提交刷盤請求和提交消息復制請求都執(zhí)行完了之后再執(zhí)行后續(xù)任務

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
		// ... 省略部分代碼
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // 1. 提交刷盤請求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 2. 提交復制請求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 3. 合并提交刷盤請求和提交復制請求結果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

總結

消息保存到commitLog實際上是保存到byteBuffer中,消息是在回調結果時根據配置決定同步/異步刷盤以及同步/異步同步到從節(jié)點。消息在這個階段也并不會將消息分發(fā)到comsumeQueue以及Index中。

以上就是RocketMQ | 源碼分析】Broker是如何保存消息的?的詳細內容,更多關于RocketMQ Broker保存消息的資料請關注腳本之家其它相關文章!

相關文章

  • SpringBoot結合Redis配置工具類實現(xiàn)動態(tài)切換庫

    SpringBoot結合Redis配置工具類實現(xiàn)動態(tài)切換庫

    本文主要介紹了SpringBoot結合Redis配置工具類實現(xiàn)動態(tài)切換庫,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-08-08
  • Java+Springboot搭建一個在線網盤文件分享系統(tǒng)

    Java+Springboot搭建一個在線網盤文件分享系統(tǒng)

    本主要介紹了通過springboot+freemark+jpa+MySQL實現(xiàn)的在線網盤文件分享系統(tǒng),其功能跟百度網盤非常類似,可以實現(xiàn)文件的上傳、移動、復制、下載等,需要的可以參考一下
    2021-11-11
  • Java設計模式七大原則之單一職責原則詳解

    Java設計模式七大原則之單一職責原則詳解

    單一職責原則(Single Responsibility Principle, SRP),有且僅有一個原因引起類的變更。簡單來說,就是針對一個java類,它應該只負責一項職責。本文將詳細介紹一下Java設計模式七大原則之一的單一職責原則,需要的可以參考一下
    2022-02-02
  • java基礎理論Stream的Filter與謂詞邏輯

    java基礎理論Stream的Filter與謂詞邏輯

    這篇文章主要為大家介紹了java基礎理論Stream的Filter與謂詞邏輯,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-03-03
  • Java通過PropertyDescriptor反射調用set和get方法

    Java通過PropertyDescriptor反射調用set和get方法

    這篇文章主要為大家詳細介紹了Java通過PropertyDescriptor反射調用set和get方法,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-03-03
  • 基于java中泛型的總結分析

    基于java中泛型的總結分析

    本篇文章介紹了,在java中泛型的總結分析。需要的朋友參考下
    2013-05-05
  • Java實現(xiàn)作業(yè)調度的示例代碼

    Java實現(xiàn)作業(yè)調度的示例代碼

    這篇文章主要為大家詳細介紹了如何利用Java實現(xiàn)SJF算法調度,要求測試數(shù)據可以隨即輸入或從文件中讀入,文中的示例代碼講解詳細,需要的可以參考一下
    2023-04-04
  • SpringMVC和Spring的配置文件掃描包詳解

    SpringMVC和Spring的配置文件掃描包詳解

    這篇文章主要介紹了SpringMVC和Spring的配置文件掃描包,本文給大家介紹的非常詳細,具有一定的參考借鑒價值 ,需要的朋友可以參考下
    2019-05-05
  • java8中:: 用法示例(JDK8雙冒號用法)

    java8中:: 用法示例(JDK8雙冒號用法)

    這篇文章主要給大家介紹了關于java8 中的:: 用法(JDK8雙冒號用法)的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用java8具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧
    2019-09-09
  • mybatis-plus雪花算法生成Id使用詳解

    mybatis-plus雪花算法生成Id使用詳解

    本文主要介紹了mybatis-plus雪花算法生成Id使用詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07

最新評論