elasticsearch索引的創(chuàng)建過程index?create邏輯分析
索引的創(chuàng)建過程
從本篇開始,就進入了Index的核心代碼部分。這里首先分析一下索引的創(chuàng)建過程。elasticsearch中的索引是多個分片的集合,它只是邏輯上的索引,并不具備實際的索引功能,所有對數(shù)據(jù)的操作最終還是由每個分片完成。
創(chuàng)建索引的過程,從elasticsearch集群上來說就是寫入索引元數(shù)據(jù)的過程,這一操作只能在master節(jié)點上完成。這是一個阻塞式動作,在加上分配在集群上均衡的過程也非常耗時,因此在一次創(chuàng)建大量索引的過程master節(jié)點會出現(xiàn)單點性能瓶頸,能夠看到響應過程很慢。
在開始具體源碼分析之前,首先回顧一下Action部分的內(nèi)容(參考index action分析),elasticsearch的每一個功能都對應兩個Action,*action和Transport*action。*action中定義了每個功能對應的路徑,同時Action的instance綁定對應的Transport*Action。所有功能請求都需要在集群上轉發(fā),這大概也是每個功能都有Transport*Action的原因吧。對于create當然也不例外,它的開始點也是TransportCreateAction。另外,在action support分析中分析過,不同的action需要經(jīng)過和需要操作的節(jié)點也不同。create index只能由master節(jié)點進行,而且也只在master節(jié)點上進行,保證集群數(shù)據(jù)的一致性。
materOperation方法實現(xiàn)
因此TransportCreateAction繼承了TransportMasterNodeOperationAction,并實現(xiàn)了materOperation方法。它的方法如下所示:
protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
String cause = request.cause();
if (cause.length() == 0) {
cause = "api";
}
final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases()).customs(request.customs());
createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Throwable t) {
if (t instanceof IndexAlreadyExistsException) {
logger.trace("[{}] failed to create", t, request.index());
} else {
logger.debug("[{}] failed to create", t, request.index());
}
listener.onFailure(t);
}
});
}這里看上很簡單,只是調(diào)用了createIndexService(它其實是MetaDataCreateIndexService)的方法,就是修改集群matedata過程。
clusterservice處理
修改前首先獲取到index名稱對應的lock,這樣保證操作數(shù)據(jù)一致性,然后生成updatetask,交給clusterservice處理。代碼如下所示:
public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
// 獲取鎖,只對該索引的操作加鎖,而不是整個cluster
final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());
// 如果能夠獲取鎖離開創(chuàng)建索引,否則在下面啟動新的線程進行
if (mdLock.tryAcquire()) {
createIndex(request, listener, mdLock);
return;
}
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
@Override
public void doRun() throws InterruptedException {
if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
return;
}
createIndex(request, listener, mdLock);
}
});
}createIndex方法,會封裝create請求,然后向cluster發(fā)送一個updatetask。代碼如下所示:
private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {
ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
request.settings(updatedSettingsBuilder.build());
clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener)建立索引 修改配置
增加或者修改mapping都是對集群狀態(tài)修改,它們的過程都很相似,都是通過clusterService提交一個更新操作,同時附帶有優(yōu)先級。clusterservice會根據(jù)優(yōu)先級和更新狀態(tài)task的類型來進行對應的操作。如下所示:
public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
if (!lifecycle.started()) {
return;
}
try {
final UpdateTask task = new UpdateTask(source, priority, updateTask);//根據(jù)優(yōu)先級新建不同的task
if (updateTask instanceof TimeoutClusterStateUpdateTask) {//超時任務,這類任務需要即時返回,因此立刻執(zhí)行。
final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
@Override
public void run() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
}
});
}
});
} else {//其它類型,可以延遲執(zhí)行,則交給線程池來執(zhí)行。
updateTasksExecutor.execute(task);
}
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting
// to be done here...
if (!lifecycle.stoppedOrClosed()) {
throw e;
}
}
}說完它們的執(zhí)行過程,再來看一下create index的具體邏輯。這個邏輯在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中??傮w來說,這一過程就是將request中關于索引的配置mapping等取出來加入到當前的clustermatedata中,構造一個新的matedata的過程。這一過程還是比較復雜,限于篇幅將在下次中進行分析。
總結
創(chuàng)建索引的過程就是master節(jié)點更新集群matedata的過程,為了保證數(shù)據(jù)一致性,需要獲取鎖。
因此存在單點瓶頸。對于外部調(diào)用來說,跟其它功能一樣,外部接口調(diào)用CreateIndexAction的相關方法,然后通過TransPortCreateIndexAction講請求發(fā)送到集群上,進行索引創(chuàng)建。
以上就是elasticsearch索引創(chuàng)建過程index create的詳細內(nèi)容,更多關于elasticsearch索引創(chuàng)建過程index create的資料請關注腳本之家其它相關文章!
相關文章
iOS獲取AppIcon and LaunchImage''s name(app圖標和啟動圖片名字)
這篇文章主要介紹了iOS獲取AppIcon and LaunchImage's name(app圖標和啟動圖片名字)的相關資料,非常不錯,具有參考借鑒價值,感興趣的朋友一起學習吧2016-08-08
關于log4j日志擴展---自定義PatternLayout
這篇文章主要介紹了關于log4j日志擴展---自定義PatternLayout,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
SpringBoot多環(huán)境切換的配置實現(xiàn)
在日常的開發(fā)中,一般都會分好幾種環(huán)境,本文就來介紹一下SpringBoot多環(huán)境切換的配置實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2024-03-03
Java實現(xiàn)經(jīng)典游戲黃金礦工的示例代碼
《黃金礦工》游戲是一個經(jīng)典的抓金子小游戲,它可以鍛煉人的反應能力。本文將用Java實現(xiàn)這一經(jīng)典的游戲,感興趣的小伙伴可以了解一下2022-02-02

