欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

詳解Java ES多節(jié)點(diǎn)任務(wù)的高效分發(fā)與收集實(shí)現(xiàn)

 更新時(shí)間:2021年06月30日 14:37:18   作者:等你歸去來(lái)  
ElasticSearch 是一個(gè)高可用開(kāi)源全文檢索和分析組件。提供存儲(chǔ)服務(wù),搜索服務(wù),大數(shù)據(jù)準(zhǔn)實(shí)時(shí)分析等。一般用于提供一些提供復(fù)雜搜索的應(yīng)用

一、概述

我們知道,當(dāng)我們對(duì)es發(fā)起search請(qǐng)求或其他操作時(shí),往往都是隨機(jī)選擇一個(gè)coordinator發(fā)起請(qǐng)求。而這請(qǐng)求,可能是該節(jié)點(diǎn)能處理,也可能是該節(jié)點(diǎn)不能處理的,也可能是需要多節(jié)點(diǎn)共同處理的,可以說(shuō)是情況比較復(fù)雜。

所以,coordinator的重要工作是,做請(qǐng)求分發(fā)與結(jié)果收集。那么,如何高性能和安全準(zhǔn)確地實(shí)現(xiàn)這一功能則至關(guān)重要。

二、請(qǐng)求分發(fā)的簡(jiǎn)單思路

我們這里所說(shuō)的請(qǐng)求分發(fā),一般是針對(duì)多個(gè)網(wǎng)絡(luò)節(jié)點(diǎn)而言的。那么,如何將請(qǐng)求發(fā)往多節(jié)點(diǎn),并在最終將結(jié)果合并起來(lái)呢?

同步請(qǐng)求各節(jié)點(diǎn),當(dāng)?shù)谝粋€(gè)節(jié)點(diǎn)響應(yīng)后,再向第二個(gè)節(jié)點(diǎn)發(fā)起請(qǐng)求,以此類推,直到所有節(jié)點(diǎn)請(qǐng)求完成,然后再將結(jié)果聚合起來(lái)。就完成了需求了,不費(fèi)吹灰之力。簡(jiǎn)單不?

無(wú)腦處理自有無(wú)腦處理的缺點(diǎn)。依次請(qǐng)求各節(jié)點(diǎn),無(wú)法很好利用系統(tǒng)的分布式特點(diǎn),變并行為串行了,好不厲害。另外,對(duì)于當(dāng)前請(qǐng)求,當(dāng)其未處理完成這所有節(jié)點(diǎn)的分發(fā)收集工作時(shí),當(dāng)前線程將會(huì)一直被占用。從而,下游請(qǐng)求將無(wú)法再接入,從而將你了并發(fā)能力,使其與線程池大小同日而語(yǔ)了。這可不好。

我們依次想辦法優(yōu)化下。

首先,我們可以將串行分發(fā)請(qǐng)求變成并行分發(fā),即可以使用多線程,向多節(jié)點(diǎn)發(fā)起請(qǐng)求,當(dāng)某線程處理完成時(shí),就返回結(jié)果。使用類似于CountDownLatch的同步工具,保證所有節(jié)點(diǎn)都處理完成后,再由外單主線程進(jìn)行結(jié)果合并操作。

以上優(yōu)化,看起來(lái)不錯(cuò),避免了同步的性能問(wèn)題。但是,當(dāng)有某個(gè)節(jié)點(diǎn)響應(yīng)非常慢時(shí),它將阻塞后續(xù)節(jié)點(diǎn)的工作,從而使整個(gè)請(qǐng)求變慢,從而同樣變成線程池的大小即是并發(fā)能力的瓶頸??梢哉f(shuō),治標(biāo)不治本。

再來(lái),繼續(xù)優(yōu)化。我們可以釋放掉主線程的持有,讓每個(gè)分發(fā)線程處理完成當(dāng)前任務(wù)時(shí),都去檢查任務(wù)隊(duì)列,是否已完成。如果未完成則忽略,如果已完成,則啟動(dòng)合并任務(wù)。

看起來(lái)不錯(cuò),已經(jīng)有完全并發(fā)樣子了。但還能不能再優(yōu)化?各節(jié)點(diǎn)的分發(fā),同樣是同步請(qǐng)求,雖然處理簡(jiǎn)單,但在這server響應(yīng)期間,該線程仍是無(wú)法被使用的,如果類似請(qǐng)求過(guò)多,則必然是不小的消耗。如果能將單節(jié)點(diǎn)的請(qǐng)求,能夠做到異步處理,那樣豈不完美?但這恐怕不好做吧!不過(guò),終歸是一個(gè)不錯(cuò)的想法了。

三、es中search的多節(jié)點(diǎn)分發(fā)收集

我們以search的分發(fā)收集為出發(fā)點(diǎn),觀看es如何辦成這件。原因是search在es中最為普遍與經(jīng)典,雖說(shuō)不得每個(gè)地方實(shí)現(xiàn)都一樣,但至少參考意義還是有的。故以search為切入點(diǎn)。search的框架工作流程,我們之前已經(jīng)研究過(guò),本節(jié)就直接以核心開(kāi)始講解,它是在 TransportSearchAction.executeRequest() 中的。

// org.elasticsearch.action.search.TransportSearchAction#executeRequest
    private void executeRequest(Task task, SearchRequest searchRequest,
                                SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
        final long relativeStartNanos = System.nanoTime();
        final SearchTimeProvider timeProvider =
            new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
        ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
            if (source != searchRequest.source()) {
                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
                // situations when source is rewritten to null due to a bug
                searchRequest.source(source);
            }
            final ClusterState clusterState = clusterService.state();
            final SearchContextId searchContext;
            final Map<String, OriginalIndices> remoteClusterIndices;
            if (searchRequest.pointInTimeBuilder() != null) {
                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
            } else {
                searchContext = null;
                remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
                    searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState));
            }
            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
            if (remoteClusterIndices.isEmpty()) {
                executeLocalSearch(
                    task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
            } else {
                // 多節(jié)點(diǎn)數(shù)據(jù)請(qǐng)求
                if (shouldMinimizeRoundtrips(searchRequest)) {
                    // 通過(guò) parentTaskId 關(guān)聯(lián)所有子任務(wù)
                    final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
                    ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
                        searchService.aggReduceContextBuilder(searchRequest),
                        remoteClusterService, threadPool, listener,
                        (r, l) -> executeLocalSearch(
                            task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
                } else {
                    AtomicInteger skippedClusters = new AtomicInteger(0);
                    // 直接分發(fā)多shard請(qǐng)求到各節(jié)點(diǎn)
                    collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                        skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                        ActionListener.wrap(
                            searchShardsResponses -> {
                                // 當(dāng)所有節(jié)點(diǎn)都響應(yīng)后,再做后續(xù)邏輯處理,即此處的后置監(jiān)聽(tīng)
                                final BiFunction<String, String, DiscoveryNode> clusterNodeLookup =
                                    getRemoteClusterNodeLookup(searchShardsResponses);
                                final Map<String, AliasFilter> remoteAliasFilters;
                                final List<SearchShardIterator> remoteShardIterators;
                                if (searchContext != null) {
                                    remoteAliasFilters = searchContext.aliasFilter();
                                    remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
                                        searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
                                } else {
                                    remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
                                    remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
                                        remoteAliasFilters);
                                }
                                int localClusters = localIndices == null ? 0 : 1;
                                int totalClusters = remoteClusterIndices.size() + localClusters;
                                int successfulClusters = searchShardsResponses.size() + localClusters;
                                // 至于后續(xù)搜索實(shí)現(xiàn)如何,不在此間
                                executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
                                    clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                    new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
                                    searchContext, searchAsyncActionProvider);
                            },
                            listener::onFailure));
                }
            }
        }, listener::onFailure);
        if (searchRequest.source() == null) {
            rewriteListener.onResponse(searchRequest.source());
        } else {
            Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
                rewriteListener);
        }
    }

可以看到,es的search功能,會(huì)被劃分為幾種類型,有點(diǎn)會(huì)走集群分發(fā),而有的則不需要。我們自然是希望走集群分發(fā)的,所以,只需看 collectSearchShards() 即可。這里面其實(shí)就是對(duì)多個(gè)集群節(jié)點(diǎn)的依次請(qǐng)求,當(dāng)然還有結(jié)果收集。

// org.elasticsearch.action.search.TransportSearchAction#collectSearchShards
static void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, AtomicInteger skippedClusters,
                                Map<String, OriginalIndices> remoteIndicesByCluster, RemoteClusterService remoteClusterService,
                                ThreadPool threadPool, ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
    // 使用該計(jì)數(shù)器進(jìn)行結(jié)果控制
    final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
    final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
    final AtomicReference<Exception> exceptions = new AtomicReference<>();
    // 迭代各節(jié)點(diǎn),依次發(fā)送請(qǐng)求
    for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
        final String clusterAlias = entry.getKey();
        boolean skipUnavailable = remoteClusterService.isSkipUnavailable(clusterAlias);
        Client clusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
        final String[] indices = entry.getValue().indices();
        ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
            .indicesOptions(indicesOptions).local(true).preference(preference).routing(routing);
        // 向集群中 clusterAlias 異步發(fā)起請(qǐng)求處理 search
        clusterClient.admin().cluster().searchShards(searchShardsRequest,
            new CCSActionListener<ClusterSearchShardsResponse, Map<String, ClusterSearchShardsResponse>>(
                clusterAlias, skipUnavailable, responsesCountDown, skippedClusters, exceptions, listener) {
                @Override
                void innerOnResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    // 每次單節(jié)點(diǎn)響應(yīng)時(shí),將結(jié)果存放到 searchShardsResponses 中
                    searchShardsResponses.put(clusterAlias, clusterSearchShardsResponse);
                }

                @Override
                Map<String, ClusterSearchShardsResponse> createFinalResponse() {
                    // 所有節(jié)點(diǎn)都返回時(shí),將結(jié)果集返回
                    return searchShardsResponses;
                }
            }
        );
    }
}
// org.elasticsearch.client.support.AbstractClient.ClusterAdmin#searchShards
@Override
public void searchShards(final ClusterSearchShardsRequest request, final ActionListener<ClusterSearchShardsResponse> listener) {
    // 發(fā)起請(qǐng)求 indices:admin/shards/search_shards, 其對(duì)應(yīng)處理器為 TransportClusterSearchShardsAction
    execute(ClusterSearchShardsAction.INSTANCE, request, listener);
}

以上是es向集群中多節(jié)點(diǎn)發(fā)起請(qǐng)求的過(guò)程,其重點(diǎn)在于所有的請(qǐng)求都是異步請(qǐng)求,即向各節(jié)點(diǎn)發(fā)送完成請(qǐng)求后,當(dāng)前線程即為斷開(kāi)狀態(tài)。這就體現(xiàn)了無(wú)阻塞的能力了,以listner形式進(jìn)行處理后續(xù)業(yè)務(wù)。這對(duì)于發(fā)送自然沒(méi)有問(wèn)題,但如何進(jìn)行結(jié)果收集呢?實(shí)際上就是通過(guò)listner來(lái)處理的。在遠(yuǎn)程節(jié)點(diǎn)響應(yīng)后,listener.onResponse()將被調(diào)用。

3.1、多節(jié)點(diǎn)響應(yīng)結(jié)果處理

這是我們本文討論的重點(diǎn)。前面我們看到es已經(jīng)異步發(fā)送請(qǐng)求出去了(且不論其如何發(fā)送),所以如何收集結(jié)果也很關(guān)鍵。而es中的做法則很簡(jiǎn)單,使用一個(gè) ConcurrentHashMap 收集每個(gè)結(jié)果,一個(gè)CountDown標(biāo)識(shí)是否已處理完成。

// org.elasticsearch.action.search.TransportSearchAction.CCSActionListener#CCSActionListener
CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters,
                    AtomicReference<Exception> exceptions, ActionListener<FinalResponse> originalListener) {
    this.clusterAlias = clusterAlias;
    this.skipUnavailable = skipUnavailable;
    this.countDown = countDown;
    this.skippedClusters = skippedClusters;
    this.exceptions = exceptions;
    this.originalListener = originalListener;
}

// 成功時(shí)的響應(yīng)
@Override
public final void onResponse(Response response) {
    // inner響應(yīng)為將結(jié)果放入 searchShardsResponses 中
    innerOnResponse(response);
    // maybeFinish 則進(jìn)行結(jié)果是否完成判定,如果完成,則調(diào)用回調(diào)方法,構(gòu)造結(jié)果
    maybeFinish();
}

private void maybeFinish() {
    // 使用一個(gè) AtomicInteger 進(jìn)行控制
    if (countDown.countDown()) {
        Exception exception = exceptions.get();
        if (exception == null) {
            FinalResponse response;
            try {
                // 創(chuàng)建響應(yīng)結(jié)果,此處 search 即為 searchShardsResponses
                response = createFinalResponse();
            } catch(Exception e) {
                originalListener.onFailure(e);
                return;
            }
            // 成功響應(yīng)回調(diào),實(shí)現(xiàn)結(jié)果收集后的其他業(yè)務(wù)處理
            originalListener.onResponse(response);
        } else {
            originalListener.onFailure(exceptions.get());
        }
    }
}
// CountDown 實(shí)現(xiàn)比較簡(jiǎn)單,只有最后一個(gè)返回true, 其他皆為false, 即實(shí)現(xiàn)了 At Most Once 語(yǔ)義
/**
* Decrements the count-down and returns <code>true</code> iff this call
* reached zero otherwise <code>false</code>
*/
public boolean countDown() {
    assert originalCount > 0;
    for (;;) {
        final int current = countDown.get();
        assert current >= 0;
        if (current == 0) {
            return false;
        }
        if (countDown.compareAndSet(current, current - 1)) {
            return current == 1;
        }
    }
}

可見(jiàn),ES中的結(jié)果收集,是以一個(gè) AtomicInteger 實(shí)現(xiàn)的CountDown來(lái)處理的,當(dāng)所有節(jié)點(diǎn)都響應(yīng)時(shí),就處理最終結(jié)果,否則將每個(gè)節(jié)點(diǎn)的數(shù)據(jù)放入ConcurrentHashMap中暫存起來(lái)。

而通過(guò)一個(gè)Client通用的異步調(diào)用框架,實(shí)現(xiàn)多節(jié)點(diǎn)的異步提交。整個(gè)節(jié)點(diǎn)響應(yīng)以 CCSActionListener 作為接收者??梢哉f(shuō)是比較簡(jiǎn)潔的了,好像也沒(méi)有我們前面討論的復(fù)雜性。因?yàn)椋捍蟮乐梁?jiǎn)。

3.2、異步提交請(qǐng)求實(shí)現(xiàn)

我們知道,如果本地想實(shí)現(xiàn)異步提交請(qǐng)求,只需使用另一個(gè)線程或者線程池技術(shù),即可實(shí)現(xiàn)。而對(duì)于遠(yuǎn)程Client的異步提交,則還需要借助于外部工具了。此處借助于Netty的channel.write()實(shí)現(xiàn),節(jié)點(diǎn)響應(yīng)時(shí)再回調(diào)回來(lái),從而恢復(fù)上下文。整個(gè)過(guò)程,沒(méi)有一點(diǎn)阻塞同步,從而達(dá)到了高效的處理能力,當(dāng)然還有其他的一些異常處理,自不必說(shuō)。

具體樣例大致如下:因最終的處理器是以 TransportClusterSearchShardsAction 進(jìn)行處理的,所以直接轉(zhuǎn)到 TransportClusterSearchShardsAction。

// org.elasticsearch.action.admin.cluster.shards.TransportClusterSearchShardsAction
public class TransportClusterSearchShardsAction extends
    TransportMasterNodeReadAction<ClusterSearchShardsRequest, ClusterSearchShardsResponse> {

    private final IndicesService indicesService;

    @Inject
    public TransportClusterSearchShardsAction(TransportService transportService, ClusterService clusterService,
                                              IndicesService indicesService, ThreadPool threadPool, ActionFilters actionFilters,
                                              IndexNameExpressionResolver indexNameExpressionResolver) {
        super(ClusterSearchShardsAction.NAME, transportService, clusterService, threadPool, actionFilters,
            ClusterSearchShardsRequest::new, indexNameExpressionResolver, ClusterSearchShardsResponse::new, ThreadPool.Names.SAME);
        this.indicesService = indicesService;
    }

    @Override
    protected ClusterBlockException checkBlock(ClusterSearchShardsRequest request, ClusterState state) {
        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ,
                indexNameExpressionResolver.concreteIndexNames(state, request));
    }

    @Override
    protected void masterOperation(final ClusterSearchShardsRequest request, final ClusterState state,
                                   final ActionListener<ClusterSearchShardsResponse> listener) {
        ClusterState clusterState = clusterService.state();
        String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request);
        Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(state, request.routing(), request.indices());
        Map<String, AliasFilter> indicesAndFilters = new HashMap<>();
        Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
        for (String index : concreteIndices) {
            final AliasFilter aliasFilter = indicesService.buildAliasFilter(clusterState, index, indicesAndAliases);
            final String[] aliases = indexNameExpressionResolver.indexAliases(clusterState, index, aliasMetadata -> true, true,
                indicesAndAliases);
            indicesAndFilters.put(index, new AliasFilter(aliasFilter.getQueryBuilder(), aliases));
        }

        Set<String> nodeIds = new HashSet<>();
        GroupShardsIterator<ShardIterator> groupShardsIterator = clusterService.operationRouting()
            .searchShards(clusterState, concreteIndices, routingMap, request.preference());
        ShardRouting shard;
        ClusterSearchShardsGroup[] groupResponses = new ClusterSearchShardsGroup[groupShardsIterator.size()];
        int currentGroup = 0;
        for (ShardIterator shardIt : groupShardsIterator) {
            ShardId shardId = shardIt.shardId();
            ShardRouting[] shardRoutings = new ShardRouting[shardIt.size()];
            int currentShard = 0;
            shardIt.reset();
            while ((shard = shardIt.nextOrNull()) != null) {
                shardRoutings[currentShard++] = shard;
                nodeIds.add(shard.currentNodeId());
            }
            groupResponses[currentGroup++] = new ClusterSearchShardsGroup(shardId, shardRoutings);
        }
        DiscoveryNode[] nodes = new DiscoveryNode[nodeIds.size()];
        int currentNode = 0;
        for (String nodeId : nodeIds) {
            nodes[currentNode++] = clusterState.getNodes().get(nodeId);
        }
        listener.onResponse(new ClusterSearchShardsResponse(groupResponses, nodes, indicesAndFilters));
    }
}
// doExecute 在父類中完成
// org.elasticsearch.action.support.master.TransportMasterNodeAction#doExecute
@Override
protected void doExecute(Task task, final Request request, ActionListener<Response> listener) {
    ClusterState state = clusterService.state();
    logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version());
    if (task != null) {
        request.setParentTask(clusterService.localNode().getId(), task.getId());
    }
    new AsyncSingleAction(task, request, listener).doStart(state);
}

// org.elasticsearch.action.support.master.TransportMasterNodeAction.AsyncSingleAction#doStart
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
    this.task = task;
    this.request = request;
    this.listener = listener;
    this.startTime = threadPool.relativeTimeInMillis();
}

protected void doStart(ClusterState clusterState) {
    try {
        final DiscoveryNodes nodes = clusterState.nodes();
        if (nodes.isLocalNodeElectedMaster() || localExecute(request)) {
            // check for block, if blocked, retry, else, execute locally
            final ClusterBlockException blockException = checkBlock(request, clusterState);
            if (blockException != null) {
                if (!blockException.retryable()) {
                    listener.onFailure(blockException);
                } else {
                    logger.debug("can't execute due to a cluster block, retrying", blockException);
                    // 重試處理
                    retry(clusterState, blockException, newState -> {
                        try {
                            ClusterBlockException newException = checkBlock(request, newState);
                            return (newException == null || !newException.retryable());
                        } catch (Exception e) {
                            // accept state as block will be rechecked by doStart() and listener.onFailure() then called
                            logger.trace("exception occurred during cluster block checking, accepting state", e);
                            return true;
                        }
                    });
                }
            } else {
                ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
                    if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
                        logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
                            "stepped down before publishing action [{}], scheduling a retry", actionName), t);
                        retryOnMasterChange(clusterState, t);
                    } else {
                        delegatedListener.onFailure(t);
                    }
                });
                // 本地節(jié)點(diǎn)執(zhí)行結(jié)果,直接以異步線程處理即可
                threadPool.executor(executor)
                    .execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
            }
        } else {
            if (nodes.getMasterNode() == null) {
                logger.debug("no known master node, scheduling a retry");
                retryOnMasterChange(clusterState, null);
            } else {
                DiscoveryNode masterNode = nodes.getMasterNode();
                final String actionName = getMasterActionName(masterNode);
                // 發(fā)送到master節(jié)點(diǎn),以netty作為通訊工具,完成后回調(diào) 當(dāng)前l(fā)istner
                transportService.sendRequest(masterNode, actionName, request,
                    new ActionListenerResponseHandler<Response>(listener, responseReader) {
                        @Override
                        public void handleException(final TransportException exp) {
                            Throwable cause = exp.unwrapCause();
                            if (cause instanceof ConnectTransportException ||
                                (exp instanceof RemoteTransportException && cause instanceof NodeClosedException)) {
                                // we want to retry here a bit to see if a new master is elected
                                logger.debug("connection exception while trying to forward request with action name [{}] to " +
                                        "master node [{}], scheduling a retry. Error: [{}]",
                                    actionName, nodes.getMasterNode(), exp.getDetailedMessage());
                                retryOnMasterChange(clusterState, cause);
                            } else {
                                listener.onFailure(exp);
                            }
                        }
                });
            }
        }
    } catch (Exception e) {
        listener.onFailure(e);
    }
}

可見(jiàn),es中確實(shí)有兩種異步的提交方式,一種是當(dāng)前節(jié)點(diǎn)就是執(zhí)行節(jié)點(diǎn),直接使用線程池提交;另一種是遠(yuǎn)程節(jié)點(diǎn)則起網(wǎng)絡(luò)調(diào)用,最終如何實(shí)現(xiàn)異步且往下看。

// org.elasticsearch.transport.TransportService#sendRequest
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                            final TransportRequest request,
                                                            final TransportRequestOptions options,
                                                            TransportResponseHandler<T> handler) {
    final Transport.Connection connection;
    try {
        // 假設(shè)不是本節(jié)點(diǎn),則獲取遠(yuǎn)程的一個(gè) connection, channel
        connection = getConnection(node);
    } catch (final NodeNotConnectedException ex) {
        // the caller might not handle this so we invoke the handler
        handler.handleException(ex);
        return;
    }
    sendRequest(connection, action, request, options, handler);
}
// org.elasticsearch.transport.TransportService#getConnection
/**
    * Returns either a real transport connection or a local node connection if we are using the local node optimization.
    * @throws NodeNotConnectedException if the given node is not connected
    */
public Transport.Connection getConnection(DiscoveryNode node) {
    if (isLocalNode(node)) {
        return localNodeConnection;
    } else {
        return connectionManager.getConnection(node);
    }
}

// org.elasticsearch.transport.TransportService#sendRequest
/**
    * Sends a request on the specified connection. If there is a failure sending the request, the specified handler is invoked.
    *
    * @param connection the connection to send the request on
    * @param action     the name of the action
    * @param request    the request
    * @param options    the options for this request
    * @param handler    the response handler
    * @param <T>        the type of the transport response
    */
public final <T extends TransportResponse> void sendRequest(final Transport.Connection connection, final String action,
                                                            final TransportRequest request,
                                                            final TransportRequestOptions options,
                                                            final TransportResponseHandler<T> handler) {
    try {
        final TransportResponseHandler<T> delegate;
        if (request.getParentTask().isSet()) {
            // If the connection is a proxy connection, then we will create a cancellable proxy task on the proxy node and an actual
            // child task on the target node of the remote cluster.
            //  ----> a parent task on the local cluster
            //        |
            //         ----> a proxy task on the proxy node on the remote cluster
            //               |
            //                ----> an actual child task on the target node on the remote cluster
            // To cancel the child task on the remote cluster, we must send a cancel request to the proxy node instead of the target
            // node as the parent task of the child task is the proxy task not the parent task on the local cluster. Hence, here we
            // unwrap the connection and keep track of the connection to the proxy node instead of the proxy connection.
            final Transport.Connection unwrappedConn = unwrapConnection(connection);
            final Releasable unregisterChildNode = taskManager.registerChildConnection(request.getParentTask().getId(), unwrappedConn);
            delegate = new TransportResponseHandler<T>() {
                @Override
                public void handleResponse(T response) {
                    unregisterChildNode.close();
                    handler.handleResponse(response);
                }

                @Override
                public void handleException(TransportException exp) {
                    unregisterChildNode.close();
                    handler.handleException(exp);
                }

                @Override
                public String executor() {
                    return handler.executor();
                }

                @Override
                public T read(StreamInput in) throws IOException {
                    return handler.read(in);
                }

                @Override
                public String toString() {
                    return getClass().getName() + "/[" + action + "]:" + handler.toString();
                }
            };
        } else {
            delegate = handler;
        }
        asyncSender.sendRequest(connection, action, request, options, delegate);
    } catch (final Exception ex) {
        // the caller might not handle this so we invoke the handler
        final TransportException te;
        if (ex instanceof TransportException) {
            te = (TransportException) ex;
        } else {
            te = new TransportException("failure to send", ex);
        }
        handler.handleException(te);
    }
}

// org.elasticsearch.transport.TransportService#sendRequestInternal
private <T extends TransportResponse> void sendRequestInternal(final Transport.Connection connection, final String action,
                                                                final TransportRequest request,
                                                                final TransportRequestOptions options,
                                                                TransportResponseHandler<T> handler) {
    if (connection == null) {
        throw new IllegalStateException("can't send request to a null connection");
    }
    DiscoveryNode node = connection.getNode();

    Supplier<ThreadContext.StoredContext> storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true);
    ContextRestoreResponseHandler<T> responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler);
    // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring
    final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action));
    final TimeoutHandler timeoutHandler;
    if (options.timeout() != null) {
        timeoutHandler = new TimeoutHandler(requestId, connection.getNode(), action);
        responseHandler.setTimeoutHandler(timeoutHandler);
    } else {
        timeoutHandler = null;
    }
    try {
        if (lifecycle.stoppedOrClosed()) {
            /*
                * If we are not started the exception handling will remove the request holder again and calls the handler to notify the
                * caller. It will only notify if toStop hasn't done the work yet.
                */
            throw new NodeClosedException(localNode);
        }
        if (timeoutHandler != null) {
            assert options.timeout() != null;
            timeoutHandler.scheduleTimeout(options.timeout());
        }
        connection.sendRequest(requestId, action, request, options); // local node optimization happens upstream
    } catch (final Exception e) {
        // usually happen either because we failed to connect to the node
        // or because we failed serializing the message
        final Transport.ResponseContext<? extends TransportResponse> contextToNotify = responseHandlers.remove(requestId);
        // If holderToNotify == null then handler has already been taken care of.
        if (contextToNotify != null) {
            if (timeoutHandler != null) {
                timeoutHandler.cancel();
            }
            // callback that an exception happened, but on a different thread since we don't
            // want handlers to worry about stack overflows. In the special case of running into a closing node we run on the current
            // thread on a best effort basis though.
            final SendRequestTransportException sendRequestException = new SendRequestTransportException(node, action, e);
            final String executor = lifecycle.stoppedOrClosed() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
            threadPool.executor(executor).execute(new AbstractRunnable() {
                @Override
                public void onRejection(Exception e) {
                    // if we get rejected during node shutdown we don't wanna bubble it up
                    logger.debug(
                        () -> new ParameterizedMessage(
                            "failed to notify response handler on rejection, action: {}",
                            contextToNotify.action()),
                        e);
                }
                @Override
                public void onFailure(Exception e) {
                    logger.warn(
                        () -> new ParameterizedMessage(
                            "failed to notify response handler on exception, action: {}",
                            contextToNotify.action()),
                        e);
                }
                @Override
                protected void doRun() throws Exception {
                    contextToNotify.handler().handleException(sendRequestException);
                }
            });
        } else {
            logger.debug("Exception while sending request, handler likely already notified due to timeout", e);
        }
    }
}
// org.elasticsearch.transport.RemoteConnectionManager.ProxyConnection#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws IOException, TransportException {
    connection.sendRequest(requestId, TransportActionProxy.getProxyAction(action),
        TransportActionProxy.wrapRequest(targetNode, request), options);
}
// org.elasticsearch.transport.TcpTransport.NodeChannels#sendRequest
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
    throws IOException, TransportException {
    if (isClosing.get()) {
        throw new NodeNotConnectedException(node, "connection already closed");
    }
    TcpChannel channel = channel(options.type());
    outboundHandler.sendRequest(node, channel, requestId, action, request, options, getVersion(), compress, false);
}
// org.elasticsearch.transport.OutboundHandler#sendRequest
/**
    * Sends the request to the given channel. This method should be used to send {@link TransportRequest}
    * objects back to the caller.
    */
void sendRequest(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
                    final TransportRequest request, final TransportRequestOptions options, final Version channelVersion,
                    final boolean compressRequest, final boolean isHandshake) throws IOException, TransportException {
    Version version = Version.min(this.version, channelVersion);
    OutboundMessage.Request message = new OutboundMessage.Request(threadPool.getThreadContext(), features, request, version, action,
        requestId, isHandshake, compressRequest);
    ActionListener<Void> listener = ActionListener.wrap(() ->
        messageListener.onRequestSent(node, requestId, action, request, options));
    sendMessage(channel, message, listener);
}
// org.elasticsearch.transport.OutboundHandler#sendMessage
private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, ActionListener<Void> listener) throws IOException {
    MessageSerializer serializer = new MessageSerializer(networkMessage, bigArrays);
    SendContext sendContext = new SendContext(channel, serializer, listener, serializer);
    internalSend(channel, sendContext);
}
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
    channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
    BytesReference reference = sendContext.get();
    // stash thread context so that channel event loop is not polluted by thread context
    try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
        channel.sendMessage(reference, sendContext);
    } catch (RuntimeException ex) {
        sendContext.onFailure(ex);
        CloseableChannel.closeChannel(channel);
        throw ex;
    }
}
// org.elasticsearch.transport.netty4.Netty4TcpChannel#sendMessage
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
    // netty 發(fā)送數(shù)據(jù),異步回調(diào),完成異步請(qǐng)求
    channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));

    if (channel.eventLoop().isShutdown()) {
        listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
    }
}

簡(jiǎn)單說(shuō),就是依托于netty的pipeline機(jī)制以及eventLoop實(shí)現(xiàn)遠(yuǎn)程異步請(qǐng)求,至于具體實(shí)現(xiàn)如何,請(qǐng)參考之前文章或各網(wǎng)文。

以上就是詳解Java ES多節(jié)點(diǎn)任務(wù)的高效分發(fā)與收集實(shí)現(xiàn)的詳細(xì)內(nèi)容,更多關(guān)于Java ES多節(jié)點(diǎn)任務(wù)的高效分發(fā) 收集實(shí)現(xiàn)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java中Cookie和Session詳解及區(qū)別總結(jié)

    Java中Cookie和Session詳解及區(qū)別總結(jié)

    這篇文章主要介紹了Java中Cookie和Session詳解,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,感興趣的小伙伴可以參考一下
    2022-06-06
  • SpringMVC4 + MyBatis3 + SQL Server 2014整合教程(含增刪改查分頁(yè))

    SpringMVC4 + MyBatis3 + SQL Server 2014整合教程(含增刪改查分頁(yè))

    這篇文章主要給大家介紹了關(guān)于SpringMVC4 + MyBatis3 + SQL Server 2014整合的相關(guān)資料,文中包括介紹了增刪改查分頁(yè)等相關(guān)內(nèi)容,通過(guò)示例代碼介紹的非常詳細(xì),分享出來(lái)供大家參考學(xué)習(xí),下面來(lái)一起看看吧。
    2017-06-06
  • XFire構(gòu)建web service客戶端的五種方式

    XFire構(gòu)建web service客戶端的五種方式

    本篇文章主要介紹了XFire構(gòu)建web service客戶端的五種方式。具有很好的參考價(jià)值,下面跟著小編一起來(lái)看下吧
    2017-01-01
  • Java struts2捕獲404錯(cuò)誤的方法匯總

    Java struts2捕獲404錯(cuò)誤的方法匯總

    這篇文章主要為大家詳細(xì)匯總了Java struts2捕獲404錯(cuò)誤的常用方法,感興趣的小伙伴們可以參考一下
    2016-05-05
  • Springboot WebFlux集成Spring Security實(shí)現(xiàn)JWT認(rèn)證的示例

    Springboot WebFlux集成Spring Security實(shí)現(xiàn)JWT認(rèn)證的示例

    這篇文章主要介紹了Springboot WebFlux集成Spring Security實(shí)現(xiàn)JWT認(rèn)證的示例,幫助大家更好的理解和學(xué)習(xí)使用springboot框架,感興趣的朋友可以了解下
    2021-04-04
  • springmvc注解配置實(shí)現(xiàn)解析

    springmvc注解配置實(shí)現(xiàn)解析

    這篇文章主要介紹了springmvc注解配置實(shí)現(xiàn)詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • 最新評(píng)論