elasticsearch數(shù)據(jù)信息索引操作action?support示例分析
抽象類分析
Action這一部分主要是數(shù)據(jù)(索引)的操作和部分集群信息操作。 所有的請求通過client轉(zhuǎn)發(fā)到對應(yīng)的action上然后再由對應(yīng)的TransportAction來執(zhí)行相關(guān)請求。如果請求能在本機(jī)上執(zhí)行則在本機(jī)上執(zhí)行,否則使用Transport進(jìn)行轉(zhuǎn)發(fā)到對應(yīng)的節(jié)點。action support部分是對action的抽象,所有的具體action都繼承了support action中的某個類。這里將對這些抽象類進(jìn)行分析。
這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個部分。broadcast主要針對一些無具體目標(biāo)主機(jī)的操作,如查詢index是否存在,所有繼承這個類的action都具有這種類似的性質(zhì);nodes主要是對節(jié)點的操作,如熱點線程查詢(hotThread)查詢節(jié)點上的繁忙線程;replication的子類主要是需要或可以在副本上進(jìn)行的操作,如索引操作,數(shù)據(jù)不僅要發(fā)送到主shard還要發(fā)送到各個副本。single則主要是目標(biāo)明確的單shard操作,如get操作,根據(jù)doc的id取doc,doc 的id能夠確定它在哪個shard上,因此操作也在此shard上執(zhí)行。
doExecute方法
這些support action的實現(xiàn)可以分為兩類,第一類就是實現(xiàn)一個內(nèi)部類作為異步操作器,子類執(zhí)行doExecute時,初始化該操作器并啟動。另外一種就是直接實現(xiàn)一個方法,子類doExecute方法調(diào)用該方法進(jìn)行。TransportBroadcastOperationAction就屬于前者,它實現(xiàn)了內(nèi)部操作器AsyncBroadcastAction。TransportCountAction繼承于它,它doExecute方法如下所示:
@Override protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) { request.nowInMillis = System.currentTimeMillis(); super.doExecute(request, listener); }
調(diào)用父類的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實現(xiàn)如下所示:
@Override protected void doExecute(Request request, ActionListener<Response> listener) { new AsyncBroadcastAction(request, listener).start(); }
可以看到它初始化了AsyncBroadcastAction并啟動。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,并未涉及到具體的操作邏輯。因為這些邏輯都在每個子action中實現(xiàn),不同的action需要進(jìn)行不同的操作。如count需要count每個shard并且返回最后的總數(shù)值,而IndexExistAction則需要對比所有索引查看查詢的索引是否存在。start方法的代碼如下所示:
public void start() { //沒有shards if (shardsIts.size() == 0) { // no shards try { listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState)); } catch (Throwable e) { listener.onFailure(e); } return; } request.beforeStart(); // count the local operations, and perform the non local ones int shardIndex = -1; //遍歷對每個shards進(jìn)行操作 for (final ShardIterator shardIt : shardsIts) { shardIndex++; final ShardRouting shard = shardIt.nextOrNull(); if (shard != null) { performOperation(shardIt, shard, shardIndex); } else { // really, no shards active in this group onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } } }
start方法就是遍歷所有shards,如果shard存在則執(zhí)行performOperation方法,在這個方法中會區(qū)分該請求能否在本機(jī)上進(jìn)行,能執(zhí)行則調(diào)用shardOperation方法得到結(jié)果。這個方法在這是抽象的,每個子類都有實現(xiàn)。否則發(fā)送到對應(yīng)的主機(jī)上。,如果shard為null則進(jìn)行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。
performOperation代碼
如下所示:
protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) { if (shard == null) {//shard 為null拋出異常 // no more active shards... (we should not really get here, just safety) onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } else { try { final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request); if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執(zhí)行shardOperation方法,并通過onOperation方法封裝結(jié)果 threadPool.executor(executor).execute(new Runnable() { @Override public void run() { try { onOperation(shard, shardIndex, shardOperation(shardRequest)); } catch (Throwable e) { onOperation(shard, shardIt, shardIndex, e); } } }); } else {//不是本地shard,發(fā)送到對應(yīng)節(jié)點。 DiscoveryNode node = nodes.get(shard.currentNodeId()); if (node == null) { // no node connected, act as failure onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId())); } else { transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler<ShardResponse>() { @Override public ShardResponse newInstance() { return newShardResponse(); } @Override public String executor() { return ThreadPool.Names.SAME; } @Override public void handleResponse(ShardResponse response) { onOperation(shard, shardIndex, response); } @Override public void handleException(TransportException e) { onOperation(shard, shardIt, shardIndex, e); } }); } } } catch (Throwable e) { onOperation(shard, shardIt, shardIndex, e); } } }
方法shardOperation在countTransportAction的實現(xiàn)如下所示:
@Override protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException { IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());// IndexShard indexShard = indexService.shardSafe(request.shardId().id()); //構(gòu)造查詢context SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id()); SearchContext context = new DefaultSearchContext(0, new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()), shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()); SearchContext.setCurrent(context); try { // TODO: min score should move to be "null" as a value that is not initialized... if (request.minScore() != -1) { context.minimumScore(request.minScore()); } BytesReference source = request.querySource(); if (source != null && source.length() > 0) { try { QueryParseContext.setTypes(request.types()); context.parsedQuery(indexService.queryParserService().parseQuery(source)); } finally { QueryParseContext.removeTypes(); } } final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER; boolean terminatedEarly = false; context.preProcess(); try { long count; if (hasTerminateAfterCount) {//調(diào)用lucene的封裝接口執(zhí)行查詢并返回結(jié)果 final Lucene.EarlyTerminatingCollector countCollector = Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter()); terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector); count = countCollector.count(); } else { count = Lucene.count(context.searcher(), context.query()); } return new ShardCountResponse(request.shardId(), count, terminatedEarly); } catch (Exception e) { throw new QueryPhaseExecutionException(context, "failed to execute count", e); } } finally { // this will also release the index searcher context.close(); SearchContext.removeCurrent(); } }
可以看到這里是每個action真正的邏輯實現(xiàn)。因為這里涉及到index部分的內(nèi)容,這里就不詳細(xì)分析。后面關(guān)于index的分析會有涉及。這就是support action中的第一種實現(xiàn)。
master的相關(guān)操作
第二種就master的相關(guān)操作,因此沒有實現(xiàn)對應(yīng)的操作類,而只是實現(xiàn)了一個方法。該方法的作用跟操作器作用相同,唯一的不同是它沒有操作器這么多的變量, 而且它不是異步的。master的操作需要實時進(jìn)行,執(zhí)行過程中需要阻塞某些操作,保證集群狀態(tài)一致性。這里就不再說明,請參考TransportMasterNodeOperationAction原碼。
總結(jié)
本篇概括說了support action,并以countTransportAction為例說明了support Action中的異步操作器實現(xiàn),最后簡單的分析了master的同步操作。因為這里涉及到很多action不可能一一分析,有興趣可以參考對應(yīng)的代碼。而且這里有以下index部分的內(nèi)容,所以沒有更深入的分析。在后面分析完index的相關(guān)功能后,會挑出幾個重要的action做詳細(xì)分析。
以上就是elasticsearch數(shù)據(jù)信息索引操作action support示例分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch數(shù)據(jù)信息索引操作action support的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例
這篇文章主要為大家介紹了RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步早日升職加薪2022-04-04maven坐標(biāo)Dependencies和Exclusions的使用
這篇文章主要介紹了maven坐標(biāo)Dependencies和Exclusions的使用,很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12深入Parquet文件格式設(shè)計原理及實現(xiàn)細(xì)節(jié)
這篇文章主要介紹了深入Parquet文件格式設(shè)計原理及實現(xiàn)細(xì)節(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08Java和Rust實現(xiàn)JSON序列化互轉(zhuǎn)的解決方案詳解
這篇文章主要為大家詳細(xì)介紹了Java和Rust實現(xiàn)JSON序列化互轉(zhuǎn)的解決方案,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-03-03