欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)

 更新時間:2022年08月17日 14:07:09   作者:奔跑的毛球  
這篇文章主要為大家介紹了RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

Broker中,事務(wù)消息的初始化是通過BrokerController.initialTransaction()方法執(zhí)行的。

private void initialTransaction() {
    this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);
    if (null == this.transactionalMessageService) {
        this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));
        LOG.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());
    }
    this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);
    if (null == this.transactionalMessageCheckListener) {
        this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();
        LOG.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());
    }
    this.transactionalMessageCheckListener.setBrokerController(this);
    this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}

這里有三個核心的初始化變量

TransactionalMessageService

事務(wù)消息主要處理服務(wù)。默認實現(xiàn)類是TransactionalMessageServiceImpl也可以自己定義事務(wù)消息處理實現(xiàn)類,通過ServiceProvider.loadClass()方法進行加載。

TransactionalMessageService類定義如下。內(nèi)部屬性已加注釋標明。

public interface TransactionalMessageService {
    //用于保存Half事務(wù)消息
    PutMessageResult prepareMessage(MessageExtBrokerInner messageInner);
    CompletableFuture<PutMessageResult> asyncPrepareMessage(MessageExtBrokerInner messageInner);
    //刪除事務(wù)消息
    boolean deletePrepareMessage(MessageExt messageExt);
    //提交事務(wù)消息
    OperationResult commitMessage(EndTransactionRequestHeader requestHeader);
    //回滾事務(wù)消息
    OperationResult rollbackMessage(EndTransactionRequestHeader requestHeader);
    void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener);
    //打開事務(wù)消息
    boolean open();
    //關(guān)閉事務(wù)消息
    void close();
}

transactionalMessageCheckListener

事務(wù)消息回查監(jiān)聽器

transactionalMessageCheckService

事務(wù)消息回查服務(wù),啟動一個線程定時檢查超時的Half消息是否需要回查。

處理事務(wù)消息

當初始化完成之后,Broker就可以處理事務(wù)消息了。

Broker存儲事務(wù)消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,這和普通消息其實是一樣的。

但是有兩點針對事務(wù)消息的特殊處理

第一處:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//獲取擴展字段的值,若是該值為true則為事務(wù)消息
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
boolean sendTransactionPrepareMessage = false;
if (Boolean.parseBoolean(traFlag)
    && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { 
    //判斷當前Broker配置是否支持事務(wù)消息
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    sendTransactionPrepareMessage = true;
}
if (sendTransactionPrepareMessage) {
    //保存Half信息
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

第二處:

存儲事務(wù)消息前的預(yù)處理,對應(yīng)方法是

org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //將原消息的topic保存在擴展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    //將原消息的QueueId保存在擴展字段中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    //將原消息的SysFlag保存在擴展字段中
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    //修改topic的值為RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    //修改Queueid為0
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

完成上述步驟之后,調(diào)用DefaultMessageStole.putMessage()方法將其保存到CommitLog中。

CommitLog存儲成功之后,通過org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法對其進行處理。

final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
    // Prepared and Rollback message is not consumed, will not enter the consume queue
    case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
    case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
        queueOffset = 0L;
        break;
    case MessageSysFlag.TRANSACTION_NOT_TYPE:
    case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
    default:
        break;
}

這里的邏輯是這樣的,當讀到的消息類型為事務(wù)消息時,設(shè)置當前消息的位點值為0,而不是設(shè)置真實的位點。這樣該位點就不會建立ConsumeQueue索引,也不會被消費。

以上就是RocketMQ特性Broker存儲事務(wù)消息實現(xiàn)的詳細內(nèi)容,更多關(guān)于RocketMQ Broker存儲事務(wù)消息的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • java簡單工廠模式實例及講解

    java簡單工廠模式實例及講解

    這篇文章主要為大家詳細介紹了java簡單工廠模式實例,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • Java微信公眾平臺之素材管理

    Java微信公眾平臺之素材管理

    這篇文章主要為大家詳細介紹了Java微信公眾平臺之素材管理,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-05-05
  • SpringBoot3中數(shù)據(jù)庫集成實踐詳解

    SpringBoot3中數(shù)據(jù)庫集成實踐詳解

    項目工程中,集成數(shù)據(jù)庫實現(xiàn)對數(shù)據(jù)的增曬改查管理,是最基礎(chǔ)的能力,所以下面小編就來和大家講講SpringBoot3如何實現(xiàn)數(shù)據(jù)庫集成,需要的可以參考下
    2023-08-08
  • maven <repositories>標簽和<pluginRepositories>標簽的使用

    maven <repositories>標簽和<pluginRepositories>標簽的使用

    這篇文章主要介紹了maven <repositories>標簽和<pluginRepositories>標簽的使用,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-07-07
  • 基于mybatis注解動態(tài)sql中foreach工具的方法

    基于mybatis注解動態(tài)sql中foreach工具的方法

    這篇文章主要介紹了mybatis注解動態(tài)sql中foreach工具方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java開發(fā)實例之圖書管理系統(tǒng)的實現(xiàn)

    Java開發(fā)實例之圖書管理系統(tǒng)的實現(xiàn)

    圖書管理的功能大體包括:增加書籍、借閱書籍、刪除書籍、查看書籍列表、退出系統(tǒng)、查找書籍、返還書籍這些,本文主要給大家介紹該系統(tǒng)的數(shù)據(jù)庫語句,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-10-10
  • 淺談緩沖字符流 BufferedReader BufferedWriter用法

    淺談緩沖字符流 BufferedReader BufferedWriter用法

    這篇文章主要介紹了緩沖字符流 BufferedReader BufferedWriter的用法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Layui前后臺交互數(shù)據(jù)獲取java實例

    Layui前后臺交互數(shù)據(jù)獲取java實例

    下面小編就為大家分享一篇Layui前后臺交互數(shù)據(jù)獲取java實例,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-01-01
  • Spring?Boot?整合?Thymeleaf?實例分享

    Spring?Boot?整合?Thymeleaf?實例分享

    這篇文章主要分享了Spring?Boot整合Thymeleaf,Thymeleaf是新一代的Java模板引擎,類似于Velocity、FreeMarker等傳統(tǒng)引擎,關(guān)于其更多相關(guān)內(nèi)容,需要的小伙伴可以參考一下
    2022-05-05
  • Java e.printStackTrace()案例講解

    Java e.printStackTrace()案例講解

    這篇文章主要介紹了Java e.printStackTrace()案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下
    2021-08-08

最新評論