欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot+dubbo實現(xiàn)時間輪算法

 更新時間:2025年04月08日 11:01:11   作者:DREAM LINER SU  
時間輪是一種高效利用線程資源進(jìn)行批量化調(diào)度的算法,本文主要介紹了springboot+dubbo實現(xiàn)時間輪算法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

前言

時間輪(TimingWheel)是一種高效利用線程資源進(jìn)行批量化調(diào)度的算法,廣泛應(yīng)用于各種操作系統(tǒng)的定時任務(wù)調(diào)度中,如Linux的crontab,以及Java開發(fā)中常用的Dubbo、Netty、Kafka等框架。時間輪的核心思想是將時間劃分為若干個時間間隔(bucket),每個時間間隔代表一個時間段,并通過一個循環(huán)的數(shù)據(jù)結(jié)構(gòu)(類似于時鐘)來管理這些時間間隔。

文章基于3.1.0版本進(jìn)行分析

		<dependency>
            <groupId>org.apache.dubbo</groupId>
            <artifactId>dubbo</artifactId>
            <version>3.1.0</version>
        </dependency>

一、參數(shù)說明

在這里插入圖片描述

  • tickDuration:表示一個槽所代表的時間范圍 默認(rèn)100ms
  • ticksPerWheel:表示該時間輪有多少個槽 默認(rèn)512
  • startTime:表示該時間輪的開始時間
  • interval:時間輪所能表示的時間跨度,也就是 tickDuration * ticksPerWheel
  • currentTime:表示當(dāng)前時間,也就是時間輪指針指向的時間
  • wheel:表示TimerTaskList的數(shù)組,即各個槽,每個bucket都是一個 HashedWheelBucket 。
  • HashedWheelBucket:存儲TimerTaskEntry的雙向鏈表
  • HashedWheelTimeout:延遲任務(wù),有兩個值 deadline 和 remainingRounds
  • deadline:TimerTask 最后的執(zhí)行時間
  • remainingRounds:剩余圈數(shù)
  • timeouts:用來保存新增的HashedWheelTimeout,每次執(zhí)行會拿出10W個放入HashedWheelBucket

二、具體實現(xiàn)

1、HashedWheelTimer

時間輪實現(xiàn)類

	public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {
		// 檢查參數(shù)
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
		// 創(chuàng)建時間輪
        wheel = createWheel(ticksPerWheel);
		
		// 位運(yùn)算標(biāo)識 
		// 因為一圈的長度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline& mast == deadline % wheel.length    
		// deadline = System.nanoTime() + unit.toNanos(delay) - startTime; TODO
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
		// 時間輪的基本時間跨度,轉(zhuǎn)成最小時間單位Nanos
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
		// 時間跨度限制不能太大,計算會有問題
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
		// 創(chuàng)建時間輪工作線程
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;
		// 延遲任務(wù)太多的時間,警告日志
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

參數(shù)說明:

  • threadFactory
    線程工廠,創(chuàng)建時間輪線程
  • tickDuration
    每一tick的時間
  • timeUnit
    tickDuration的時間單位
  • ticksPerWheel
    就是輪子一共有多個格子,即要多少個tick才能走完這個wheel一圈。

2、createWheel

創(chuàng)建時間輪

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
		// 如果不是2^n 則調(diào)整為2^n
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
		// 初始化時間輪槽
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

3、newTimeout

添加新任務(wù)

	public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
		// 如果任務(wù)大于最大量,則不允許繼續(xù)添加
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }
		// 啟動時間輪工作線程
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
		// 如果為負(fù)數(shù),那么說明超過了long的最大值
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

4、start

啟動時間輪

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
			// 如果是初始化 啟動實踐論
            case WORKER_STATE_INIT:
				// 保證只啟動一次,原子性
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
			// 已經(jīng)啟動過了
            case WORKER_STATE_STARTED:
                break;
			// 時間輪停止了
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
		// 線程啟動執(zhí)行任務(wù)是異步的,這里是等待workerThread.start(),線程已經(jīng)啟動了
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

5、run

workerThread.start()啟動后,會執(zhí)行Worker的run方法

public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    // Notify the other threads waiting for the initialization at start().
	// 喚醒被阻塞的start()方法。
    startTimeInitialized.countDown();

    do {
		// 等下一個槽的到達(dá)時間,開始執(zhí)行上一個槽的任務(wù) TODO 不明白這里的設(shè)計,哪位大佬知道可以指點一下
        final long deadline = waitForNextTick();
        if (deadline > 0) {
			// 計算時間輪的槽位
            int idx = (int) (tick & mask);
			// 移除取消的了task
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // 將newTimeout()方法中加入到待處理定時任務(wù)隊列中的任務(wù)加入到指定的格子中
			transferTimeoutsToBuckets();
			// 運(yùn)行目前指針指向的槽中的bucket鏈表中的任務(wù),執(zhí)行到期的延時任務(wù)
            bucket.expireTimeouts(deadline);
            tick++;
        }
		
    } 
	// 如果Worker_State一只是started狀態(tài),就一直循環(huán)
	while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
		// 清除時間輪中超時未處理的任務(wù)
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (; ; ) {
		// 遍歷任務(wù)隊列,發(fā)現(xiàn)如果有任務(wù)被取消,則添加到unprocessedTimeouts,也就是不需要處理的隊列中。
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
	// 再次移除取消的了task
    processCancelledTasks();
}

6、waitForNextTick

一個鐘表上的間隔是代表一個單位時間的間隔,那么waitForNextTick就是根據(jù)當(dāng)前時間計算出跳動到下個時間的時間間隔,然后進(jìn)行sleep,結(jié)束后進(jìn)入下一個時間間隔,下一個間隔到來的時候返回。

	/**
     * 根據(jù)startTime和當(dāng)前槽位計算目標(biāo)nanoTime,
     * 等待時間到達(dá)
     *
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    private long waitForNextTick() {
		// tick槽位,tickDuration表示每個時間格的跨度,所以deadline返回的是下一次時間輪指針跳動的時間
        long deadline = tickDuration * (tick + 1);

        for (; ; ) {
			// 計算當(dāng)前時間距離啟動時間的時間間隔,期間休眠
            final long currentTime = System.nanoTime() - startTime;
			// 計算sleepTimeMs先加999999,應(yīng)該是不足1ms的,補(bǔ)足1ms
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
			
            // sleepTimeMs小于零表示走到了下一個時間槽位置
			if (sleepTimeMs <= 0) {
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE;
                } else {
                    return currentTime;
                }
            }
			// Windows 時間換算
            if (isWindows()) {
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
				// 當(dāng)前時間距離下一次tick時間還有一段距離,需要sleep
                Thread.sleep(sleepTimeMs);
            } catch (InterruptedException ignored) {
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

7、transferTimeoutsToBuckets

轉(zhuǎn)移任務(wù)到時間輪的具體槽中

	// 將延時任務(wù)隊列中等待添加到時間輪中的延時任務(wù)轉(zhuǎn)移到時間輪的指定位置
	private void transferTimeoutsToBuckets() {
        // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
        // adds new timeouts in a loop.
		// 循環(huán)100000次,也就是每次轉(zhuǎn)移10w個任務(wù)
		// 為了防止這個操作銷毀太多時間,導(dǎo)致更多的任務(wù)時間不準(zhǔn),因此一次最多操作10w個
        for (int i = 0; i < 100000; i++) {
			// 從阻塞隊列中獲得具體的任務(wù)
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                // all processed
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                // Was cancelled in the meantime.
                continue;
            }

            // 計算tick次數(shù),deadline表示當(dāng)前任務(wù)的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當(dāng)前任務(wù)需要tick幾次才能被執(zhí)行
			long calculated = timeout.deadline / tickDuration;
            // 計算剩余的輪數(shù), 只有 timer 走夠輪數(shù), 并且到達(dá)了 task 所在的 slot, task 才會過期.(被執(zhí)行)
			timeout.remainingRounds = (calculated - tick) / wheel.length;

            // Ensure we don't schedule for past.
			// 如果任務(wù)在timeouts隊列里面放久了, 以至于已經(jīng)過了執(zhí)行時間, 這個時候就使用當(dāng)前tick, 也就是放到當(dāng)前bucket, 此方法調(diào)用完后就會被執(zhí)行
            final long ticks = Math.max(calculated, tick);
            // 算出任務(wù)應(yīng)該插入的 wheel 的 slot, stopIndex = tick 次數(shù) & mask, mask = wheel.length - 1
			int stopIndex = (int) (ticks & mask);

            // 把timeout任務(wù)插入到指定的bucket鏈中。
			HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);
        }
    }

8、expireTimeouts

當(dāng)指針跳動到某一個時間槽中時,會就觸發(fā)這個槽中的任務(wù)的執(zhí)行。該功能是通過expireTimeouts來實現(xiàn)

	void expireTimeouts(long deadline) {
		// 雙向鏈表
        HashedWheelTimeout timeout = head;

        // process all timeouts
        // 遍歷當(dāng)前時間槽中的所有任務(wù)
		while (timeout != null) {
            HashedWheelTimeout next = timeout.next;
			// 如果當(dāng)前任務(wù)要被執(zhí)行,那么remainingRounds應(yīng)該小于或者等于0
            if (timeout.remainingRounds <= 0) {
				// 從bucket鏈表中移除當(dāng)前timeout,并返回鏈表中下一個timeout
                next = remove(timeout);
				// 如果timeout的時間小于當(dāng)前的時間,那么就調(diào)用expire執(zhí)行task
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    // 不可能發(fā)生的情況,就是說round已經(jīng)為0了,deadline卻 > 當(dāng)前槽的deadline
					throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
            } else if (timeout.isCancelled()) {
                next = remove(timeout);
            } else {
				// 因為當(dāng)前的槽位已經(jīng)過了,說明已經(jīng)走了一圈了,把輪數(shù)減一
                timeout.remainingRounds--;
            }
			// 把指針放置到下一個timeout
            timeout = next;
        }
    }

總結(jié)

時間輪(TimingWheel)在計算機(jī)科學(xué)中,特別是在任務(wù)調(diào)度和時間管理方面,具有重要的意義,我們可以結(jié)合業(yè)務(wù)進(jìn)行使用

  • 節(jié)省cpu資源
  • 易于實現(xiàn)和維護(hù)
  • 批量化調(diào)度模型
  • 高效處理大量定時任務(wù)
  • 靈活適應(yīng)不同應(yīng)用場景

到此這篇關(guān)于springboot+dubbo實現(xiàn)時間輪算法的文章就介紹到這了,更多相關(guān)springboot dubbo時間輪內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家! 

相關(guān)文章

最新評論