欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RocketMQ?offset確認(rèn)機(jī)制示例詳解

 更新時(shí)間:2023年09月11日 14:17:33   作者:土豆肉絲蓋澆飯  
這篇文章主要為大家介紹了RocketMQ?offset確認(rèn)機(jī)制示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

消息存儲(chǔ)

offset?

本文要探討的offset指的是上圖中的Queue Offset。

為了保存消費(fèi)的消費(fèi)進(jìn)度,避免重復(fù)消費(fèi),我們需要將offset保存下來(lái)。

針對(duì)集群消費(fèi),offset保存在broker,在客戶端使用RemoteBrokerOffsetStore。

針對(duì)廣播消費(fèi),offset保存在本地,在客戶端使用LocalFileOffsetStore。

最后,比較重要的一點(diǎn)是,保存的offset指的是下一條消息的offset,而不是消費(fèi)完最后一條消息的offset。

比如,你消費(fèi)了上圖中第一個(gè)Queue的offset為0的消息,其實(shí)保存的offset為1,表示下次我從offset=1的位置進(jìn)行消費(fèi)。

broker端

在broker端,通過(guò)ConsumerOffsetManager中的offsetTable來(lái)保存Topic下各個(gè)ConsumerGroup的消費(fèi)進(jìn)度。

private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
    new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

從offsetTable的雙層Map結(jié)構(gòu)也是能夠看出,我上面說(shuō)的消費(fèi)進(jìn)度,細(xì)指為ConsumerGroup在Topic下每個(gè)queue的消費(fèi)進(jìn)度。

offsetTable畢竟只是內(nèi)存結(jié)構(gòu),因此ConsumerOffsetManager繼承了ConfigManager實(shí)現(xiàn)了持久化功能。

實(shí)現(xiàn)了encode,decode,configFilePath三個(gè)模板方法。用于指定序列化,反序列化的邏輯以及保存位置

public String encode() {
    return this.encode(false);
}
@Override
public String configFilePath() {
    return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}
@Override
public void decode(String jsonString) {
    if (jsonString != null) {
        ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
        if (obj != null) {
            this.offsetTable = obj.offsetTable;
        }
    }
}
public String encode(final boolean prettyFormat) {
    return RemotingSerializable.toJson(this, prettyFormat);
}

其中序列化,反序列化的邏輯很簡(jiǎn)單,就是使用到了我們的FastJson。

保存文件名為consumerOffset.json。

offset加載

broker啟動(dòng)時(shí)從本地文件加載

org.apache.rocketmq.broker.BrokerController#initialize

result = result && this.consumerOffsetManager.load();

offset持久化

定時(shí)觸發(fā),持久化到磁盤

org.apache.rocketmq.broker.BrokerController#initialize

//定期將consumeroffset持久化到磁盤
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  • shutdown觸發(fā)

BrokerController#shutdown

offset更新

  • ConsumerManageProcessor#updateConsumerOffset

用于consumer定時(shí)同步offset

  • PullMessageProcessor

拉取消息時(shí)會(huì)順帶確認(rèn)offset

  • TransactionalMessageBridge

事務(wù)回查觸發(fā),暫不深入研究

consumer端

本文只討論P(yáng)USH模式的集群消費(fèi),本地的offset緩存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。

private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
    new ConcurrentHashMap<MessageQueue, AtomicLong>();

因?yàn)閏onsumer每次重啟都會(huì)重新拉取offset,只是一個(gè)臨時(shí)存儲(chǔ),因此RemoteBrokerOffsetStore的offsetTable的設(shè)計(jì)沒(méi)有像ConsumerOffsetManager那么復(fù)雜。

offset拉取

consumer啟動(dòng)后會(huì)進(jìn)行第一次rebalance,并且之后都會(huì)定期rebalance。

在rebalance分配好messagequeue之后,會(huì)根據(jù)messagequeue生成processqueue進(jìn)行消息拉取。

而在進(jìn)行消息拉取前,有一個(gè)關(guān)鍵的操作,拉取對(duì)應(yīng)messagequeue的offset。

RebalanceImpl#updateProcessQueueTableInRebalance

for (MessageQueue mq : mqSet) {
    if (!this.processQueueTable.containsKey(mq)) {
        //如果是順序消費(fèi) 但是lock失敗 那么跳過(guò)
        if (isOrder && !this.lock(mq)) {
            log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
            continue;
        }
        //從offsetstore刪除之前的數(shù)據(jù),可能之前有一段時(shí)間屬于該消費(fèi)者
        this.removeDirtyOffset(mq);
        ProcessQueue pq = new ProcessQueue();
        // 獲取該mq應(yīng)該從哪里開始消費(fèi)
        // pull模式 默認(rèn)是0
        // push模式 動(dòng)態(tài)計(jì)算
        long nextOffset = this.computePullFromWhere(mq);
        if (nextOffset >= 0) {
            ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
            if (pre != null) {
                log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
            } else {
                log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                //創(chuàng)建新的pullRequest
                PullRequest pullRequest = new PullRequest();
                pullRequest.setConsumerGroup(consumerGroup);
                pullRequest.setNextOffset(nextOffset);
                pullRequest.setMessageQueue(mq);
                pullRequest.setProcessQueue(pq);
                pullRequestList.add(pullRequest);
                changed = true;
            }
        } else {
            log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
        }
    }
}
public long computePullFromWhere(MessageQueue mq) {
    long result = -1;
    final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();
    final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
    switch (consumeFromWhere) {
        case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
        case CONSUME_FROM_MIN_OFFSET:
        case CONSUME_FROM_MAX_OFFSET:
        case CONSUME_FROM_LAST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            }
            // First start,no offset
            else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    result = 0L;
                } else {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        case CONSUME_FROM_FIRST_OFFSET: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                result = 0L;
            } else {
                result = -1;
            }
            break;
        }
        case CONSUME_FROM_TIMESTAMP: {
            long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
            if (lastOffset >= 0) {
                result = lastOffset;
            } else if (-1 == lastOffset) {
                if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    try {
                        result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                } else {
                    try {
                        long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                                                           UtilAll.YYYYMMDDHHMMSS).getTime();
                        result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
                    } catch (MQClientException e) {
                        result = -1;
                    }
                }
            } else {
                result = -1;
            }
            break;
        }
        default:
            break;
    }
    return result;
}

其中獲取消息拉取初始位置有三種策略

CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第一個(gè)offset
CONSUME_FROM_TIMESTAMP 根據(jù)時(shí)間戳獲取offset

但是從源碼中可以看出來(lái),實(shí)際上的邏輯和我們想象的有點(diǎn)不同,上面三個(gè)的邏輯的觸發(fā)前提是,從broker拉取不到offset進(jìn)度。

這應(yīng)該是為了防止重復(fù)消費(fèi)以及少消費(fèi),畢竟rocketmq是業(yè)務(wù)相關(guān)的mq。

long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);
if (lastOffset >= 0) {
    result = lastOffset;
}

offset更新

在consumer端,針對(duì)offsetTable的更新,當(dāng)然通過(guò)消費(fèi)消息觸發(fā)。

并發(fā)消費(fèi)

ConsumeMessageConcurrentlyService#processConsumeResult

//removeMessage會(huì)返回offset
//不管消費(fèi)成功還是失敗 都會(huì)確認(rèn)offset
//失敗的消息會(huì)在重試topic
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
//更新offsetstore
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    //increaseOnly 表示offset只能增加
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

針對(duì)并發(fā)消費(fèi)的offset,更新值來(lái)源于ProcessQueue#removeMessage方法

/**
* 移除pq中的msg,并且返回剩下第一條消息的offset
* 如果消息全部消費(fèi)完,返回this.queueOffsetMax + 1
* 如果msgs為空 返回-1
* @param msgs
* @return
*/
public long removeMessage(final List<MessageExt> msgs) {
    long result = -1;
    final long now = System.currentTimeMillis();
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        this.lastConsumeTimestamp = now;
        try {
            if (!msgTreeMap.isEmpty()) {
                result = this.queueOffsetMax + 1;
                int removedCnt = 0;
                for (MessageExt msg : msgs) {
                    //通過(guò)map的key移除,也就是queueoffset
                    MessageExt prev = msgTreeMap.remove(msg.getQueueOffset());
                    if (prev != null) {
                        removedCnt--;
                        msgSize.addAndGet(0 - msg.getBody().length);
                    }
                }
                msgCount.addAndGet(removedCnt);
                //如果消息沒(méi)有全部消費(fèi)完畢,剩下消息中最小的那個(gè)
                if (!msgTreeMap.isEmpty()) {
                    result = msgTreeMap.firstKey();
                }
            }
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (Throwable t) {
        log.error("removeMessage exception", t);
    }
    return result;
}

removeMessage的邏輯,用到了滑動(dòng)窗口的算法。

比如10條消息,offset為 0 - 9。

在多線程并發(fā)消費(fèi)的場(chǎng)景下

比如我第一個(gè)線程消費(fèi)了offset為0的消息,那么offsetTable中的offset更新為1

然后我第二個(gè)線程消費(fèi)了offset為5的消息,removeMessage返回的offset還是為1

只有前面的消息全被消費(fèi)了,窗口才會(huì)滑動(dòng)

順序消費(fèi)

ConsumeMessageOrderlyService#processConsumeResult

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
    this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

順序消費(fèi),暫不研究。

offset持久化

最終的offset以broker為準(zhǔn),因此本地的offset要定期持久化到offset。

主要持久化邏輯在persistAll和persist方法。

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist

public void persist(MessageQueue mq) {
    AtomicLong offset = this.offsetTable.get(mq);
    if (offset != null) {
        try {
            this.updateConsumeOffsetToBroker(mq, offset.get());
            log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                     this.groupName,
                     this.mQClientFactory.getClientId(),
                     mq,
                     offset.get());
        } catch (Exception e) {
            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
        }
    }
}

persistAll和persist邏輯大致相同,核心邏輯都是通過(guò)updateConsumeOffsetToBroker持久化到broker。

觸發(fā)持久化邏輯的時(shí)機(jī)有以下4個(gè)

定時(shí)同步

MQClientInstance#startScheduledTask

//定時(shí)同步offset到broker
//除了拉消息的時(shí)候會(huì)同步下 也有定時(shí)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            //持久化消費(fèi)進(jìn)度
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

shutdown

DefaultMQPullConsumerImpl#shutdown

public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.persistConsumerOffset();//here
            this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup());
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}

DefaultMQPushConsumerImpl#shutdown

public synchronized void shutdown() {
    switch (this.serviceState) {
        case CREATE_JUST:
            break;
        case RUNNING:
            this.consumeMessageService.shutdown();
            this.persistConsumerOffset();//here
            this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup());
            this.mQClientFactory.shutdown();
            log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup());
            this.rebalanceImpl.destroy();
            this.serviceState = ServiceState.SHUTDOWN_ALREADY;
            break;
        case SHUTDOWN_ALREADY:
            break;
        default:
            break;
    }
}

Rebalance

當(dāng)一個(gè)queue不再屬于當(dāng)前consumer的時(shí)候,需要同步進(jìn)步給broker,以便于新拿到queue的consumer從最新未消費(fèi)的消息開始拉取

RebalancePullImpl#removeUnnecessaryMessageQueue

public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) {
    this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq);
    this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq);
    return true;
}

拉取消息

拉取消息的時(shí)候會(huì)順帶commit offset

DefaultMQPushConsumerImpl#pullMessage

// 拉取消息的時(shí)候順帶ack消息進(jìn)度
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
    //READ_FROM_MEMORY 獲取本地緩存的消費(fèi)進(jìn)度
    commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
    if (commitOffsetValue &gt; 0) {
        commitOffsetEnable = true;
    }
}
this.pullAPIWrapper.pullKernelImpl(
    pullRequest.getMessageQueue(),
    subExpression,
    subscriptionData.getExpressionType(),
    subscriptionData.getSubVersion(),
    pullRequest.getNextOffset(),//需要拉取的下個(gè)offset ,拉取到了 ,不代表被消費(fèi)到
    this.defaultMQPushConsumer.getPullBatchSize(),
    sysFlag,
    commitOffsetValue,//確認(rèn)的offset
    BROKER_SUSPEND_MAX_TIME_MILLIS,
    CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
    CommunicationMode.ASYNC, //異步拉取消息
    pullCallback //注意這個(gè)回調(diào)也傳過(guò)去了
);

PullMessageProcessor#processRequest

boolean storeOffsetEnable = brokerAllowSuspend;
//需要hasCommitOffsetFlag=true
storeOffsetEnable = storeOffsetEnable &amp;&amp; hasCommitOffsetFlag;
//需要當(dāng)前broker是master
storeOffsetEnable = storeOffsetEnable
            &amp;&amp; this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    //這邊應(yīng)該是保存改group在該topic下面的消費(fèi)進(jìn)度
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),
                                                                  requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

總結(jié)

  • 針對(duì)集群消費(fèi)offset保存在broker,針對(duì)廣播消費(fèi)offset保存在本地
  • offset保存的值為消息下次開始消費(fèi)的位置,而不是上次消費(fèi)結(jié)束的位置
  • 針對(duì)offset的提交采取基于TreeMap的滑動(dòng)窗口機(jī)制
  • 消費(fèi)者啟動(dòng)會(huì)先從broker拉取對(duì)應(yīng)Topic的offset進(jìn)度,然后在進(jìn)行消息拉取
  • offset持久化觸發(fā)的幾種方式

問(wèn)題

消息消費(fèi)失敗是否影響窗口滑動(dòng)

正常情況下,消息消費(fèi)失敗不會(huì)影響窗口滑動(dòng),因?yàn)獒槍?duì)消費(fèi)失敗的消息,client會(huì)進(jìn)行sendback。

sendback之后,消息經(jīng)過(guò)延遲之后會(huì)發(fā)往Topic=%RETRY%{CONSUMERGROUP}的Retry隊(duì)列

每個(gè)ConsumerGroup會(huì)強(qiáng)制監(jiān)聽Retry隊(duì)列的消息

ConsumeMessageConcurrentlyService#processConsumeResult

List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
//因?yàn)閙sgsize=1,所以只有失敗的時(shí)候才會(huì)進(jìn)入下面的循環(huán)
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
    MessageExt msg = consumeRequest.getMsgs().get(i);
    //消費(fèi)失敗的消息  重新插入到commitlog  發(fā)送到group對(duì)應(yīng)重試topic
    boolean result = this.sendMessageBack(msg, context);
    //如果發(fā)送請(qǐng)求失敗 那么本地再消費(fèi)一次試試
    if (!result) {
        //本地的重試也算重試次數(shù)?。?!
        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
        msgBackFailed.add(msg);
    }
}
//發(fā)送到重試隊(duì)列失敗的消息,重新消費(fèi)
if (!msgBackFailed.isEmpty()) {
    consumeRequest.getMsgs().removeAll(msgBackFailed);
    //如果消費(fèi)失敗 重新消費(fèi)
    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}

而sendMessageBack失敗的消息,會(huì)重新封裝成另一個(gè)ConsumeRequest在本地再次消費(fèi)。

這些失敗的消息會(huì)從之前的consumeRequest移除,因此也就影響到了ProcessQueue#removeMessage的返回值。

但是這是一個(gè)優(yōu)化,重試之后窗口大概率上還是會(huì)正?;瑒?dòng)。

消費(fèi)者并發(fā)消費(fèi)如何保證提交位置偏移量正確,保證消費(fèi)消費(fèi)不丟失? 比如a線程消費(fèi)msgid=1,b線程消費(fèi)msgid=2,b線程消費(fèi)速度比a線程快。如何避免a線程消費(fèi)失敗,消息不丟失?

如何保證并發(fā)消費(fèi)提交偏移量正確?

基于TreeMap的滑動(dòng)窗口

如何保證消息消費(fèi)不丟失?

滑動(dòng)窗口+broker遠(yuǎn)端保存+sendback+本地重試兜底

應(yīng)用重啟,消息從哪里開始消費(fèi)

如果broker保存了offset

那么從對(duì)應(yīng)offset重新拉取消息

如果broker沒(méi)有保存offset,或者其他情況丟失

那么根據(jù)配置的策略,從對(duì)應(yīng)的offset開始拉取

以上就是RocketMQ offset確認(rèn)機(jī)制示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ offset確認(rèn)機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • IDEA中Maven爆紅以及依賴下載失敗的最全解決方案

    IDEA中Maven爆紅以及依賴下載失敗的最全解決方案

    這篇文章主要介紹了IDEA中Maven爆紅,依賴下載失敗的最全解決方案,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-08-08
  • MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增)

    MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增)

    本文主要介紹了MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • Springboot mybais配置多數(shù)據(jù)源過(guò)程解析

    Springboot mybais配置多數(shù)據(jù)源過(guò)程解析

    這篇文章主要介紹了Springboot+mybais配置多數(shù)據(jù)源過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-03-03
  • Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例

    Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例

    下面小編就為大家?guī)?lái)一篇Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-09-09
  • Java微服務(wù)Filter過(guò)濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過(guò)程詳解

    Java微服務(wù)Filter過(guò)濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過(guò)程詳解

    這篇文章主要介紹了Java微服務(wù)Filter過(guò)濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過(guò)程,首先Sentinel規(guī)則的存儲(chǔ)默認(rèn)是存儲(chǔ)在內(nèi)存的,應(yīng)用重啟之后規(guī)則會(huì)丟失。因此我們通過(guò)配置中心Nacos保存規(guī)則,然后通過(guò)定時(shí)拉取Nacos數(shù)據(jù)來(lái)獲取規(guī)則配置,可以做到動(dòng)態(tài)實(shí)時(shí)的刷新規(guī)則
    2023-02-02
  • Assert.assertEquals的使用方法及注意事項(xiàng)說(shuō)明

    Assert.assertEquals的使用方法及注意事項(xiàng)說(shuō)明

    這篇文章主要介紹了Assert.assertEquals的使用方法及注意事項(xiàng)說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-05-05
  • java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解

    java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解

    這篇文章主要介紹了java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家,需要的朋友可以參考下
    2017-09-09
  • SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)源的實(shí)現(xiàn)

    SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)源的實(shí)現(xiàn)

    本文主要介紹了SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)庫(kù)源的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • 全網(wǎng)最詳細(xì)Hutool工具詳解

    全網(wǎng)最詳細(xì)Hutool工具詳解

    Hutool的目標(biāo)是使用一個(gè)工具方法代替一段復(fù)雜代碼,從而最大限度的避免“復(fù)制粘貼”代碼的問(wèn)題,徹底改變我們寫代碼的方式。這篇文章主要介紹了全文最詳細(xì)Hutool工具詳解,需要的朋友可以參考下
    2021-12-12
  • 解析Java并發(fā)Exchanger的使用

    解析Java并發(fā)Exchanger的使用

    Exchanger是java 5引入的并發(fā)類,Exchanger顧名思義就是用來(lái)做交換的。這里主要是兩個(gè)線程之間交換持有的對(duì)象。當(dāng)Exchanger在一個(gè)線程中調(diào)用exchange方法之后,會(huì)等待另外的線程調(diào)用同樣的exchange方法。兩個(gè)線程都調(diào)用exchange方法之后,傳入的參數(shù)就會(huì)交換。
    2021-06-06

最新評(píng)論