欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)

 更新時(shí)間:2022年09月26日 09:57:17   作者:jiaxwu  
延遲隊(duì)列是一種特殊的隊(duì)列,元素入隊(duì)時(shí)需要指定到期時(shí)間(或延遲時(shí)間),從隊(duì)頭出隊(duì)的元素必須是已經(jīng)到期的。本文將用Golang實(shí)現(xiàn)延遲隊(duì)列,感興趣的可以了解下

背景

延遲隊(duì)列是一種特殊的隊(duì)列,元素入隊(duì)時(shí)需要指定到期時(shí)間(或延遲時(shí)間),從隊(duì)頭出隊(duì)的元素必須是已經(jīng)到期的,而且最先到期的元素最先出隊(duì),也就是隊(duì)列里面的元素是按照到期時(shí)間排序的,添加元素和從隊(duì)頭出隊(duì)的時(shí)間復(fù)雜度是O(log(n))。

由于以上性質(zhì),延遲隊(duì)列一般可以用于以下場景(定時(shí)任務(wù)、延遲任務(wù)):

  • 緩存:用戶淘汰過期元素
  • 通知:在指定時(shí)間通知用戶,比如會(huì)議開始前30分鐘
  • 訂單:30分鐘未支付取消訂單
  • 超時(shí):服務(wù)器自動(dòng)斷開太長時(shí)間沒有心跳的連接

其實(shí)在Golang中是自帶定時(shí)器的,也就是time.After()、time.AfterFunc()等函數(shù),它們的性能也是非常好的,隨著Golang版本升級(jí)還會(huì)優(yōu)化。但是對(duì)于某些場景來說確實(shí)不夠方便,比如緩存場景我們需要能夠支持隨機(jī)刪除定時(shí)器,隨機(jī)重置過期時(shí)間,更加靈活的刪除一小批過期元素。而且像Kafka的時(shí)間輪算法(TimeWheel)里面也用到了延遲隊(duì)列,因此還是有必要了解下如何實(shí)現(xiàn)延遲隊(duì)列。

原理

延遲隊(duì)列每次出隊(duì)的是最小到期時(shí)間的元素,而堆就是用來獲取最值的數(shù)據(jù)結(jié)構(gòu)。使用堆我們可以實(shí)現(xiàn)O(log(n))時(shí)間復(fù)雜度添加元素和移除最小到期時(shí)間元素。

隨機(jī)刪除

有時(shí)候延遲隊(duì)列還需要具有隨機(jī)刪除元素的能力,可以通過以下方式實(shí)現(xiàn):

  • 元素添加刪除標(biāo)記字段:堆中每個(gè)元素都添加一個(gè)刪除標(biāo)記字段,并把這個(gè)元素的地址返回給用戶,用戶就可以標(biāo)記元素的這個(gè)字段為true,這樣元素到達(dá)堆頂時(shí)如果判斷到這個(gè)字段為true就會(huì)被清除,而延遲隊(duì)列里的元素邏輯上是一定會(huì)到達(dá)堆頂?shù)模ㄒ驗(yàn)闀r(shí)間會(huì)流逝)。這是一種懶刪除的方式。
  • 元素添加堆中下標(biāo)字段(或用map記錄下標(biāo)):堆中每個(gè)元素都添加一個(gè)堆中下標(biāo)字段,并把這個(gè)元素的地址返回給用戶,這樣我們就可以通過這個(gè)元素里面記錄的下標(biāo)快速定位元素在堆中的位置,從而刪除元素。詳細(xì)可以看文章如何實(shí)現(xiàn)一個(gè)支持O(log(n))隨機(jī)刪除元素的堆。

重置元素到期時(shí)間

如果需要重置延遲隊(duì)列里面元素的到期時(shí)間,則必須知道元素在堆中的下標(biāo),因?yàn)橹刂玫狡跁r(shí)間之后必須對(duì)堆進(jìn)行調(diào)整,因此只能是元素添加堆中下標(biāo)字段。

Golang實(shí)現(xiàn)

這里我們實(shí)現(xiàn)一個(gè)最簡單的延遲隊(duì)列,也就是不支持隨機(jī)刪除元素和重置元素的到期時(shí)間,因?yàn)橛行﹫鼍爸恍枰砑釉睾瞳@取到期元素這兩個(gè)功能,比如Kafka中的時(shí)間輪,而且這種簡單實(shí)現(xiàn)性能會(huì)高一點(diǎn)。

代碼地址

數(shù)據(jù)結(jié)構(gòu)

主要的結(jié)構(gòu)可以看到就是一個(gè)heap,Entry是每個(gè)元素在堆中的表示,Value是具體的元素值,Expired是為了堆中元素根據(jù)到期時(shí)間排序。

mutex是一個(gè)互斥鎖,主要是保證操作并發(fā)安全。

wakeup是一個(gè)緩沖區(qū)長度為1的通道,通過它實(shí)現(xiàn)添加元素的時(shí)候喚醒等待隊(duì)列不為空或者有更小到期時(shí)間元素加入的協(xié)程。(重點(diǎn))

type Entry[T any] struct {
	Value   T
	Expired time.Time // 到期時(shí)間
}

// 延遲隊(duì)列
type DelayQueue[T any] struct {
	h      *heap.Heap[*Entry[T]]
	mutex  sync.Mutex    // 保證并發(fā)安全
	wakeup chan struct{} // 喚醒通道
}

// 創(chuàng)建延遲隊(duì)列
func New[T any]() *DelayQueue[T] {
	return &DelayQueue[T]{
		h: heap.New(nil, func(e1, e2 *Entry[T]) bool {
			return e1.Expired.Before(e2.Expired)
		}),
		wakeup: make(chan struct{}, 1),
	}
}

實(shí)現(xiàn)原理

阻塞獲取元素的時(shí)候如果隊(duì)列已經(jīng)沒有元素,或者沒有元素到期,那么協(xié)程就需要掛起等待。而被喚醒的條件是元素到期、隊(duì)列不為空或者有更小到期時(shí)間元素加入。

其中元素到期協(xié)程在阻塞獲取元素時(shí)發(fā)現(xiàn)堆頂元素還沒到期,因此這個(gè)條件可以自己構(gòu)造并等待。但是條件隊(duì)列不為空和有更小到期時(shí)間元素加入則需要另外一個(gè)協(xié)程在添加元素時(shí)才能滿足,因此必須通過一個(gè)中間結(jié)構(gòu)來進(jìn)行協(xié)程間通信,一般Golang里面會(huì)使用Channel來實(shí)現(xiàn)。

添加元素

一開始加了一個(gè)互斥鎖,避免并發(fā)沖突,然后把元素加到堆里。

因?yàn)槲覀僒ake()操作,既阻塞獲取元素操作,在不滿足條件時(shí)會(huì)去等待wakeup通道,但是等待通道前必須釋放鎖,否則Push()無法寫入新元素去滿足條件隊(duì)列不為空和有更小到期時(shí)間元素加入。而從釋放鎖后到開始讀取wakeup通道這段時(shí)間是沒有鎖保護(hù)的,如果Push()在這期間插入新元素,為了保證通道不阻塞同時(shí)又能通知到Take()協(xié)程,我們的通道的長度需要是1,同時(shí)使用select+default保證在通道里面已經(jīng)有元素的時(shí)候不阻塞Push()協(xié)程。

// 添加延遲元素到隊(duì)列
func (q *DelayQueue[T]) Push(value T, delay time.Duration) {
	q.mutex.Lock()
	defer q.mutex.Unlock()
	entry := &Entry[T]{
		Value:   value,
		Expired: time.Now().Add(delay),
	}
	q.h.Push(entry)
	// 喚醒等待的協(xié)程
	// 這里表示新添加的元素到期時(shí)間是最早的,或者原來隊(duì)列為空
	// 因此必須喚醒等待的協(xié)程,因?yàn)榭梢阅玫礁绲狡诘脑?
	if q.h.Peek() == entry {
		select {
		case q.wakeup <- struct{}{}:
		default:
		}
	}
}

阻塞獲取元素

這里先判斷堆是否有元素,如果有獲取堆頂元素,然后判斷是否已經(jīng)到期,如果到期則直接出堆并返回。否則等待直到超時(shí)或者元素到期或者有新的元素到達(dá)。

這里在解鎖之前會(huì)清空wakeup通道,這樣可以保證下面讀取的wakeup通道里的元素肯定是新加入的。

// 等待直到有元素到期
// 或者ctx被關(guān)閉
func (q *DelayQueue[T]) Take(ctx context.Context) (T, bool) {
	for {
		var expired *time.Timer
		q.mutex.Lock()
		// 有元素
		if !q.h.Empty() {
			// 獲取元素
			entry := q.h.Peek()
			if time.Now().After(entry.Expired) {
				q.h.Pop()
				q.mutex.Unlock()
				return entry.Value, true
			}
			// 到期時(shí)間,使用time.NewTimer()才能夠調(diào)用Stop(),從而釋放定時(shí)器
			expired = time.NewTimer(time.Until(entry.Expired))
		}
		// 避免被之前的元素假喚醒
		select {
		case <-q.wakeup:
		default:
		}
		q.mutex.Unlock()

		// 不為空,需要同時(shí)等待元素到期
                // 并且除非expired到期,否則都需要關(guān)閉expired避免泄露
		if expired != nil {
			select {
			case <-q.wakeup: // 新的更快到期元素
				expired.Stop()
			case <-expired.C: // 首元素到期
			case <-ctx.Done(): // 被關(guān)閉
				expired.Stop()
				var t T
				return t, false
			}
		} else {
			select {
			case <-q.wakeup: // 新的更快到期元素
			case <-ctx.Done(): // 被關(guān)閉
				var t T
				return t, false
			}
		}
	}
}

Channel方式阻塞讀取

Golang里面可以使用Channel進(jìn)行流式消費(fèi),因此簡單包裝一個(gè)Channel形式的阻塞讀取接口,給通道一點(diǎn)緩沖區(qū)大小可以帶來更好的性能。

// 返回一個(gè)通道,輸出到期元素
// size是通道緩存大小
func (q *DelayQueue[T]) Channel(ctx context.Context, size int) <-chan T {
	out := make(chan T, size)
	go func() {
		for {
			entry, ok := q.Take(ctx)
			if !ok {
				return
			}
			out <- entry
		}
	}()
	return out
}

使用方式

for entry := range q.Channel(context.Background(), 10) {
    // do something
}

性能測試

這里進(jìn)行一個(gè)簡單的性能測試,也就是先添加元素,然后等待到期后全部拿出來。

func BenchmarkPushAndTake(b *testing.B) {
	q := New[int]()
	b.ResetTimer()
        
        // 添加元素
	for i := 0; i < b.N; i++ {
		q.Push(i, time.Duration(i))
	}
        
        // 等待全部元素到期
	b.StopTimer()
	time.Sleep(time.Duration(b.N))
	b.StartTimer()

        // 獲取元素
	for i := 0; i < b.N; i++ {
		_, ok := q.Take(context.Background())
		if !ok {
			b.Errorf("want %v, but %v", true, ok)
		}
	}
}

測試結(jié)果:

Benchmark-8      2331534               476.8 ns/op            76 B/op          1 allocs/op

總結(jié)

堆實(shí)現(xiàn)的延遲隊(duì)列是一種實(shí)現(xiàn)起來比較簡單的定時(shí)器(當(dāng)然阻塞讀取Take()是比較復(fù)雜的),由于時(shí)間復(fù)雜度是O(log(n)),因此可以滿足定時(shí)任務(wù)數(shù)量不是特別多的場景。堆實(shí)現(xiàn)的延遲隊(duì)列也是可以隨機(jī)刪除元素的,可以根據(jù)具體任務(wù)選擇是否實(shí)現(xiàn)。如果對(duì)定時(shí)器性能要求比較敏感的話可以選擇使用時(shí)間輪實(shí)現(xiàn)定時(shí)器,它可以在O(1)的時(shí)間復(fù)雜度添加和刪除一個(gè)定時(shí)器,不過實(shí)現(xiàn)起來比較復(fù)雜(挖個(gè)坑,下篇文章實(shí)現(xiàn))。

到此這篇關(guān)于基于Golang實(shí)現(xiàn)延遲隊(duì)列(DelayQueue)的文章就介紹到這了,更多相關(guān)Golang延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Goland和IDEA換行符的設(shè)置方式

    Goland和IDEA換行符的設(shè)置方式

    這篇文章主要介紹了Goland和IDEA換行符的設(shè)置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • golang雙指針快速排序的實(shí)現(xiàn)代碼

    golang雙指針快速排序的實(shí)現(xiàn)代碼

    這篇文章主要介紹了golang雙指針快速排序的實(shí)現(xiàn)代碼,通過實(shí)例代碼補(bǔ)充介紹了Golang實(shí)現(xiàn)快速排序和歸并排序以及堆排序算法全注釋,需要的朋友可以參考下
    2024-03-03
  • Go語言使用時(shí)會(huì)遇到的錯(cuò)誤及解決方法詳解

    Go語言使用時(shí)會(huì)遇到的錯(cuò)誤及解決方法詳解

    這篇文章主要為大家詳細(xì)介紹了Go語言使用時(shí)常常會(huì)遇到的一些錯(cuò)誤及解決方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下
    2023-07-07
  • 使用自定義錯(cuò)誤碼攔截grpc內(nèi)部狀態(tài)碼問題

    使用自定義錯(cuò)誤碼攔截grpc內(nèi)部狀態(tài)碼問題

    這篇文章主要介紹了使用自定義錯(cuò)誤碼攔截grpc內(nèi)部狀態(tài)碼問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • golang1.21新特性全面講解

    golang1.21新特性全面講解

    經(jīng)過了半年左右的開發(fā),golang?1.21?最近正式發(fā)布了,這個(gè)版本中有不少重要的新特性和變更,尤其是在泛型相關(guān)的代碼上,下面小編就來和大家好好嘮嘮吧
    2023-08-08
  • 淺析Golang中rune類型的使用

    淺析Golang中rune類型的使用

    從golang源碼中看出,rune關(guān)鍵字是int32的別名(-231~231-1),對(duì)比byte(-128~127),可表示的字符更多,本文就來簡單聊聊它的使用方法吧,希望對(duì)大家有所幫助
    2023-05-05
  • golang實(shí)現(xiàn)http server提供文件下載功能

    golang實(shí)現(xiàn)http server提供文件下載功能

    這篇文章主要介紹了golang實(shí)現(xiàn)http server提供文件下載功能,本文給大家簡單介紹了Golang的相關(guān)知識(shí),非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-02-02
  • Go cobra庫使用教程

    Go cobra庫使用教程

    cobra既是一個(gè)用于創(chuàng)建強(qiáng)大現(xiàn)代CLI應(yīng)用程序的庫,也是一個(gè)生成應(yīng)用程序和命令文件的程序。cobra被用在很多go語言的項(xiàng)目中,比如 Kubernetes、Docker、Istio、ETCD、Hugo、Github CLI等等
    2022-12-12
  • 詳解Go如何優(yōu)雅的對(duì)時(shí)間進(jìn)行格式化

    詳解Go如何優(yōu)雅的對(duì)時(shí)間進(jìn)行格式化

    這篇文章主要為大家詳細(xì)介紹了Go語言中是如何優(yōu)雅的對(duì)時(shí)間進(jìn)行格式化的,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起了解一下
    2023-06-06
  • Go實(shí)現(xiàn)簡單的數(shù)據(jù)庫表轉(zhuǎn)結(jié)構(gòu)體詳解

    Go實(shí)現(xiàn)簡單的數(shù)據(jù)庫表轉(zhuǎn)結(jié)構(gòu)體詳解

    這篇文章主要為大家介紹了Go實(shí)現(xiàn)簡單的數(shù)據(jù)庫表轉(zhuǎn)結(jié)構(gòu)體詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01

最新評(píng)論