欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

PowerJob的ServerDiscoveryService工作流程源碼解讀

 更新時(shí)間:2023年12月24日 15:26:55   作者:codecraft  
這篇文章主要為大家介紹了PowerJob的ServerDiscoveryService工作流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下PowerJob的ServerDiscoveryService

ServerDiscoveryService

tech/powerjob/worker/background/ServerDiscoveryService.java

@Slf4j
public class ServerDiscoveryService {
    private final Long appId;
    private final PowerJobWorkerConfig config;
    private String currentServerAddress;
    private final Map<String, String> ip2Address = Maps.newHashMap();
    /**
     *  服務(wù)發(fā)現(xiàn)地址
     */
    private static final String DISCOVERY_URL = "http://%s/server/acquire?%s";
    /**
     * 失敗次數(shù)
     */
    private static int FAILED_COUNT = 0;
    /**
     * 最大失敗次數(shù)
     */
    private static final int MAX_FAILED_COUNT = 3;
    public ServerDiscoveryService(Long appId, PowerJobWorkerConfig config) {
        this.appId = appId;
        this.config = config;
    }
    //......
}
ServerDiscoveryService定義了currentServerAddress、ip2Address、服務(wù)發(fā)現(xiàn)url模版,失敗次數(shù),最大失敗次數(shù)

start

public void start(ScheduledExecutorService timingPool) {
        this.currentServerAddress = discovery();
        if (StringUtils.isEmpty(this.currentServerAddress) && !config.isEnableTestMode()) {
            throw new PowerJobException("can't find any available server, this worker has been quarantined.");
        }
        // 這里必須保證成功
        timingPool.scheduleAtFixedRate(() -> {
                    try {
                        this.currentServerAddress = discovery();
                    } catch (Exception e) {
                        log.error("[PowerDiscovery] fail to discovery server!", e);
                    }
                }
                , 10, 10, TimeUnit.SECONDS);
    }
其start方法先通過(guò)discovery方法獲取currentServerAddress,然后注冊(cè)定時(shí)任務(wù)每隔10s重新刷新一下currentServerAddress

discovery

private String discovery() {
        if (ip2Address.isEmpty()) {
            config.getServerAddress().forEach(x -> ip2Address.put(x.split(":")[0], x));
        }
        String result = null;
        // 先對(duì)當(dāng)前機(jī)器發(fā)起請(qǐng)求
        String currentServer = currentServerAddress;
        if (!StringUtils.isEmpty(currentServer)) {
            String ip = currentServer.split(":")[0];
            // 直接請(qǐng)求當(dāng)前Server的HTTP服務(wù),可以少一次網(wǎng)絡(luò)開(kāi)銷(xiāo),減輕Server負(fù)擔(dān)
            String firstServerAddress = ip2Address.get(ip);
            if (firstServerAddress != null) {
                result = acquire(firstServerAddress);
            }
        }
        for (String httpServerAddress : config.getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break;
            }
        }
        if (StringUtils.isEmpty(result)) {
            log.warn("[PowerDiscovery] can't find any available server, this worker has been quarantined.");
            // 在 Server 高可用的前提下,連續(xù)失敗多次,說(shuō)明該節(jié)點(diǎn)與外界失聯(lián),Server已經(jīng)將秒級(jí)任務(wù)轉(zhuǎn)移到其他Worker,需要?dú)⑺辣镜氐娜蝿?wù)
            if (FAILED_COUNT++ > MAX_FAILED_COUNT) {
                log.warn("[PowerDiscovery] can't find any available server for 3 consecutive times, It's time to kill all frequent job in this worker.");
                List<Long> frequentInstanceIds = HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys();
                if (!CollectionUtils.isEmpty(frequentInstanceIds)) {
                    frequentInstanceIds.forEach(instanceId -> {
                        HeavyTaskTracker taskTracker = HeavyTaskTrackerManager.removeTaskTracker(instanceId);
                        taskTracker.destroy();
                        log.warn("[PowerDiscovery] kill frequent instance(instanceId={}) due to can't find any available server.", instanceId);
                    });
                }
                FAILED_COUNT = 0;
            }
            return null;
        } else {
            // 重置失敗次數(shù)
            FAILED_COUNT = 0;
            log.debug("[PowerDiscovery] current server is {}.", result);
            return result;
        }
    }
discovery方法從config.getServerAddress()解析地址放到ip2Address,若currentServerAddress有值則acquire,否則遍歷config.getServerAddress()執(zhí)行acquire;若還沒(méi)有獲取到則判斷FAILED_COUNT是否超出MAX_FAILED_COUNT,超出則遍歷HeavyTaskTrackerManager.getAllFrequentTaskTrackerKeys()挨個(gè)執(zhí)行remove及destory

acquire

private String acquire(String httpServerAddress) {
        String result = null;
        String url = buildServerDiscoveryUrl(httpServerAddress);
        try {
            result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
        }catch (Exception ignore) {
        }
        if (!StringUtils.isEmpty(result)) {
            try {
                ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
                if (resultDTO.isSuccess()) {
                    return resultDTO.getData().toString();
                }
            }catch (Exception ignore) {
            }
        }
        return null;
    }
    private String buildServerDiscoveryUrl(String address) {
        ServerDiscoveryRequest serverDiscoveryRequest = new ServerDiscoveryRequest()
                .setAppId(appId)
                .setCurrentServer(currentServerAddress)
                .setProtocol(config.getProtocol().name().toUpperCase());
        String query = Joiner.on(OmsConstant.AND).withKeyValueSeparator(OmsConstant.EQUAL).join(serverDiscoveryRequest.toMap());
        return String.format(DISCOVERY_URL, address, query);
    }
acquire方法通過(guò)buildServerDiscoveryUrl構(gòu)建url,然后執(zhí)行HttpUtils.get(url)獲取地址

ServerController

tech/powerjob/server/web/controller/ServerController.java

@RestController
@RequestMapping("/server")
@RequiredArgsConstructor
public class ServerController implements ServerInfoAware {
    private ServerInfo serverInfo;
    private final TransportService transportService;
    private final ServerElectionService serverElectionService;
    private final AppInfoRepository appInfoRepository;
    private final WorkerClusterQueryService workerClusterQueryService;
    //......
    @GetMapping("/acquire")
    public ResultDTO<String> acquireServer(ServerDiscoveryRequest request) {
        return ResultDTO.success(serverElectionService.elect(request));
    }
    //......    
}
acquireServer方法執(zhí)行serverElectionService.elect(request)返回server地址

elect

tech/powerjob/server/remote/server/election/ServerElectionService.java

public String elect(ServerDiscoveryRequest request) {
        if (!accurate()) {
            final String currentServer = request.getCurrentServer();
            // 如果是本機(jī),就不需要查數(shù)據(jù)庫(kù)那么復(fù)雜的操作了,直接返回成功
            Optional<ProtocolInfo> localProtocolInfoOpt = Optional.ofNullable(transportService.allProtocols().get(request.getProtocol()));
            if (localProtocolInfoOpt.isPresent() && localProtocolInfoOpt.get().getAddress().equals(currentServer)) {
                log.debug("[ServerElectionService] this server[{}] is worker's current server, skip check", currentServer);
                return currentServer;
            }
        }
        return getServer0(request);
    }
elect方法判斷如果是本機(jī)就直接返回,否則執(zhí)行g(shù)etServer0

getServer0

private String getServer0(ServerDiscoveryRequest discoveryRequest) {
        final Long appId = discoveryRequest.getAppId();
        final String protocol = discoveryRequest.getProtocol();
        Set<String> downServerCache = Sets.newHashSet();
        for (int i = 0; i < RETRY_TIMES; i++) {
            // 無(wú)鎖獲取當(dāng)前數(shù)據(jù)庫(kù)中的Server
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if (!appInfoOpt.isPresent()) {
                throw new PowerJobException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            String activeAddress = activeAddress(originServer, downServerCache, protocol);
            if (StringUtils.isNotEmpty(activeAddress)) {
                return activeAddress;
            }
            // 無(wú)可用Server,重新進(jìn)行Server選舉,需要加鎖
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.tryLock(lockName, 30000);
            if (!lockStatus) {
                try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {
                // 可能上一臺(tái)機(jī)器已經(jīng)完成了Server選舉,需要再次判斷
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                String address = activeAddress(appInfo.getCurrentServer(), downServerCache, protocol);
                if (StringUtils.isNotEmpty(address)) {
                    return address;
                }
                // 篡位,如果本機(jī)存在協(xié)議,則作為Server調(diào)度該 worker
                final ProtocolInfo targetProtocolInfo = transportService.allProtocols().get(protocol);
                if (targetProtocolInfo != null) {
                    // 注意,寫(xiě)入 AppInfoDO#currentServer 的永遠(yuǎn)是 default 的地址,僅在返回的時(shí)候特殊處理為協(xié)議地址
                    appInfo.setCurrentServer(transportService.defaultProtocol().getAddress());
                    appInfo.setGmtModified(new Date());
                    appInfoRepository.saveAndFlush(appInfo);
                    log.info("[ServerElection] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                    return targetProtocolInfo.getAddress();
                }
            }catch (Exception e) {
                log.error("[ServerElection] write new server to db failed for app {}.", appName, e);
            } finally {
                lockService.unlock(lockName);
            }
        }
        throw new PowerJobException("server elect failed for app " + appId);
    }
getServer0方法先判斷appInfoRepository中當(dāng)前appId的originServer是否存活,是則直接返回,否則加鎖將transportService.defaultProtocol().getAddress()寫(xiě)入到appInfo的currentServer

小結(jié)

PowerJob的ServerDiscoveryService定義了start方法,它先通過(guò)discovery方法獲取currentServerAddress,然后注冊(cè)定時(shí)任務(wù)每隔10s重新刷新一下currentServerAddress;

discovery方法主要是遍歷config.getServerAddress()執(zhí)行acquire;

acquire方法通過(guò)buildServerDiscoveryUrl構(gòu)建url,然后執(zhí)行HttpUtils.get(url)獲取該appId的server地址。

以上就是PowerJob的ServerDiscoveryService工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob ServerDiscoveryService的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java并發(fā)編程——volatile關(guān)鍵字

    Java并發(fā)編程——volatile關(guān)鍵字

    這篇文章主要介紹了Java并發(fā)編程——volatile關(guān)鍵字的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)Java并發(fā)編程,感興趣的朋友可以了解下
    2020-10-10
  • 深入剖析構(gòu)建JSON字符串的三種方式(推薦)

    深入剖析構(gòu)建JSON字符串的三種方式(推薦)

    下面小編就為大家?guī)?lái)一篇深入剖析構(gòu)建JSON字符串的三種方式(推薦)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-10-10
  • Java項(xiàng)目防止SQL注入的幾種方式

    Java項(xiàng)目防止SQL注入的幾種方式

    SQL注入是一種常見(jiàn)的攻擊方式,黑客試圖通過(guò)操縱應(yīng)用程序的輸入來(lái)執(zhí)行惡意SQL查詢(xún),從而繞過(guò)認(rèn)證和授權(quán),竊取、篡改或破壞數(shù)據(jù)庫(kù)中的數(shù)據(jù),本文主要介紹了Java項(xiàng)目防止SQL注入的幾種方式,感興趣的可以了解一下
    2023-12-12
  • SpringBoot加載不出來(lái)application.yml文件的解決方法

    SpringBoot加載不出來(lái)application.yml文件的解決方法

    這篇文章主要介紹了SpringBoot加載不出來(lái)application.yml文件的解決方法,文中通過(guò)示例代碼講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作有一定的幫助,需要的朋友跟著小編來(lái)一起來(lái)學(xué)習(xí)吧
    2023-12-12
  • Java使用itext生成pdf標(biāo)簽的操作方法

    Java使用itext生成pdf標(biāo)簽的操作方法

    iText是著名的開(kāi)放源碼的站點(diǎn)sourceforge一個(gè)項(xiàng)目,是用于生成PDF文檔的一個(gè)java類(lèi)庫(kù),通過(guò)iText不僅可以生成PDF或rtf的文檔,而且可以將XML、Html文件轉(zhuǎn)化為PDF文件,本文給大家介紹了Java使用itext生成pdf標(biāo)簽的操作方法,需要的朋友可以參考下
    2024-12-12
  • 關(guān)于kafka發(fā)送消息的三種方式總結(jié)

    關(guān)于kafka發(fā)送消息的三種方式總結(jié)

    這篇文章主要介紹了關(guān)于kafka發(fā)送消息的三種方式總結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • springboot集成elasticsearch7的圖文方法

    springboot集成elasticsearch7的圖文方法

    本文記錄springboot集成elasticsearch7的方法,本文通過(guò)圖文實(shí)例代碼相結(jié)合給大家介紹的非常詳細(xì),需要的朋友參考下吧
    2021-05-05
  • Java垃圾回收機(jī)制算法詳解

    Java垃圾回收機(jī)制算法詳解

    這篇文章主要介紹了Java垃圾回收機(jī)制算法詳解,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • spring循環(huán)依賴(lài)策略解析

    spring循環(huán)依賴(lài)策略解析

    這篇文章主要為大家詳細(xì)介紹了spring循環(huán)依賴(lài)策略,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-09-09
  • Spring擴(kuò)展點(diǎn)之BeanFactoryPostProcessor詳解

    Spring擴(kuò)展點(diǎn)之BeanFactoryPostProcessor詳解

    這篇文章主要介紹了Spring擴(kuò)展點(diǎn)之BeanFactoryPostProcessor詳解,Spring的設(shè)計(jì)非常優(yōu)雅,有很多的擴(kuò)展點(diǎn)供我們對(duì)項(xiàng)目進(jìn)行擴(kuò)展,今天學(xué)習(xí)一下Spring其中擴(kuò)展點(diǎn)之一的BeanFactoryPostProcessor,需要的朋友可以參考下
    2023-11-11

最新評(píng)論