欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang無限緩存channel的設計與實現(xiàn)解析

 更新時間:2023年07月21日 11:15:29   作者:Y先森0.0  
這篇文章主要為大家介紹了Golang無限緩存channel的設計與實現(xiàn)解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

一.引言

Go語言的Channel有兩種類型,一種是無緩存的channle,一個種是有緩存的channel,但是對于有緩存的channle來說,其緩存長度在創(chuàng)建時就已經(jīng)固定了,中間也不能擴縮容,這導致對某些特定的業(yè)務場景來說不太方便

業(yè)務場景如下 :

爬蟲場景,想爬取某個URL頁面上可達的所有URL

一個channle中存在待處理的URL

一堆worker groutine從channle中讀取URL,下載解析網(wǎng)頁并且提取URL,再把URL放入channle

這種場景下,使用消息隊列或sync包可以解決這個問題,但是比較復雜,如果有一個可以無限緩存的Channle或許是比較好的解決方案

二.設計

基于以上特定的業(yè)務場景,我們的無限緩存Channle應該滿足以下要求:

緩存無限,最核心的基本要求。

不能阻塞寫,普通channle的寫操作之所以阻塞,是因為緩存滿了,無限緩存的channle不應該存在這個問題。

無數(shù)據(jù)時阻塞讀,此特性保持和普通channle一樣。

讀寫都應通過channle操作 :通過channle的 <- 和 ->,第一個是方便,仍遵循普通channle的語法,第二是不能暴露內(nèi)部緩存

channle被關閉后,未讀取的數(shù)據(jù)應該仍然可讀,此特性和普通channle保持一致

可基于數(shù)據(jù)量自動擴縮容,在數(shù)據(jù)量很大的時候要求可以自適應的擴容,在數(shù)據(jù)量變小后,為了避免內(nèi)存浪費,要求可以自適應的縮容

針對以上要求,設計思想如下:

內(nèi)部含有兩個普通channle,分別用于讀寫,我們將其稱作In和Out,往In中寫入數(shù)據(jù),然后從Out中讀取數(shù)據(jù)

內(nèi)部有一個可以自適應擴縮容的buf,當寫channle滿了寫不了之后,寫入到此buf中

內(nèi)部含有一個工作goroutine,總是In中數(shù)據(jù)放入到Out或者buf中

內(nèi)部的自適應擴縮容buf可以采用雙向環(huán)形鏈表

和采用數(shù)組實現(xiàn)相比,優(yōu)點如下:

數(shù)組大小是有限制的,語言層面就做不到真正的無限緩存

數(shù)組擴容代價大,而采用雙向環(huán)形鏈表則只用增加節(jié)點即可,縮容同樣

type T interface{}
type UnlimitSizeChan struct {
bufCount int64    // 統(tǒng)計元素數(shù)量,原子操作
In       chan<- T // 寫入channle
Out      <-chan T    // 讀取channle
buffer   *RingBuffer // 自適應擴縮容Buf
}

雙向環(huán)形鏈表 如何寫入和讀取數(shù)據(jù),并且做到自適應擴縮容?

雙向環(huán)形鏈表buf其結構類似于一個手串,手串上的珠子就可以當做是一個節(jié)點,每個節(jié)點可以是一個固定大小的數(shù)組

雙向環(huán)形鏈表buf上分別有兩個讀寫指針readCell和writeCell,指向將要進行讀寫操作的cell,負責進行數(shù)據(jù)讀寫

readCell永遠追趕writeCell,當追上時,代表寫滿了,進行擴容操作

擴容操作即在寫指針的后面插入一個新建的空閑cell

當buf中沒有數(shù)據(jù)時,代表此時的流量高峰應該已經(jīng)過去了,應該進行縮容操作

縮容操作修改鏈表指向即可,讓buf恢復原樣,僅保持兩個cell即可,其他cell由于不再被引用,會被GC自動回收

cell上也有兩個讀寫指針r和w,分別負責進行cell上的讀寫,也是r讀指針永遠追趕w寫指針

type cell struct {
	Data     []T   // 數(shù)據(jù)部分
	fullFlag bool  // cell滿的標志
	next     *cell // 指向后一個cellBuffer
	pre      *cell // 指向前一個cellBuffer
	r int // 下一個要讀的指針
	w int // 下一個要下的指針
}
type RingBuffer struct {
	cellCount int // cell 數(shù)量統(tǒng)計
	readCell  *cell // 下一個要讀的cell
	writeCell *cell // 下一個要寫的cell
}

數(shù)據(jù)FIFO原則是如何保證的?

無限緩存Channle內(nèi)部的Goroutine,我們稱其為Worker

當Out channle還沒有滿時并且Buf中沒有數(shù)據(jù)時,Worker將讀取In中數(shù)據(jù),將其放入Out,直到Out滿

當Buf中有數(shù)據(jù)時,無論Out是否滿,都將將In中讀到的數(shù)據(jù),直接寫入到Buf中,目的就是為了保證數(shù)據(jù)的FIFO原則

當cell標記為滿時,就算此cell中已經(jīng)被讀取了一部分數(shù)據(jù)了,此cell在讀取完所有數(shù)據(jù)之前也不能用于寫,目的也是為了保證數(shù)據(jù)的FIFO原則

三.實現(xiàn)

1.雙向環(huán)形鏈表實現(xiàn)

package unlimitSizeChan
import (
	"errors"
	"fmt"
)
var ErrRingIsEmpty = errors.New("ringbuffer is empty")
// CellInitialSize cell的初始容量
var CellInitialSize = 1024
// CellInitialCount 初始化cell數(shù)量
var CellInitialCount = 2
type cell struct {
	Data     []T   // 數(shù)據(jù)部分
	fullFlag bool  // cell滿的標志
	next     *cell // 指向后一個cellBuffer
	pre      *cell // 指向前一個cellBuffer
	r int // 下一個要讀的指針
	w int // 下一個要下的指針
}
type RingBuffer struct {
	cellCount int // cell 數(shù)量統(tǒng)計
	readCell  *cell // 下一個要讀的cell
	writeCell *cell // 下一個要寫的cell
}
// NewRingBuffer 新建一個ringbuffe,包含兩個cell
func NewRingBuffer() *RingBuffer {
	rootCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	lastCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	rootCell.pre = lastCell
	lastCell.pre = rootCell
	rootCell.next = lastCell
	lastCell.next = rootCell
	return &RingBuffer{
		cellCount: CellInitialCount,
		readCell:  rootCell,
		writeCell: rootCell,
	}
}
// Read 讀取數(shù)據(jù)
func (r *RingBuffer) Read() (T, error) {
	// 無數(shù)據(jù)
	if r.IsEmpty() {
		return nil, ErrRingIsEmpty
	}
	// 讀取數(shù)據(jù),并將讀指針向右移動一位
	value := r.readCell.Data[r.readCell.r]
	r.readCell.r++
	// 此cell已經(jīng)讀完
	if r.readCell.r == CellInitialSize {
		// 讀指針歸零,并將該cell狀態(tài)置為非滿
		r.readCell.r = 0
		r.readCell.fullFlag = false
		// 將readCell指向下一個cell
		r.readCell = r.readCell.next
	}
	return value, nil
}
// Pop 讀一個元素,讀完后移動指針
func (r *RingBuffer) Pop() T {
	value, err := r.Read()
	if err != nil {
		panic(err.Error())
	}
	return value
}
// Peek 窺視 讀一個元素,僅讀但不移動指針
func (r *RingBuffer) Peek() T {
	if r.IsEmpty() {
		panic(ErrRingIsEmpty.Error())
	}
	// 僅讀
	value := r.readCell.Data[r.readCell.r]
	return value
}
// Write 寫入數(shù)據(jù)
func (r *RingBuffer) Write(value T) {
	// 在 r.writeCell.w 位置寫入數(shù)據(jù),指針向右移動一位
	r.writeCell.Data[r.writeCell.w] = value
	r.writeCell.w++
	// 當前cell寫滿了
	if r.writeCell.w == CellInitialSize {
		// 指針置0,將該cell標記為已滿,并指向下一個cell
		r.writeCell.w = 0
		r.writeCell.fullFlag = true
		r.writeCell = r.writeCell.next
	}
	// 下一個cell也已滿,擴容
	if r.writeCell.fullFlag == true {
		r.grow()
	}
}
// grow 擴容
func (r *RingBuffer) grow() {
	// 新建一個cell
	newCell := &cell{
		Data: make([]T, CellInitialSize),
	}
	// 總共三個cell,writeCell,preCell,newCell
	// 本來關系: preCell <===> writeCell
	// 現(xiàn)在將newcell插入:preCell <===> newCell <===> writeCell
	pre := r.writeCell.pre
	pre.next = newCell
	newCell.pre = pre
	newCell.next = r.writeCell
	r.writeCell.pre = newCell
	// 將writeCell指向新建的cell
	r.writeCell = r.writeCell.pre
	// cell 數(shù)量加一
	r.cellCount++
}
// IsEmpty 判斷ringbuffer是否為空
func (r *RingBuffer) IsEmpty() bool {
	// readCell和writeCell指向同一個cell,并且該cell的讀寫指針也指向同一個位置,并且cell狀態(tài)為非滿
	if r.readCell == r.writeCell && r.readCell.r == r.readCell.w && r.readCell.fullFlag == false {
		return true
	}
	return false
}
// Capacity ringBuffer容量
func (r *RingBuffer) Capacity() int {
	return r.cellCount * CellInitialSize
}
// Reset 重置為僅指向兩個cell的ring
func (r *RingBuffer) Reset() {
	lastCell := r.readCell.next
	lastCell.w = 0
	lastCell.r = 0
	r.readCell.r = 0
	r.readCell.w = 0
	r.cellCount = CellInitialCount
	lastCell.next = r.readCell
}

2.無限緩存Channle實現(xiàn)

package unlimitSizeChan
import "sync/atomic"
type T interface{}
// UnlimitSizeChan 無限緩存的Channle
type UnlimitSizeChan struct {
	bufCount int64       // 統(tǒng)計元素數(shù)量,原子操作
	In       chan<- T    // 寫入channle
	Out      <-chan T    // 讀取channle
	buffer   *RingBuffer // 自適應擴縮容Buf
}
// Len uc中總共的元素數(shù)量
func (uc UnlimitSizeChan) Len() int {
	return len(uc.In) + uc.BufLen() + len(uc.Out)
}
// BufLen uc的buf中的元素數(shù)量
func (uc UnlimitSizeChan) BufLen() int {
	return int(atomic.LoadInt64(&uc.bufCount))
}
// NewUnlimitSizeChan 新建一個無限緩存的Channle,并指定In和Out大小(In和Out設置得一樣大)
func NewUnlimitSizeChan(initCapacity int) *UnlimitSizeChan {
	return NewUnlitSizeChanSize(initCapacity, initCapacity)
}
// NewUnlitSizeChanSize 新建一個無限緩存的Channle,并指定In和Out大小(In和Out設置得不一樣大)
func NewUnlitSizeChanSize(initInCapacity, initOutCapacity int) *UnlimitSizeChan {
	in := make(chan T, initInCapacity)
	out := make(chan T, initOutCapacity)
	ch := UnlimitSizeChan{In: in, Out: out, buffer: NewRingBuffer()}
	go process(in, out, &ch)
	return &ch
}
// 內(nèi)部Worker Groutine實現(xiàn)
func process(in, out chan T, ch *UnlimitSizeChan) {
	defer close(out) // in 關閉,數(shù)據(jù)讀取后也把out關閉
	// 不斷從in中讀取數(shù)據(jù)放入到out或者ringbuf中
loop:
	for {
		// 第一步:從in中讀取數(shù)據(jù)
		value, ok := <-in
		if !ok {
			// in 關閉了,退出loop
			break loop
		}
		// 第二步:將數(shù)據(jù)存儲到out或者buf中
		if atomic.LoadInt64(&ch.bufCount) > 0 {
			// 當buf中有數(shù)據(jù)時,新數(shù)據(jù)優(yōu)先存放到buf中,確保數(shù)據(jù)FIFO原則
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		} else {
			// out 沒有滿,數(shù)據(jù)放入out中
			select {
			case out <- value:
				continue
			default:
			}
			// out 滿了,數(shù)據(jù)放入buf中
			ch.buffer.Write(value)
			atomic.AddInt64(&ch.bufCount, 1)
		}
		// 第三步:處理buf,一直嘗試把buf中的數(shù)據(jù)放入到out中,直到buf中沒有數(shù)據(jù)
		for !ch.buffer.IsEmpty() {
			select {
			// 為了避免阻塞in,還要嘗試從in中讀取數(shù)據(jù)
			case val, ok := <-in:
				if !ok {
					// in 關閉了,退出loop
					break loop
				}
				// 因為這個時候out是滿的,新數(shù)據(jù)直接放入buf中
				ch.buffer.Write(val)
				atomic.AddInt64(&ch.bufCount, 1)
			// 將buf中數(shù)據(jù)放入out
			case out <- ch.buffer.Peek():
				ch.buffer.Pop()
				atomic.AddInt64(&ch.bufCount, -1)
				if ch.buffer.IsEmpty() { // 避免內(nèi)存泄露
					ch.buffer.Reset()
					atomic.StoreInt64(&ch.bufCount, 0)
				}
			}
		}
	}
	// in被關閉退出loop后,buf中還有可能有未處理的數(shù)據(jù),將他們?nèi)雘ut中,并重置buf
	for !ch.buffer.IsEmpty() {
		out <- ch.buffer.Pop()
		atomic.AddInt64(&ch.bufCount, -1)
	}
	ch.buffer.Reset()
	atomic.StoreInt64(&ch.bufCount, 0)
}

四.使用

ch := NewUnlimitSizeChan(1000)
// or ch := NewUnlitSizeChanSize(100,200)
go func() {
? ? for ...... {
? ? ? ? ...
? ? ? ? ch.In <- ... // send values
? ? ? ? ...
? ? }
? ? close(ch.In) // close In channel
}()
for v := range ch.Out { // read values
? ? fmt.Println(v)
}

以上就是Golang無限緩存channel的設計與實現(xiàn)解析的詳細內(nèi)容,更多關于Golang無限緩存channel的資料請關注腳本之家其它相關文章!

相關文章

  • Go集成swagger實現(xiàn)在線接口文檔的教程指南

    Go集成swagger實現(xiàn)在線接口文檔的教程指南

    wagger是一個用于設計,構建和文檔化API的開源框架,在Go語言中,Swagger可以幫助后端開發(fā)人員快速創(chuàng)建和定義RESTful API,并提供自動生成接口文檔的功能,所以本文給大家介紹了Go集成swagger實現(xiàn)在線接口文檔的方法,需要的朋友可以參考下
    2024-11-11
  • Golang 語言極簡類型轉換庫cast的使用詳解

    Golang 語言極簡類型轉換庫cast的使用詳解

    本文我們通過 cast.ToString() 函數(shù)的使用,簡單介紹了cast 的使用方法,除此之外,它還支持很多其他類型,在這沒有多多介紹,對Golang 類型轉換庫 cast相關知識感興趣的朋友一起看看吧
    2021-11-11
  • Golang優(yōu)雅關閉channel的方法示例

    Golang優(yōu)雅關閉channel的方法示例

    Goroutine和channel是Go在“并發(fā)”方面兩個核心feature,下面這篇文章主要給大家介紹了關于Golang如何優(yōu)雅關閉channel的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考解決,下面來一起看看吧。
    2017-11-11
  • go語言計算兩個時間的時間差方法

    go語言計算兩個時間的時間差方法

    這篇文章主要介紹了go語言計算兩個時間的時間差方法,涉及Python操作時間的技巧,需要的朋友可以參考下
    2015-03-03
  • Go routine調(diào)度詳解

    Go routine調(diào)度詳解

    這篇文章主要介紹了Go routine調(diào)度詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • 淺析goland等待鎖問題

    淺析goland等待鎖問題

    這篇文章主要介紹了goland等待鎖問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2020-11-11
  • golang中struct和interface的基礎使用教程

    golang中struct和interface的基礎使用教程

    Go不同于一般的面向對象語言,需要我們好好的學習研究,下面這篇文章主要給大家介紹了關于golang中struct和interface的基礎使用的相關資料,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧。
    2018-03-03
  • GO語言Defer用法實例分析

    GO語言Defer用法實例分析

    這篇文章主要介紹了GO語言Defer用法,實例分析了Defer的使用技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-02-02
  • go build -tags構建約束試驗示例解析

    go build -tags構建約束試驗示例解析

    這篇文章主要為大家介紹了go build -tags構建約束試驗示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-09-09
  • go benchmark 基準測試詳解

    go benchmark 基準測試詳解

    這篇文章主要介紹了go benchmark 基準測試詳解,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-03-03

最新評論