欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

elasticsearch數(shù)據(jù)信息索引操作action?support示例分析

 更新時間:2022年04月22日 08:30:09   作者:zziawan  
這篇文章主要為大家介紹了elasticsearch數(shù)據(jù)信息索引操作action?support示例分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

抽象類分析

Action這一部分主要是數(shù)據(jù)(索引)的操作和部分集群信息操作。 所有的請求通過client轉(zhuǎn)發(fā)到對應(yīng)的action上然后再由對應(yīng)的TransportAction來執(zhí)行相關(guān)請求。如果請求能在本機(jī)上執(zhí)行則在本機(jī)上執(zhí)行,否則使用Transport進(jìn)行轉(zhuǎn)發(fā)到對應(yīng)的節(jié)點。action support部分是對action的抽象,所有的具體action都繼承了support action中的某個類。這里將對這些抽象類進(jìn)行分析。

這一部分總共分為broadcast(廣播),master,nodes,replication及single幾個部分。broadcast主要針對一些無具體目標(biāo)主機(jī)的操作,如查詢index是否存在,所有繼承這個類的action都具有這種類似的性質(zhì);nodes主要是對節(jié)點的操作,如熱點線程查詢(hotThread)查詢節(jié)點上的繁忙線程;replication的子類主要是需要或可以在副本上進(jìn)行的操作,如索引操作,數(shù)據(jù)不僅要發(fā)送到主shard還要發(fā)送到各個副本。single則主要是目標(biāo)明確的單shard操作,如get操作,根據(jù)doc的id取doc,doc 的id能夠確定它在哪個shard上,因此操作也在此shard上執(zhí)行。

doExecute方法

這些support action的實現(xiàn)可以分為兩類,第一類就是實現(xiàn)一個內(nèi)部類作為異步操作器,子類執(zhí)行doExecute時,初始化該操作器并啟動。另外一種就是直接實現(xiàn)一個方法,子類doExecute方法調(diào)用該方法進(jìn)行。TransportBroadcastOperationAction就屬于前者,它實現(xiàn)了內(nèi)部操作器AsyncBroadcastAction。TransportCountAction繼承于它,它doExecute方法如下所示:

@Override
    protected void doExecute(CountRequest request, ActionListener<CountResponse> listener) {
        request.nowInMillis = System.currentTimeMillis();
        super.doExecute(request, listener);
    }

調(diào)用父類的doExecute方法,也就是TransportBroadcastOperationAction的方法,它的實現(xiàn)如下所示:

@Override
    protected void doExecute(Request request, ActionListener&lt;Response&gt; listener) {
        new AsyncBroadcastAction(request, listener).start();
    }

可以看到它初始化了AsyncBroadcastAction并啟動。AsyncBroadcastAction只是確定了操作的流程,及操作完成如何返回response,并未涉及到具體的操作邏輯。因為這些邏輯都在每個子action中實現(xiàn),不同的action需要進(jìn)行不同的操作。如count需要count每個shard并且返回最后的總數(shù)值,而IndexExistAction則需要對比所有索引查看查詢的索引是否存在。start方法的代碼如下所示:

public void start() {
      //沒有shards
            if (shardsIts.size() == 0) {
                // no shards
                try {
                    listener.onResponse(newResponse(request, new AtomicReferenceArray(0), clusterState));
                } catch (Throwable e) {
                    listener.onFailure(e);
                }
                return;
            }
            request.beforeStart();
            // count the local operations, and perform the non local ones
            int shardIndex = -1;
       //遍歷對每個shards進(jìn)行操作
            for (final ShardIterator shardIt : shardsIts) {
                shardIndex++;
                final ShardRouting shard = shardIt.nextOrNull();
                if (shard != null) {
                    performOperation(shardIt, shard, shardIndex);
                } else {
                    // really, no shards active in this group
                    onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                }
            }
        }

start方法就是遍歷所有shards,如果shard存在則執(zhí)行performOperation方法,在這個方法中會區(qū)分該請求能否在本機(jī)上進(jìn)行,能執(zhí)行則調(diào)用shardOperation方法得到結(jié)果。這個方法在這是抽象的,每個子類都有實現(xiàn)。否則發(fā)送到對應(yīng)的主機(jī)上。,如果shard為null則進(jìn)行onOperation操作,遍歷該shard的其它副本看能否找到可以操作的shard。

performOperation代碼

如下所示:

protected void performOperation(final ShardIterator shardIt, final ShardRouting shard, final int shardIndex) {
            if (shard == null) {//shard 為null拋出異常
                // no more active shards... (we should not really get here, just safety)
                onOperation(null, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
            } else {
                try {
                    final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
                    if (shard.currentNodeId().equals(nodes.localNodeId())) {//shard在本地執(zhí)行shardOperation方法,并通過onOperation方法封裝結(jié)果
                        threadPool.executor(executor).execute(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    onOperation(shard, shardIndex, shardOperation(shardRequest));
                                } catch (Throwable e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            }
                        });
                    } else {//不是本地shard,發(fā)送到對應(yīng)節(jié)點。
                        DiscoveryNode node = nodes.get(shard.currentNodeId());
                        if (node == null) {
                            // no node connected, act as failure
                            onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
                        } else {
                            transportService.sendRequest(node, transportShardAction, shardRequest, new BaseTransportResponseHandler&lt;ShardResponse&gt;() {
                                @Override
                                public ShardResponse newInstance() {
                                    return newShardResponse();
                                }
                                @Override
                                public String executor() {
                                    return ThreadPool.Names.SAME;
                                }
                                @Override
                                public void handleResponse(ShardResponse response) {
                                    onOperation(shard, shardIndex, response);
                                }
                                @Override
                                public void handleException(TransportException e) {
                                    onOperation(shard, shardIt, shardIndex, e);
                                }
                            });
                        }
                    }
                } catch (Throwable e) {
                    onOperation(shard, shardIt, shardIndex, e);
                }
            }
        }

方法shardOperation在countTransportAction的實現(xiàn)如下所示:

@Override
    protected ShardCountResponse shardOperation(ShardCountRequest request) throws ElasticsearchException {
        IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());//
        IndexShard indexShard = indexService.shardSafe(request.shardId().id());
    //構(gòu)造查詢context
        SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
        SearchContext context = new DefaultSearchContext(0,
                new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
                shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
                scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
        SearchContext.setCurrent(context);
        try {
            // TODO: min score should move to be "null" as a value that is not initialized...
            if (request.minScore() != -1) {
                context.minimumScore(request.minScore());
            }
            BytesReference source = request.querySource();
            if (source != null &amp;&amp; source.length() &gt; 0) {
                try {
                    QueryParseContext.setTypes(request.types());
                    context.parsedQuery(indexService.queryParserService().parseQuery(source));
                } finally {
                    QueryParseContext.removeTypes();
                }
            }
            final boolean hasTerminateAfterCount = request.terminateAfter() != DEFAULT_TERMINATE_AFTER;
            boolean terminatedEarly = false;
            context.preProcess();
            try {
                long count;
                if (hasTerminateAfterCount) {//調(diào)用lucene的封裝接口執(zhí)行查詢并返回結(jié)果
                    final Lucene.EarlyTerminatingCollector countCollector =
                            Lucene.createCountBasedEarlyTerminatingCollector(request.terminateAfter());
                    terminatedEarly = Lucene.countWithEarlyTermination(context.searcher(), context.query(), countCollector);
                    count = countCollector.count();
                } else {
                    count = Lucene.count(context.searcher(), context.query());
                }
                return new ShardCountResponse(request.shardId(), count, terminatedEarly);
            } catch (Exception e) {
                throw new QueryPhaseExecutionException(context, "failed to execute count", e);
            }
        } finally {
            // this will also release the index searcher
            context.close();
            SearchContext.removeCurrent();
        }
    }

可以看到這里是每個action真正的邏輯實現(xiàn)。因為這里涉及到index部分的內(nèi)容,這里就不詳細(xì)分析。后面關(guān)于index的分析會有涉及。這就是support action中的第一種實現(xiàn)。

master的相關(guān)操作

第二種就master的相關(guān)操作,因此沒有實現(xiàn)對應(yīng)的操作類,而只是實現(xiàn)了一個方法。該方法的作用跟操作器作用相同,唯一的不同是它沒有操作器這么多的變量, 而且它不是異步的。master的操作需要實時進(jìn)行,執(zhí)行過程中需要阻塞某些操作,保證集群狀態(tài)一致性。這里就不再說明,請參考TransportMasterNodeOperationAction原碼。

總結(jié)

本篇概括說了support action,并以countTransportAction為例說明了support Action中的異步操作器實現(xiàn),最后簡單的分析了master的同步操作。因為這里涉及到很多action不可能一一分析,有興趣可以參考對應(yīng)的代碼。而且這里有以下index部分的內(nèi)容,所以沒有更深入的分析。在后面分析完index的相關(guān)功能后,會挑出幾個重要的action做詳細(xì)分析。

以上就是elasticsearch數(shù)據(jù)信息索引操作action support示例分析的詳細(xì)內(nèi)容,更多關(guān)于elasticsearch數(shù)據(jù)信息索引操作action support的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評論