elasticsearch數(shù)據(jù)信息索引操作action?support示例分析
抽象類分析
Action這一部分主要是數(shù)據(jù)(索引)的操作和部分集群信息操作。 所有的請(qǐng)求通過(guò)client轉(zhuǎn)發(fā)到對(duì)應(yīng)的action上然后再由對(duì)應(yīng)的TransportAction來(lái)執(zhí)行相關(guān)請(qǐng)求。如果請(qǐng)求能在本機(jī)上執(zhí)行則在本機(jī)上執(zhí)行,否則使用Transport進(jìn)行轉(zhuǎn)發(fā)到對(duì)應(yīng)的節(jié)點(diǎn)。action support部分是對(duì)action的抽象,所有的具體action都繼承了support action中的某個(gè)類。這里將對(duì)這些抽象類進(jìn)行分析。
這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個(gè)部分。broadcast主要針對(duì)一些無(wú)具體目標(biāo)主機(jī)的操作,如查詢index是否存在,所有繼承這個(gè)類的action都具有這種類似的性質(zhì);nodes主要是對(duì)節(jié)點(diǎn)的操作,如熱點(diǎn)線程查詢(hotThread)查詢節(jié)點(diǎn)上的繁忙線程;replication的子類主要是需要或可以在副本上進(jìn)行的操作,如索引操作,數(shù)據(jù)不僅要發(fā)送到主shard還要發(fā)送到各個(gè)副本。single則主要是目標(biāo)明確的單shard操作,如get操作,根據(jù)doc的id取doc,doc 的id能夠確定它在哪個(gè)shard上,因此操作也在此shard上執(zhí)行。
doExecute方法
這些support action的實(shí)現(xiàn)可以分為兩類,第一類就是實(shí)現(xiàn)一個(gè)內(nèi)部類作為異步操作器,子類執(zhí)行doExecute時(shí),初始化該操作器并啟動(dòng)。另外一種就是直接實(shí)現(xiàn)一個(gè)方法,子類doExecute方法調(diào)用該方法進(jìn)行。TransportBroadcastOperationAction就屬于前者,它實(shí)現(xiàn)了內(nèi)部操作器AsyncBroadcastAction。TransportCountAction繼承于它,它doExecute方法如下所示:
@Override
protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
request.nowInMillis = System.currentTimeMillis();
super.doExecute(request, listener);
}調(diào)用父類的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實(shí)現(xiàn)如下所示:
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
new AsyncBroadcastAction(request, listener).start();
}可以看到它初始化了AsyncBroadcastAction并啟動(dòng)。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,并未涉及到具體的操作邏輯。因?yàn)檫@些邏輯都在每個(gè)子action中實(shí)現(xiàn),不同的action需要進(jìn)行不同的操作。如count需要count每個(gè)shard并且返回最后的總數(shù)值,而IndexExistAction則需要對(duì)比所有索引查看查詢的索引是否存在。start方法的代碼如下所示:
public void start() {
//沒(méi)有shards
if (shardsIts.size() == 0) {
// no shards
try {
listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
} catch (Throwable e) {
listener.onFailure(e);
}
return;
}
request.beforeStart();
// count the local operations, and perform the non local ones
int shardIndex = -1;
//遍歷對(duì)每個(gè)shards進(jìn)行操作
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performOperation(shardIt, shard, shardIndex);
} else {
// really, no shards active in this group
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
}
}
}start方法就是遍歷所有shards,如果shard存在則執(zhí)行performOperation方法,在這個(gè)方法中會(huì)區(qū)分該請(qǐng)求能否在本機(jī)上進(jìn)行,能執(zhí)行則調(diào)用shardOperation方法得到結(jié)果。這個(gè)方法在這是抽象的,每個(gè)子類都有實(shí)現(xiàn)。否則發(fā)送到對(duì)應(yīng)的主機(jī)上。,如果shard為null則進(jìn)行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。
performOperation代碼
如下所示:
protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
if (shard == null) {//shard 為null拋出異常
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
try {
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執(zhí)行shardOperation方法,并通過(guò)onOperation方法封裝結(jié)果
threadPool.executor(executor).execute(new Runnable() {
@Override
public void run() {
try {
onOperation(shard, shardIndex, shardOperation(shardRequest));
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
});
} else {//不是本地shard,發(fā)送到對(duì)應(yīng)節(jié)點(diǎn)。
DiscoveryNode node = nodes.get(shard.currentNodeId());
if (node == null) {
// no node connected, act as failure
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
} else {
transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() {
@Override
public ShardResponse newInstance() {
return newShardResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ShardResponse response) {
onOperation(shard, shardIndex, response);
}
@Override
public void handleException(TransportException e) {
onOperation(shard, shardIt, shardIndex, e);
}
});
}
}
} catch (Throwable e) {
onOperation(shard, shardIt, shardIndex, e);
}
}
}方法shardOperation在countTransportAction的實(shí)現(xiàn)如下所示:
@Override
protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
IndexShard indexShard = indexService.shardSafe(request.shardId().id());
//構(gòu)造查詢context
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
try {
// TODO: min score should move to be "null" as a value that is not initialized...
if (request.minScore() != -1) {
context.minimumScore(request.minScore());
}
BytesReference source = request.querySource();
if (source != null && source.length() > 0) {
try {
QueryParseContext.setTypes(request.types());
context.parsedQuery(indexService.queryParserService().parseQuery(source));
} finally {
QueryParseContext.removeTypes();
}
}
final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
boolean terminatedEarly = false;
context.preProcess();
try {
long count;
if (hasTerminateAfterCount) {//調(diào)用lucene的封裝接口執(zhí)行查詢并返回結(jié)果
final Lucene.EarlyTerminatingCollector countCollector =
Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
count = countCollector.count();
} else {
count = Lucene.count(context.searcher(), context.query());
}
return new ShardCountResponse(request.shardId(), count, terminatedEarly);
} catch (Exception e) {
throw new QueryPhaseExecutionException(context, "failed to execute count", e);
}
} finally {
// this will also release the index searcher
context.close();
SearchContext.removeCurrent();
}
}可以看到這里是每個(gè)action真正的邏輯實(shí)現(xiàn)。因?yàn)檫@里涉及到index部分的內(nèi)容,這里就不詳細(xì)分析。后面關(guān)于index的分析會(huì)有涉及。這就是support action中的第一種實(shí)現(xiàn)。
master的相關(guān)操作
第二種就master的相關(guān)操作,因此沒(méi)有實(shí)現(xiàn)對(duì)應(yīng)的操作類,而只是實(shí)現(xiàn)了一個(gè)方法。該方法的作用跟操作器作用相同,唯一的不同是它沒(méi)有操作器這么多的變量, 而且它不是異步的。master的操作需要實(shí)時(shí)進(jìn)行,執(zhí)行過(guò)程中需要阻塞某些操作,保證集群狀態(tài)一致性。這里就不再說(shuō)明,請(qǐng)參考TransportMasterNodeOperationAction原碼。
總結(jié)
本篇概括說(shuō)了support action,并以countTransportAction為例說(shuō)明了support Action中的異步操作器實(shí)現(xiàn),最后簡(jiǎn)單的分析了master的同步操作。因?yàn)檫@里涉及到很多action不可能一一分析,有興趣可以參考對(duì)應(yīng)的代碼。而且這里有以下index部分的內(nèi)容,所以沒(méi)有更深入的分析。在后面分析完index的相關(guān)功能后,會(huì)挑出幾個(gè)重要的action做詳細(xì)分析。
以上就是elasticsearch數(shù)據(jù)信息索引操作action support示例分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch數(shù)據(jù)信息索引操作action support的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
RabbitMQ消息隊(duì)列實(shí)現(xiàn)延遲任務(wù)示例
這篇文章主要為大家介紹了RabbitMQ消息隊(duì)列實(shí)現(xiàn)延遲任務(wù)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04
Java與Unix時(shí)間戳的相互轉(zhuǎn)換詳解
這篇文章主要為大家詳細(xì)介紹了Java與Unix時(shí)間戳的相互轉(zhuǎn)換,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-12-12
maven坐標(biāo)Dependencies和Exclusions的使用
這篇文章主要介紹了maven坐標(biāo)Dependencies和Exclusions的使用,很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
深入Parquet文件格式設(shè)計(jì)原理及實(shí)現(xiàn)細(xì)節(jié)
這篇文章主要介紹了深入Parquet文件格式設(shè)計(jì)原理及實(shí)現(xiàn)細(xì)節(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
Java和Rust實(shí)現(xiàn)JSON序列化互轉(zhuǎn)的解決方案詳解
這篇文章主要為大家詳細(xì)介紹了Java和Rust實(shí)現(xiàn)JSON序列化互轉(zhuǎn)的解決方案,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-03-03

