elasticsearch數(shù)據(jù)信息索引操作action?support示例分析
抽象類分析
Action這一部分主要是數(shù)據(jù)(索引)的操作和部分集群信息操作。 所有的請求通過client轉(zhuǎn)發(fā)到對應的action上然后再由對應的TransportAction來執(zhí)行相關(guān)請求。如果請求能在本機上執(zhí)行則在本機上執(zhí)行,否則使用Transport進行轉(zhuǎn)發(fā)到對應的節(jié)點。action support部分是對action的抽象,所有的具體action都繼承了support action中的某個類。這里將對這些抽象類進行分析。
這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個部分。broadcast主要針對一些無具體目標主機的操作,如查詢index是否存在,所有繼承這個類的action都具有這種類似的性質(zhì);nodes主要是對節(jié)點的操作,如熱點線程查詢(hotThread)查詢節(jié)點上的繁忙線程;replication的子類主要是需要或可以在副本上進行的操作,如索引操作,數(shù)據(jù)不僅要發(fā)送到主shard還要發(fā)送到各個副本。single則主要是目標明確的單shard操作,如get操作,根據(jù)doc的id取doc,doc 的id能夠確定它在哪個shard上,因此操作也在此shard上執(zhí)行。
doExecute方法
這些support action的實現(xiàn)可以分為兩類,第一類就是實現(xiàn)一個內(nèi)部類作為異步操作器,子類執(zhí)行doExecute時,初始化該操作器并啟動。另外一種就是直接實現(xiàn)一個方法,子類doExecute方法調(diào)用該方法進行。TransportBroadcastOperationAction就屬于前者,它實現(xiàn)了內(nèi)部操作器AsyncBroadcastAction。TransportCountAction繼承于它,它doExecute方法如下所示:
@Override
protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
request.nowInMillis = System.currentTimeMillis();
super.doExecute(request, listener);
}調(diào)用父類的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實現(xiàn)如下所示:
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncBroadcastAction(request, listener).start();
}可以看到它初始化了AsyncBroadcastAction并啟動。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,并未涉及到具體的操作邏輯。因為這些邏輯都在每個子action中實現(xiàn),不同的action需要進行不同的操作。如count需要count每個shard并且返回最后的總數(shù)值,而IndexExistAction則需要對比所有索引查看查詢的索引是否存在。start方法的代碼如下所示:
public void start() {
//沒有shards
if (shardsIts.size() == 0) {
// no shards
try {
listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
} catch (Throwable e) {
listener.onFailure(e);
}
return;
}
request.beforeStart();
// count the local operations, and perform the non local ones
int shardIndex = -1;
//遍歷對每個shards進行操作
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performOperation(shardIt, shard, shardIndex);
} else {
// really, no shards active in this group
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}start方法就是遍歷所有shards,如果shard存在則執(zhí)行performOperation方法,在這個方法中會區(qū)分該請求能否在本機上進行,能執(zhí)行則調(diào)用shardOperation方法得到結(jié)果。這個方法在這是抽象的,每個子類都有實現(xiàn)。否則發(fā)送到對應的主機上。,如果shard為null則進行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。
performOperation代碼
如下所示:
protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
if (shard == null) {//shard 為null拋出異常
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執(zhí)行shardOperation方法,并通過onOperation方法封裝結(jié)果
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {//不是本地shard,發(fā)送到對應節(jié)點。
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse newInstance() {
return newShardResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
}
@Override
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e);
}
});
}
}
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
}方法shardOperation在countTransportAction的實現(xiàn)如下所示:
@Override
protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
//構(gòu)造查詢context
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {
// TODO: min score should move to be "null" as a value that is not initialized...
if (request.minScore() != -1) {
context.minimumScore(request.minScore());
}
BytesReference source = request.querySource();
if (source != null && source.length() > 0) {
try {
QueryParseContext.setTypes(request.types());
context.parsedQuery(indexService.queryParserService().parseQuery(source));
} finally {
QueryParseContext.removeTypes();
}
}
final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
boolean terminatedEarly = false;
context.preProcess();
try {
long count;
if (hasTerminateAfterCount) {//調(diào)用lucene的封裝接口執(zhí)行查詢并返回結(jié)果
final Lucene.EarlyTerminatingCollector countCollector =
Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
count = countCollector.count();
} else {
count = Lucene.count(context.searcher(), context.query());
}
return new ShardCountResponse(request.shardId(), count, terminatedEarly);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "failed to execute count", e);
}
} finally {
// this will also release the index searcher
context.close();
SearchContext.removeCurrent();
}
}可以看到這里是每個action真正的邏輯實現(xiàn)。因為這里涉及到index部分的內(nèi)容,這里就不詳細分析。后面關(guān)于index的分析會有涉及。這就是support action中的第一種實現(xiàn)。
master的相關(guān)操作
第二種就master的相關(guān)操作,因此沒有實現(xiàn)對應的操作類,而只是實現(xiàn)了一個方法。該方法的作用跟操作器作用相同,唯一的不同是它沒有操作器這么多的變量, 而且它不是異步的。master的操作需要實時進行,執(zhí)行過程中需要阻塞某些操作,保證集群狀態(tài)一致性。這里就不再說明,請參考TransportMasterNodeOperationAction原碼。
總結(jié)
本篇概括說了support action,并以countTransportAction為例說明了support Action中的異步操作器實現(xiàn),最后簡單的分析了master的同步操作。因為這里涉及到很多action不可能一一分析,有興趣可以參考對應的代碼。而且這里有以下index部分的內(nèi)容,所以沒有更深入的分析。在后面分析完index的相關(guān)功能后,會挑出幾個重要的action做詳細分析。
以上就是elasticsearch數(shù)據(jù)信息索引操作action support示例分析的詳細內(nèi)容,更多關(guān)于elasticsearch數(shù)據(jù)信息索引操作action support的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
maven坐標Dependencies和Exclusions的使用
這篇文章主要介紹了maven坐標Dependencies和Exclusions的使用,很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
深入Parquet文件格式設(shè)計原理及實現(xiàn)細節(jié)
這篇文章主要介紹了深入Parquet文件格式設(shè)計原理及實現(xiàn)細節(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08
Java和Rust實現(xiàn)JSON序列化互轉(zhuǎn)的解決方案詳解
這篇文章主要為大家詳細介紹了Java和Rust實現(xiàn)JSON序列化互轉(zhuǎn)的解決方案,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學習一下2024-03-03

