RocketMQ?offset確認(rèn)機(jī)制示例詳解
消息存儲(chǔ)
offset?
本文要探討的offset指的是上圖中的Queue Offset。
為了保存消費(fèi)的消費(fèi)進(jìn)度,避免重復(fù)消費(fèi),我們需要將offset保存下來(lái)。
針對(duì)集群消費(fèi),offset保存在broker,在客戶端使用RemoteBrokerOffsetStore。
針對(duì)廣播消費(fèi),offset保存在本地,在客戶端使用LocalFileOffsetStore。
最后,比較重要的一點(diǎn)是,保存的offset指的是下一條消息的offset,而不是消費(fèi)完最后一條消息的offset。
比如,你消費(fèi)了上圖中第一個(gè)Queue的offset為0的消息,其實(shí)保存的offset為1,表示下次我從offset=1的位置進(jìn)行消費(fèi)。
broker端
在broker端,通過(guò)ConsumerOffsetManager中的offsetTable來(lái)保存Topic下各個(gè)ConsumerGroup的消費(fèi)進(jìn)度。
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
從offsetTable的雙層Map結(jié)構(gòu)也是能夠看出,我上面說(shuō)的消費(fèi)進(jìn)度,細(xì)指為ConsumerGroup在Topic下每個(gè)queue的消費(fèi)進(jìn)度。
offsetTable畢竟只是內(nèi)存結(jié)構(gòu),因此ConsumerOffsetManager繼承了ConfigManager實(shí)現(xiàn)了持久化功能。
實(shí)現(xiàn)了encode,decode,configFilePath三個(gè)模板方法。用于指定序列化,反序列化的邏輯以及保存位置
public String encode() { return this.encode(false); } @Override public String configFilePath() { return BrokerPathConfigHelper.getConsumerOffsetPath(this.brokerController.getMessageStoreConfig().getStorePathRootDir()); } @Override public void decode(String jsonString) { if (jsonString != null) { ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class); if (obj != null) { this.offsetTable = obj.offsetTable; } } } public String encode(final boolean prettyFormat) { return RemotingSerializable.toJson(this, prettyFormat); }
其中序列化,反序列化的邏輯很簡(jiǎn)單,就是使用到了我們的FastJson。
保存文件名為consumerOffset.json。
offset加載
broker啟動(dòng)時(shí)從本地文件加載
org.apache.rocketmq.broker.BrokerController#initialize
result = result && this.consumerOffsetManager.load();
offset持久化
定時(shí)觸發(fā),持久化到磁盤
org.apache.rocketmq.broker.BrokerController#initialize
//定期將consumeroffset持久化到磁盤 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
- shutdown觸發(fā)
BrokerController#shutdown
offset更新
- ConsumerManageProcessor#updateConsumerOffset
用于consumer定時(shí)同步offset
- PullMessageProcessor
拉取消息時(shí)會(huì)順帶確認(rèn)offset
- TransactionalMessageBridge
事務(wù)回查觸發(fā),暫不深入研究
consumer端
本文只討論P(yáng)USH模式的集群消費(fèi),本地的offset緩存到RemoteBrokerOffsetStore的offsetTable中,定期同步到broker。
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable = new ConcurrentHashMap<MessageQueue, AtomicLong>();
因?yàn)閏onsumer每次重啟都會(huì)重新拉取offset,只是一個(gè)臨時(shí)存儲(chǔ),因此RemoteBrokerOffsetStore的offsetTable的設(shè)計(jì)沒(méi)有像ConsumerOffsetManager那么復(fù)雜。
offset拉取
consumer啟動(dòng)后會(huì)進(jìn)行第一次rebalance,并且之后都會(huì)定期rebalance。
在rebalance分配好messagequeue之后,會(huì)根據(jù)messagequeue生成processqueue進(jìn)行消息拉取。
而在進(jìn)行消息拉取前,有一個(gè)關(guān)鍵的操作,拉取對(duì)應(yīng)messagequeue的offset。
RebalanceImpl#updateProcessQueueTableInRebalance
for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { //如果是順序消費(fèi) 但是lock失敗 那么跳過(guò) if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } //從offsetstore刪除之前的數(shù)據(jù),可能之前有一段時(shí)間屬于該消費(fèi)者 this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); // 獲取該mq應(yīng)該從哪里開始消費(fèi) // pull模式 默認(rèn)是0 // push模式 動(dòng)態(tài)計(jì)算 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //創(chuàng)建新的pullRequest PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } }
public long computePullFromWhere(MessageQueue mq) { long result = -1; final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere(); final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore(); switch (consumeFromWhere) { case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST: case CONSUME_FROM_MIN_OFFSET: case CONSUME_FROM_MAX_OFFSET: case CONSUME_FROM_LAST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } // First start,no offset else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { result = 0L; } else { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } case CONSUME_FROM_FIRST_OFFSET: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { result = 0L; } else { result = -1; } break; } case CONSUME_FROM_TIMESTAMP: { long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; } else if (-1 == lastOffset) { if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { try { result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq); } catch (MQClientException e) { result = -1; } } else { try { long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(), UtilAll.YYYYMMDDHHMMSS).getTime(); result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp); } catch (MQClientException e) { result = -1; } } } else { result = -1; } break; } default: break; } return result; }
其中獲取消息拉取初始位置有三種策略
CONSUME_FROM_LAST_OFFSET 最新的offset
CONSUME_FROM_FIRST_OFFSET 第一個(gè)offset
CONSUME_FROM_TIMESTAMP 根據(jù)時(shí)間戳獲取offset
但是從源碼中可以看出來(lái),實(shí)際上的邏輯和我們想象的有點(diǎn)不同,上面三個(gè)的邏輯的觸發(fā)前提是,從broker拉取不到offset進(jìn)度。
這應(yīng)該是為了防止重復(fù)消費(fèi)以及少消費(fèi),畢竟rocketmq是業(yè)務(wù)相關(guān)的mq。
long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE); if (lastOffset >= 0) { result = lastOffset; }
offset更新
在consumer端,針對(duì)offsetTable的更新,當(dāng)然通過(guò)消費(fèi)消息觸發(fā)。
并發(fā)消費(fèi)
ConsumeMessageConcurrentlyService#processConsumeResult
//removeMessage會(huì)返回offset //不管消費(fèi)成功還是失敗 都會(huì)確認(rèn)offset //失敗的消息會(huì)在重試topic long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); //更新offsetstore if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { //increaseOnly 表示offset只能增加 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }
針對(duì)并發(fā)消費(fèi)的offset,更新值來(lái)源于ProcessQueue#removeMessage方法
/** * 移除pq中的msg,并且返回剩下第一條消息的offset * 如果消息全部消費(fèi)完,返回this.queueOffsetMax + 1 * 如果msgs為空 返回-1 * @param msgs * @return */ public long removeMessage(final List<MessageExt> msgs) { long result = -1; final long now = System.currentTimeMillis(); try { this.lockTreeMap.writeLock().lockInterruptibly(); this.lastConsumeTimestamp = now; try { if (!msgTreeMap.isEmpty()) { result = this.queueOffsetMax + 1; int removedCnt = 0; for (MessageExt msg : msgs) { //通過(guò)map的key移除,也就是queueoffset MessageExt prev = msgTreeMap.remove(msg.getQueueOffset()); if (prev != null) { removedCnt--; msgSize.addAndGet(0 - msg.getBody().length); } } msgCount.addAndGet(removedCnt); //如果消息沒(méi)有全部消費(fèi)完畢,剩下消息中最小的那個(gè) if (!msgTreeMap.isEmpty()) { result = msgTreeMap.firstKey(); } } } finally { this.lockTreeMap.writeLock().unlock(); } } catch (Throwable t) { log.error("removeMessage exception", t); } return result; }
removeMessage的邏輯,用到了滑動(dòng)窗口的算法。
比如10條消息,offset為 0 - 9。
在多線程并發(fā)消費(fèi)的場(chǎng)景下
比如我第一個(gè)線程消費(fèi)了offset為0的消息,那么offsetTable中的offset更新為1
然后我第二個(gè)線程消費(fèi)了offset為5的消息,removeMessage返回的offset還是為1
只有前面的消息全被消費(fèi)了,窗口才會(huì)滑動(dòng)
順序消費(fèi)
ConsumeMessageOrderlyService#processConsumeResult
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); }
順序消費(fèi),暫不研究。
offset持久化
最終的offset以broker為準(zhǔn),因此本地的offset要定期持久化到offset。
主要持久化邏輯在persistAll和persist方法。
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#persist
public void persist(MessageQueue mq) { AtomicLong offset = this.offsetTable.get(mq); if (offset != null) { try { this.updateConsumeOffsetToBroker(mq, offset.get()); log.info("[persist] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}", this.groupName, this.mQClientFactory.getClientId(), mq, offset.get()); } catch (Exception e) { log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e); } } }
persistAll和persist邏輯大致相同,核心邏輯都是通過(guò)updateConsumeOffsetToBroker持久化到broker。
觸發(fā)持久化邏輯的時(shí)機(jī)有以下4個(gè)
定時(shí)同步
MQClientInstance#startScheduledTask
//定時(shí)同步offset到broker //除了拉消息的時(shí)候會(huì)同步下 也有定時(shí) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //持久化消費(fèi)進(jìn)度 MQClientInstance.this.persistAllConsumerOffset(); } catch (Exception e) { log.error("ScheduledTask persistAllConsumerOffset exception", e); } } }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
shutdown
DefaultMQPullConsumerImpl#shutdown
public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: this.persistConsumerOffset();//here this.mQClientFactory.unregisterConsumer(this.defaultMQPullConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); log.info("the consumer [{}] shutdown OK", this.defaultMQPullConsumer.getConsumerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; case SHUTDOWN_ALREADY: break; default: break; } }
DefaultMQPushConsumerImpl#shutdown
public synchronized void shutdown() { switch (this.serviceState) { case CREATE_JUST: break; case RUNNING: this.consumeMessageService.shutdown(); this.persistConsumerOffset();//here this.mQClientFactory.unregisterConsumer(this.defaultMQPushConsumer.getConsumerGroup()); this.mQClientFactory.shutdown(); log.info("the consumer [{}] shutdown OK", this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.destroy(); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; case SHUTDOWN_ALREADY: break; default: break; } }
Rebalance
當(dāng)一個(gè)queue不再屬于當(dāng)前consumer的時(shí)候,需要同步進(jìn)步給broker,以便于新拿到queue的consumer從最新未消費(fèi)的消息開始拉取
RebalancePullImpl#removeUnnecessaryMessageQueue
public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue pq) { this.defaultMQPullConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPullConsumerImpl.getOffsetStore().removeOffset(mq); return true; }
拉取消息
拉取消息的時(shí)候會(huì)順帶commit offset
DefaultMQPushConsumerImpl#pullMessage
// 拉取消息的時(shí)候順帶ack消息進(jìn)度 boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { //READ_FROM_MEMORY 獲取本地緩存的消費(fèi)進(jìn)度 commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } }
this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(),//需要拉取的下個(gè)offset ,拉取到了 ,不代表被消費(fèi)到 this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue,//確認(rèn)的offset BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, CommunicationMode.ASYNC, //異步拉取消息 pullCallback //注意這個(gè)回調(diào)也傳過(guò)去了 );
PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend; //需要hasCommitOffsetFlag=true storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; //需要當(dāng)前broker是master storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; if (storeOffsetEnable) { //這邊應(yīng)該是保存改group在該topic下面的消費(fèi)進(jìn)度 this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); }
總結(jié)
- 針對(duì)集群消費(fèi)offset保存在broker,針對(duì)廣播消費(fèi)offset保存在本地
- offset保存的值為消息下次開始消費(fèi)的位置,而不是上次消費(fèi)結(jié)束的位置
- 針對(duì)offset的提交采取基于TreeMap的滑動(dòng)窗口機(jī)制
- 消費(fèi)者啟動(dòng)會(huì)先從broker拉取對(duì)應(yīng)Topic的offset進(jìn)度,然后在進(jìn)行消息拉取
- offset持久化觸發(fā)的幾種方式
問(wèn)題
消息消費(fèi)失敗是否影響窗口滑動(dòng)
正常情況下,消息消費(fèi)失敗不會(huì)影響窗口滑動(dòng),因?yàn)獒槍?duì)消費(fèi)失敗的消息,client會(huì)進(jìn)行sendback。
sendback之后,消息經(jīng)過(guò)延遲之后會(huì)發(fā)往Topic=%RETRY%{CONSUMERGROUP}的Retry隊(duì)列
每個(gè)ConsumerGroup會(huì)強(qiáng)制監(jiān)聽Retry隊(duì)列的消息
ConsumeMessageConcurrentlyService#processConsumeResult
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); //因?yàn)閙sgsize=1,所以只有失敗的時(shí)候才會(huì)進(jìn)入下面的循環(huán) for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); //消費(fèi)失敗的消息 重新插入到commitlog 發(fā)送到group對(duì)應(yīng)重試topic boolean result = this.sendMessageBack(msg, context); //如果發(fā)送請(qǐng)求失敗 那么本地再消費(fèi)一次試試 if (!result) { //本地的重試也算重試次數(shù)?。?! msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } //發(fā)送到重試隊(duì)列失敗的消息,重新消費(fèi) if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); //如果消費(fèi)失敗 重新消費(fèi) this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); }
而sendMessageBack失敗的消息,會(huì)重新封裝成另一個(gè)ConsumeRequest在本地再次消費(fèi)。
這些失敗的消息會(huì)從之前的consumeRequest移除,因此也就影響到了ProcessQueue#removeMessage的返回值。
但是這是一個(gè)優(yōu)化,重試之后窗口大概率上還是會(huì)正?;瑒?dòng)。
消費(fèi)者并發(fā)消費(fèi)如何保證提交位置偏移量正確,保證消費(fèi)消費(fèi)不丟失? 比如a線程消費(fèi)msgid=1,b線程消費(fèi)msgid=2,b線程消費(fèi)速度比a線程快。如何避免a線程消費(fèi)失敗,消息不丟失?
如何保證并發(fā)消費(fèi)提交偏移量正確?
基于TreeMap的滑動(dòng)窗口
如何保證消息消費(fèi)不丟失?
滑動(dòng)窗口+broker遠(yuǎn)端保存+sendback+本地重試兜底
應(yīng)用重啟,消息從哪里開始消費(fèi)
如果broker保存了offset
那么從對(duì)應(yīng)offset重新拉取消息
如果broker沒(méi)有保存offset,或者其他情況丟失
那么根據(jù)配置的策略,從對(duì)應(yīng)的offset開始拉取
以上就是RocketMQ offset確認(rèn)機(jī)制示例詳解的詳細(xì)內(nèi)容,更多關(guān)于RocketMQ offset確認(rèn)機(jī)制的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- 關(guān)于springboot使用rocketmq?RocketMQMessageListener參數(shù)問(wèn)題
- rocketmq的AclClientRPCHook權(quán)限控制使用技巧示例詳解
- rocketmq-streams的ILeaseService使用示例詳解
- springboot集成RocketMQ過(guò)程及使用示例詳解
- RocketMQ?源碼分析Broker消息刷盤服務(wù)
- RocketMQ?Broker消息如何刷盤源碼解析
- RocketMQMessageListener注解對(duì)rocketmq消息的消費(fèi)實(shí)現(xiàn)機(jī)制
相關(guān)文章
MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增)
本文主要介紹了MyBatis批量插入的五種方式小結(jié)(MyBatis以集合方式批量新增),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01Springboot mybais配置多數(shù)據(jù)源過(guò)程解析
這篇文章主要介紹了Springboot+mybais配置多數(shù)據(jù)源過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例
下面小編就為大家?guī)?lái)一篇Maven在Windows中的配置以及IDE中的項(xiàng)目創(chuàng)建實(shí)例。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-09-09Java微服務(wù)Filter過(guò)濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過(guò)程詳解
這篇文章主要介紹了Java微服務(wù)Filter過(guò)濾器集成Sentinel實(shí)現(xiàn)網(wǎng)關(guān)限流過(guò)程,首先Sentinel規(guī)則的存儲(chǔ)默認(rèn)是存儲(chǔ)在內(nèi)存的,應(yīng)用重啟之后規(guī)則會(huì)丟失。因此我們通過(guò)配置中心Nacos保存規(guī)則,然后通過(guò)定時(shí)拉取Nacos數(shù)據(jù)來(lái)獲取規(guī)則配置,可以做到動(dòng)態(tài)實(shí)時(shí)的刷新規(guī)則2023-02-02Assert.assertEquals的使用方法及注意事項(xiàng)說(shuō)明
這篇文章主要介紹了Assert.assertEquals的使用方法及注意事項(xiàng)說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解
這篇文章主要介紹了java 中設(shè)計(jì)模式(值對(duì)象)的實(shí)例詳解的相關(guān)資料,希望通過(guò)本文能幫助到大家,需要的朋友可以參考下2017-09-09SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)源的實(shí)現(xiàn)
本文主要介紹了SpringBoot+Jpa項(xiàng)目配置雙數(shù)據(jù)庫(kù)源的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12