RocketMQ?Broker實現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)
RocketMq-broker
broker主要作用就是存儲消息。所以重點就放在它對于消息的處理上面。我提出幾個問題,后續(xù)看代碼解答。
- broker啟動的時候是怎么向nameserv進行注冊的?
- productor發(fā)送過來的消息是怎么儲存的?
- comsumer是怎么在broker拉取數(shù)據(jù)的?
- 高可用怎么做的?broker掛了怎么辦,數(shù)據(jù)肯定要有備份的
注冊
注冊的時候,就是在啟動的時候,向所有的nameService注冊自己的信息。其中nameService的地址是可以在啟動的時候配置的。代碼在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll。這里我省略了其他代碼
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final boolean oneway,
final int timeoutMills,
final boolean enableActingMaster,
final boolean compressed,
final Long heartbeatTimeoutMillis,
final BrokerIdentity brokerIdentity) {
final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
List<String> nameServerAddressList = this.remotingClient.getAvailableNameSrvList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(new AbstractBrokerRunnable(brokerIdentity) {
@Override
public void run2() {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
LOGGER.info("Registering current broker to name server completed. TargetHost={}", namesrvAddr);
} catch (Exception e) {
LOGGER.error("Failed to register current broker to name server. TargetHost={}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
}
});
}
try {
if (!countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS)) {
LOGGER.warn("Registration to one or more name servers does NOT complete within deadline. Timeout threshold: {}ms", timeoutMills);
}
} catch (InterruptedException ignore) {
}
}
return registerBrokerResultList;
}這里用了countDownLatch來判斷一下所有broker注冊完成是否超時,超時就打印一個warn。
消息存儲
具體可以看官網(wǎng)的文檔設(shè)計。我這里貼一部分內(nèi)容。
消息存儲架構(gòu)圖中主要有下面三個跟消息存儲相關(guān)的文件構(gòu)成。
(1) CommitLog:消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容,消息內(nèi)容不是定長的。單個文件大小默認(rèn)1G, 文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募?dāng)文件滿了,寫入下一個文件;
(2) ConsumeQueue:消息消費索引,引入的目的主要是提高消息消費的性能。ConsumeQueue作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu)
(3) IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。Index文件的存儲位置是:$HOME/store/index/{fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu),故RocketMQ的索引文件其底層實現(xiàn)為hash索引。
具體請求是通過netty來處理的
NettyRemotingAbstract#processRequestCommand里面會根據(jù)請求code拿到具體的processor。
其中
- SendMessageProcessor 負(fù)責(zé)處理 Producer 發(fā)送消息的請求;
- PullMessageProcessor 負(fù)責(zé)處理 Consumer 消費消息的請求;
- QueryMessageProcessor 負(fù)責(zé)處理按照消息 Key 等查詢消息的請求。
數(shù)據(jù)寫入主要是在DefaultMessageStore#asyncPutMessage里面
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
......
topicQueueLock.lock(topicQueueKey);
try {
boolean needAssignOffset = true;
if (defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()
&& defaultMessageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
needAssignOffset = false;
}
if (needAssignOffset) {
defaultMessageStore.assignOffset(msg, getMessageNum(msg));
}
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null) {
return CompletableFuture.completedFuture(encodeResult);
}
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().getEncoderBuffer());
PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
if (!defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
msg.setStoreTimestamp(beginLockTimestamp);
}
if (null == mappedFile || mappedFile.isFull()) {
// 首先獲取mappedFile
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
if (null == mappedFile) {
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, null));
}
// 寫入數(shù)據(jù)
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus()) {
case PUT_OK:
onCommitLogAppend(msg, result, mappedFile);
break;
case END_OF_FILE:
onCommitLogAppend(msg, result, mappedFile);
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPPED_FILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
onCommitLogAppend(msg, result, mappedFile);
}
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock();
}
} finally {
topicQueueLock.unlock(topicQueueKey);
}
if (elapsedTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(result.getMsgNum());
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
// 刷盤策略
return handleDiskFlushAndHA(putMessageResult, msg, needAckNums, needHandleHA);
}首先獲取mappedFile,可以理解就是commitLog文件的一個映射。創(chuàng)建mappedFile會同時提前創(chuàng)建兩個文件,避免了下次創(chuàng)建文件等待。
org.apache.rocketmq.store.AllocateMappedFileService#mmapOperation
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;
}
if (req.getMappedFile() == null) {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new DefaultMappedFile(req.getFilePath(), req.getFileSize());
}
long elapsedTime = UtilAll.computeElapsedTimeMilliseconds(beginTime);
if (elapsedTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + elapsedTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();
}
return true;
}這里會去初始化mapperFile
org.apache.rocketmq.store.logfile.DefaultMappedFile#init
private void init(final String fileName, final int fileSize) throws IOException {
......
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
這里其實就是用java的map創(chuàng)建文件。
如果開啟了堆外對象池,會用writeBuffer來寫入數(shù)據(jù)。讀取文件還是用mappedByteBuffer。
@Override
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
在創(chuàng)建好maperFile后,還有個預(yù)熱的操作
public void warmMappedFile(FlushDiskType type, int pages) {
this.mappedByteBufferAccessCountSinceLastSwap++;
long beginTime = System.currentTimeMillis();
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
int flush = 0;
long time = System.currentTimeMillis();
//通過寫入 1G 的字節(jié) 0 來讓操作系統(tǒng)分配物理內(nèi)存空間,如果沒有填充值,操作系統(tǒng)不會實際分配物理內(nèi)存,防止在寫入消息時發(fā)生缺頁異常
for (int i = 0, j = 0; i < this.fileSize; i += DefaultMappedFile.OS_PAGE_SIZE, j++) {
byteBuffer.put(i, (byte) 0);
// force flush when flush disk type is sync
if (type == FlushDiskType.SYNC_FLUSH) {
if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
flush = i;
mappedByteBuffer.force();
}
}
// 這里就是每隔一段時間sleep一下,這樣讓其他線程有執(zhí)行的機會,這其中也包括gc線程,讓gc線程有機會在循環(huán)的中途可以執(zhí)行g(shù)c。避免很久才執(zhí)行一次gc
// prevent gc
if (j % 1000 == 0) {
log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
time = System.currentTimeMillis();
try {
Thread.sleep(0);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}
}
// force flush when prepare load finished
if (type == FlushDiskType.SYNC_FLUSH) {
log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
this.getFileName(), System.currentTimeMillis() - beginTime);
mappedByteBuffer.force();
}
log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
System.currentTimeMillis() - beginTime);
this.mlock();
}因為通過 mmap 映射,只是建立了進程虛擬內(nèi)存地址與物理內(nèi)存地址之間的映射關(guān)系,并沒有將 Page Cache 加載至內(nèi)存。讀寫數(shù)據(jù)時如果沒有命中寫 Page Cache 則發(fā)生缺頁中斷,從磁盤重新加載數(shù)據(jù)至內(nèi)存,這樣會影響讀寫性能。為了防止缺頁異常,阻止操作系統(tǒng)將相關(guān)的內(nèi)存頁調(diào)度到交換空間(swap space),RocketMQ 通過對文件預(yù)熱,將對應(yīng)page cache提前加載到內(nèi)存中。
然后中間循環(huán)會sleep一下,就是讓gc可以運行。我復(fù)制一下chatGpt的回答:
這段代碼中的if (j % 1000 == 0)語句是為了防止頻繁的GC。在每次循環(huán)中,當(dāng)j的值是1000的倍數(shù)時,會執(zhí)行一次Thread.sleep(0),這個操作會讓當(dāng)前線程暫停一小段時間,從而讓JVM有機會回收一些不再使用的對象。這樣做的目的是為了減少GC的頻率,從而提高程序的性能。
最后還有一個鎖定
public void mlock() {
final long beginTime = System.currentTimeMillis();
final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
Pointer pointer = new Pointer(address);
{
// 通過系統(tǒng)調(diào)用 mlock 鎖定該文件的 Page Cache,防止其被交換到 swap 空間
int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
{
// 通過系統(tǒng)調(diào)用 madvise 給操作系統(tǒng)建議,說明該文件在不久的將來要被訪問
int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
}
}然后就是對mapperFile進行寫入消息。就是拿著buffer寫入具體的數(shù)據(jù)。
接著就是處理刷盤方式和高可用。
org.apache.rocketmq.store.CommitLog#handleDiskFlushAndHA
private CompletableFuture<PutMessageResult> handleDiskFlushAndHA(PutMessageResult putMessageResult,
MessageExt messageExt, int needAckNums, boolean needHandleHA) {
// 處理刷盤機制
CompletableFuture<PutMessageStatus> flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
CompletableFuture<PutMessageStatus> replicaResultFuture;
if (!needHandleHA) {
replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
} else {
// 處理HA
replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
}
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
if (flushStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(flushStatus);
}
if (replicaStatus != PutMessageStatus.PUT_OK) {
putMessageResult.setPutMessageStatus(replicaStatus);
}
return putMessageResult;
});
}
處理刷盤
org.apache.rocketmq.store.CommitLog.DefaultFlushManager#handleDiskFlush
@Override
public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
// Synchronization flush
if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
if (messageExt.isWaitStoreMsgOK()) {
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(), CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
flushDiskWatcher.add(request);
service.putRequest(request);
return request.future();
} else {
service.wakeup();
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
// Asynchronous flush
else {
if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
根據(jù)配置的同步刷盤或者異步刷盤的機制來決定具體的刷盤策略。
處理高可用
org.apache.rocketmq.store.CommitLog#handleHA
private CompletableFuture<PutMessageStatus> handleHA(AppendMessageResult result, PutMessageResult putMessageResult,
int needAckNums) {
if (needAckNums >= 0 && needAckNums <= 1) {
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
HAService haService = this.defaultMessageStore.getHaService();
long nextOffset = result.getWroteOffset() + result.getWroteBytes();
// Wait enough acks from different slaves
GroupCommitRequest request = new GroupCommitRequest(nextOffset, this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout(), needAckNums);
haService.putRequest(request);
haService.getWaitNotifyObject().wakeupAll();
return request.future();
}
其實后臺一直有一個同步線程去處理消息同步的事情,只要比較一下master和salve的commitLog的offset就可以比較出來差多少數(shù)據(jù)了。所以把slave沒有的數(shù)據(jù)同步過去就可以了,這塊后面再寫一篇文章細(xì)講。
那還有一個問題,consumeQueue和indexFile是怎么處理的呢?
ReputMessageService里面會去讀取commitLog的數(shù)據(jù),寫入到comsunerQueue和IndexFile
根據(jù)各個dispatch,分別處理兩個文件。這里就不細(xì)講了。
ConsumeQueue的處理是在這里面
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
文件的名字其實就是topic/queueid。寫入的數(shù)據(jù)是
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
其實就是commitLog的一個offset,根據(jù)這個值就可以拿到具體的消息了。
org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex
indexFile就是寫入這些數(shù)據(jù)
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
包括key的hash值,還有物理偏移,還有時間等信息。首先文件是按照每個毫秒創(chuàng)建的,所以天然就是按照時間順序排列。根據(jù)key查詢的話,寫入文件的位置是根據(jù)key的hash來的,所以可以馬上知道是哪個位置。
好了,到這里數(shù)據(jù)存儲就差不多了。來看看怎么讀消息的
消息讀取
消費者拉取消息
拉取消息有自己的處理器:
org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest
里面有很多額外的邏輯,具體在下面的方法中:
org.apache.rocketmq.store.DefaultMessageStore#getMessage
消息讀取很簡單,就是從根據(jù)topic和queueId去consumeQueue里面讀,消費者知道上次拉取到了哪里,所以就直接根據(jù)consumeQueue的offset去讀內(nèi)容,consumeQueue里面存的是commitLog的offset和size,根據(jù)這兩個值就可以從commitLog里面拿到消息,返回。然后更新下次的offset,返回給productor。
按照key查詢
org.apache.rocketmq.store.DefaultMessageStore#queryMessage
主要是查的indexFile,前面提到indexFile就是按照時間來創(chuàng)建文件的,所以先按照時間篩選出符合條件的indexFile,然后根據(jù)key的hash,找到文件對應(yīng)的寫入位置,因為對應(yīng)的hash會有沖突,就一個個遍歷,找到所有hash值相等的數(shù)據(jù)。然后再根據(jù)indexFile記錄的offset,去commitLog里面去查消息。
到此這篇關(guān)于RocketMQ Broker實現(xiàn)高可用高并發(fā)的消息中轉(zhuǎn)服務(wù)的文章就介紹到這了,更多相關(guān)RocketMq Broker內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring?Boot整合Bootstrap的超詳細(xì)步驟
之前做前端開發(fā),在使用bootstrap的時候都是去官網(wǎng)下載,然后放到項目中,在頁面引用,下面這篇文章主要給大家介紹了關(guān)于Spring?Boot整合Bootstrap的超詳細(xì)步驟,需要的朋友可以參考下2023-05-05
java Swing JFrame框架類中setDefaultCloseOperation的參數(shù)含義與用法示例
這篇文章主要介紹了java Swing JFrame框架類中setDefaultCloseOperation的參數(shù)含義與用法,結(jié)合實例形式分析了Swing組件的JFrame框架類中setDefaultCloseOperation方法的簡單使用技巧,需要的朋友可以參考下2017-11-11
SpringBoot整合Mybatis簡單實現(xiàn)增刪改查
這篇文章主要介紹了SpringBoot整合Mybatis簡單實現(xiàn)增刪改查,文章為圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08
Java排序算法三之歸并排序的遞歸與非遞歸的實現(xiàn)示例解析
這篇文章主要介紹了Java排序算法三之歸并排序的遞歸與非遞歸的實現(xiàn)示例解析,文章通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
2種Java刪除ArrayList中的重復(fù)元素的方法
這篇文章主要介紹了2種Java刪除ArrayList中的重復(fù)元素的方法,感興趣的朋友可以參考下2015-08-08
Spring通過ApplicationContext主動獲取bean的方法講解
今天小編就為大家分享一篇關(guān)于Spring通過ApplicationContext主動獲取bean的方法講解,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2019-03-03

